Source code for ecoli.processes.chromosome_structure

"""
====================
Chromosome Structure
====================

- Resolve collisions between molecules and replication forks on the chromosome.
- Remove and replicate promoters and motifs that are traversed by replisomes.
- Reset the boundaries and linking numbers of chromosomal segments.
"""

import numpy as np
import numpy.typing as npt
import warnings
from vivarium.core.process import Step
from vivarium.core.composer import Composer
from vivarium.core.engine import Engine

from ecoli.processes.global_clock import GlobalClock
from ecoli.processes.unique_update import UniqueUpdate
from ecoli.processes.registries import topology_registry
from ecoli.library.schema import (
    listener_schema,
    numpy_schema,
    attrs,
    bulk_name_to_idx,
    get_free_indices,
)
from ecoli.library.json_state import get_state_from_file
from wholecell.utils.polymerize import buildSequences

# Register default topology for this process, associating it with process name
NAME = "ecoli-chromosome-structure"
TOPOLOGY = {
    "bulk": ("bulk",),
    "listeners": ("listeners",),
    "active_replisomes": (
        "unique",
        "active_replisome",
    ),
    "oriCs": (
        "unique",
        "oriC",
    ),
    "chromosome_domains": (
        "unique",
        "chromosome_domain",
    ),
    "active_RNAPs": ("unique", "active_RNAP"),
    "RNAs": ("unique", "RNA"),
    "active_ribosome": ("unique", "active_ribosome"),
    "full_chromosomes": (
        "unique",
        "full_chromosome",
    ),
    "promoters": ("unique", "promoter"),
    "DnaA_boxes": ("unique", "DnaA_box"),
    "genes": ("unique", "gene"),
    "chromosomal_segments": ("unique", "chromosomal_segment"),
    "global_time": ("global_time",),
    "timestep": ("timestep",),
    "next_update_time": ("next_update_time", "chromosome_structure"),
}
topology_registry.register(NAME, TOPOLOGY)


[docs] class ChromosomeStructure(Step): """Chromosome Structure Process""" name = NAME topology = TOPOLOGY defaults = { # Load parameters "rna_sequences": [], "protein_sequences": [], "n_TUs": 1, "n_TFs": 1, "n_amino_acids": 1, "n_fragment_bases": 1, "replichore_lengths": [0, 0], "relaxed_DNA_base_pairs_per_turn": 1, "terC_index": -1, "calculate_superhelical_densities": False, # Get placeholder value for chromosome domains without children "no_child_place_holder": -1, # Load bulk molecule views "inactive_RNAPs": [], "fragmentBases": [], "ppi": "ppi", "active_tfs": [], "ribosome_30S_subunit": "30S", "ribosome_50S_subunit": "50S", "amino_acids": [], "water": "water", "seed": 0, "emit_unique": False, "rna_ids": [], "n_mature_rnas": 0, "mature_rna_ids": [], "mature_rna_end_positions": [], "mature_rna_nt_counts": [], "unprocessed_rna_index_mapping": {}, "time_step": 1.0, } # Constructor def __init__(self, parameters=None): super().__init__(parameters) self.rna_sequences = self.parameters["rna_sequences"] self.protein_sequences = self.parameters["protein_sequences"] self.n_TUs = self.parameters["n_TUs"] self.n_TFs = self.parameters["n_TFs"] self.rna_ids = self.parameters["rna_ids"] self.n_amino_acids = self.parameters["n_amino_acids"] self.n_fragment_bases = self.parameters["n_fragment_bases"] replichore_lengths = self.parameters["replichore_lengths"] self.min_coordinates = -replichore_lengths[1] self.max_coordinates = replichore_lengths[0] self.relaxed_DNA_base_pairs_per_turn = self.parameters[ "relaxed_DNA_base_pairs_per_turn" ] self.terC_index = self.parameters["terC_index"] self.n_mature_rnas = self.parameters["n_mature_rnas"] self.mature_rna_ids = self.parameters["mature_rna_ids"] self.mature_rna_end_positions = self.parameters["mature_rna_end_positions"] self.mature_rna_nt_counts = self.parameters["mature_rna_nt_counts"] self.unprocessed_rna_index_mapping = self.parameters[ "unprocessed_rna_index_mapping" ] # Load sim options self.calculate_superhelical_densities = self.parameters[ "calculate_superhelical_densities" ] # Get placeholder value for chromosome domains without children self.no_child_place_holder = self.parameters["no_child_place_holder"] self.inactive_RNAPs = self.parameters["inactive_RNAPs"] self.fragmentBases = self.parameters["fragmentBases"] self.ppi = self.parameters["ppi"] self.active_tfs = self.parameters["active_tfs"] self.ribosome_30S_subunit = self.parameters["ribosome_30S_subunit"] self.ribosome_50S_subunit = self.parameters["ribosome_50S_subunit"] self.amino_acids = self.parameters["amino_acids"] self.water = self.parameters["water"] self.inactive_RNAPs_idx = None self.emit_unique = self.parameters.get("emit_unique", True)
[docs] def ports_schema(self): ports = { "listeners": { "rnap_data": listener_schema( { "n_total_collisions": 0, "n_headon_collisions": 0, "n_codirectional_collisions": 0, "headon_collision_coordinates": [], "codirectional_collision_coordinates": [], "n_removed_ribosomes": 0, "incomplete_transcription_events": ( np.zeros(self.n_TUs, np.int64), self.rna_ids, ), "n_empty_fork_collisions": 0, "empty_fork_collision_coordinates": [], } ) }, "bulk": numpy_schema("bulk"), # Unique molecules "active_replisomes": numpy_schema( "active_replisomes", emit=self.parameters["emit_unique"] ), "oriCs": numpy_schema("oriCs", emit=self.parameters["emit_unique"]), "chromosome_domains": numpy_schema( "chromosome_domains", emit=self.parameters["emit_unique"] ), "active_RNAPs": numpy_schema( "active_RNAPs", emit=self.parameters["emit_unique"] ), "RNAs": numpy_schema("RNAs", emit=self.parameters["emit_unique"]), "active_ribosome": numpy_schema( "active_ribosome", emit=self.parameters["emit_unique"] ), "full_chromosomes": numpy_schema( "full_chromosomes", emit=self.parameters["emit_unique"] ), "promoters": numpy_schema("promoters", emit=self.parameters["emit_unique"]), "DnaA_boxes": numpy_schema( "DnaA_boxes", emit=self.parameters["emit_unique"] ), "chromosomal_segments": numpy_schema( "chromosomal_segments", emit=self.parameters["emit_unique"] ), "genes": numpy_schema("genes", emit=self.parameters["emit_unique"]), "global_time": {"_default": 0.0}, "timestep": {"_default": self.parameters["time_step"]}, "next_update_time": { "_default": self.parameters["time_step"], "_updater": "set", "_divider": "set", }, } return ports
[docs] def update_condition(self, timestep, states): """ See :py:meth:`~ecoli.processes.partition.Requester.update_condition`. """ if states["next_update_time"] <= states["global_time"]: if states["next_update_time"] < states["global_time"]: warnings.warn( f"{self.name} updated at t=" f"{states['global_time']} instead of t=" f"{states['next_update_time']}. Decrease the " "timestep for the global clock process for more " "accurate timekeeping." ) return True return False
[docs] def next_update(self, timestep, states): # At t=0, convert all strings to indices if self.inactive_RNAPs_idx is None: self.fragmentBasesIdx = bulk_name_to_idx( self.fragmentBases, states["bulk"]["id"] ) self.active_tfs_idx = bulk_name_to_idx( self.active_tfs, states["bulk"]["id"] ) self.ribosome_30S_subunit_idx = bulk_name_to_idx( self.ribosome_30S_subunit, states["bulk"]["id"] ) self.ribosome_50S_subunit_idx = bulk_name_to_idx( self.ribosome_50S_subunit, states["bulk"]["id"] ) self.amino_acids_idx = bulk_name_to_idx( self.amino_acids, states["bulk"]["id"] ) self.water_idx = bulk_name_to_idx(self.water, states["bulk"]["id"]) self.ppi_idx = bulk_name_to_idx(self.ppi, states["bulk"]["id"]) self.inactive_RNAPs_idx = bulk_name_to_idx( self.inactive_RNAPs, states["bulk"]["id"] ) self.mature_rna_idx = bulk_name_to_idx( self.mature_rna_ids, states["bulk"]["id"] ) # Read unique molecule attributes (replisome_domain_indexes, replisome_coordinates, replisome_unique_indexes) = ( attrs( states["active_replisomes"], ["domain_index", "coordinates", "unique_index"], ) ) (all_chromosome_domain_indexes, child_domains) = attrs( states["chromosome_domains"], ["domain_index", "child_domains"] ) ( RNAP_domain_indexes, RNAP_coordinates, RNAP_is_forward, RNAP_unique_indexes, ) = attrs( states["active_RNAPs"], ["domain_index", "coordinates", "is_forward", "unique_index"], ) (origin_domain_indexes,) = attrs(states["oriCs"], ["domain_index"]) (mother_domain_indexes,) = attrs(states["full_chromosomes"], ["domain_index"]) (RNA_TU_indexes, transcript_lengths, RNA_RNAP_indexes, RNA_unique_indexes) = ( attrs( states["RNAs"], ["TU_index", "transcript_length", "RNAP_index", "unique_index"], ) ) (ribosome_protein_indexes, ribosome_peptide_lengths, ribosome_mRNA_indexes) = ( attrs( states["active_ribosome"], ["protein_index", "peptide_length", "mRNA_index"], ) ) ( promoter_TU_indexes, promoter_domain_indexes, promoter_coordinates, promoter_bound_TFs, ) = attrs( states["promoters"], ["TU_index", "domain_index", "coordinates", "bound_TF"] ) (gene_cistron_indexes, gene_domain_indexes, gene_coordinates) = attrs( states["genes"], ["cistron_index", "domain_index", "coordinates"] ) (DnaA_box_domain_indexes, DnaA_box_coordinates, DnaA_box_bound) = attrs( states["DnaA_boxes"], ["domain_index", "coordinates", "DnaA_bound"] ) # Build dictionary of replisome coordinates with domain indexes as keys replisome_coordinates_from_domains = { domain_index: replisome_coordinates[ replisome_domain_indexes == domain_index ] for domain_index in np.unique(replisome_domain_indexes) } def get_removed_molecules_mask(domain_indexes, coordinates): """ Computes the boolean mask of unique molecules that should be removed based on the progression of the replication forks. """ mask = np.zeros_like(domain_indexes, dtype=np.bool_) # Loop through all domains for domain_index in np.unique(domain_indexes): # Domain has active replisomes if domain_index in replisome_coordinates_from_domains: domain_replisome_coordinates = replisome_coordinates_from_domains[ domain_index ] # Get mask for molecules on this domain that are out of range # It's rare but we have to remove molecules at the exact same # coordinates as the replisomes as well so that they do not break # the chromosome segment calculations if they are removed by a # different process (hence, >= and <= instead of > and <) domain_mask = np.logical_and.reduce( ( domain_indexes == domain_index, coordinates >= domain_replisome_coordinates.min(), coordinates <= domain_replisome_coordinates.max(), ) ) # Domain has no active replisomes else: children_of_domain = child_domains[ all_chromosome_domain_indexes == domain_index ] # Child domains are full chromosomes (domain has finished replicating) if np.all(np.isin(children_of_domain, mother_domain_indexes)): # Remove all molecules on this domain domain_mask = domain_indexes == domain_index # Domain has not started replication or replication was interrupted else: continue mask[domain_mask] = True return mask # Build mask for molecules that should be removed removed_RNAPs_mask = get_removed_molecules_mask( RNAP_domain_indexes, RNAP_coordinates ) removed_promoters_mask = get_removed_molecules_mask( promoter_domain_indexes, promoter_coordinates ) removed_genes_mask = get_removed_molecules_mask( gene_domain_indexes, gene_coordinates ) removed_DnaA_boxes_mask = get_removed_molecules_mask( DnaA_box_domain_indexes, DnaA_box_coordinates ) # Build masks for head-on and co-directional collisions between RNAPs # and replication forks RNAP_headon_collision_mask = np.logical_and( removed_RNAPs_mask, np.logical_xor(RNAP_is_forward, RNAP_coordinates > 0) ) RNAP_codirectional_collision_mask = np.logical_and( removed_RNAPs_mask, np.logical_not(RNAP_headon_collision_mask) ) n_total_collisions = np.count_nonzero(removed_RNAPs_mask) n_headon_collisions = np.count_nonzero(RNAP_headon_collision_mask) n_codirectional_collisions = np.count_nonzero(RNAP_codirectional_collision_mask) # Write values to listeners update = { "listeners": { "rnap_data": { "n_total_collisions": n_total_collisions, "n_headon_collisions": n_headon_collisions, "n_codirectional_collisions": n_codirectional_collisions, "headon_collision_coordinates": RNAP_coordinates[ RNAP_headon_collision_mask ], "codirectional_collision_coordinates": RNAP_coordinates[ RNAP_codirectional_collision_mask ], } }, "bulk": [], "active_replisomes": {}, "oriCs": {}, "chromosome_domains": {}, "active_RNAPs": {}, "RNAs": {}, "active_ribosome": {}, "full_chromosomes": {}, "chromosomal_segments": {}, "promoters": {}, "genes": {}, "DnaA_boxes": {}, } if self.calculate_superhelical_densities: # Get attributes of existing segments ( boundary_molecule_indexes, boundary_coordinates, segment_domain_indexes, linking_numbers, ) = attrs( states["chromosomal_segments"], [ "boundary_molecule_indexes", "boundary_coordinates", "domain_index", "linking_number", ], ) # Initialize new attributes of chromosomal segments all_new_boundary_molecule_indexes = np.empty((0, 2), dtype=np.int64) all_new_boundary_coordinates = np.empty((0, 2), dtype=np.int64) all_new_segment_domain_indexes = np.array([], dtype=np.int32) all_new_linking_numbers = np.array([], dtype=np.float64) # Iteratively tally RNAPs that were removed due to collisions with # replication forks with or without replisomes on each domain removed_RNAP_masks_all_domains = np.full_like(removed_RNAPs_mask, False) for domain_index in np.unique(all_chromosome_domain_indexes): # Skip domains that have completed replication if np.all(domain_index < mother_domain_indexes): continue domain_spans_oriC = domain_index in origin_domain_indexes domain_spans_terC = domain_index in mother_domain_indexes # Parse attributes of remaining RNAPs in this domain RNAPs_domain_mask = RNAP_domain_indexes == domain_index RNAP_coordinates_this_domain = RNAP_coordinates[RNAPs_domain_mask] RNAP_unique_indexes_this_domain = RNAP_unique_indexes[RNAPs_domain_mask] domain_remaining_RNAPs_mask = ~removed_RNAPs_mask[RNAPs_domain_mask] # Parse attributes of segments in this domain segments_domain_mask = segment_domain_indexes == domain_index boundary_molecule_indexes_this_domain = boundary_molecule_indexes[ segments_domain_mask, : ] boundary_coordinates_this_domain = boundary_coordinates[ segments_domain_mask, : ] linking_numbers_this_domain = linking_numbers[segments_domain_mask] new_molecule_coordinates_this_domain = np.array([], dtype=np.int64) new_molecule_indexes_this_domain = np.array([], dtype=np.int64) # Append coordinates and indexes of replisomes on this domain, # if any if not domain_spans_oriC: replisome_domain_mask = replisome_domain_indexes == domain_index replisome_coordinates_this_domain = replisome_coordinates[ replisome_domain_mask ] replisome_molecule_indexes_this_domain = replisome_unique_indexes[ replisome_domain_mask ] # If one or more replisomes was removed in the last time step, # use the last known location and molecule index. if len(replisome_molecule_indexes_this_domain) != 2: assert len(replisome_molecule_indexes_this_domain) < 2 ( replisome_coordinates_this_domain, replisome_molecule_indexes_this_domain, ) = get_last_known_replisome_data( boundary_coordinates_this_domain, boundary_molecule_indexes_this_domain, replisome_coordinates_this_domain, replisome_molecule_indexes_this_domain, ) # Assume that RNAPs that run into a replication fork # are removed even if there is no replisome RNAPs_on_forks = np.isin( RNAP_coordinates_this_domain, replisome_coordinates_this_domain, ) domain_remaining_RNAPs_mask = np.logical_and( domain_remaining_RNAPs_mask, ~RNAPs_on_forks ) full_removed_RNAPs_mask = np.full_like( removed_RNAPs_mask, False ) full_removed_RNAPs_mask[RNAPs_domain_mask] = RNAPs_on_forks removed_RNAP_masks_all_domains = np.logical_or( removed_RNAP_masks_all_domains, full_removed_RNAPs_mask ) new_molecule_coordinates_this_domain = np.concatenate( ( new_molecule_coordinates_this_domain, replisome_coordinates_this_domain, ) ) new_molecule_indexes_this_domain = np.concatenate( ( new_molecule_indexes_this_domain, replisome_molecule_indexes_this_domain, ) ) # Append coordinates and indexes of parent domain replisomes, # if any if not domain_spans_terC: parent_domain_index = all_chromosome_domain_indexes[ np.where(child_domains == domain_index)[0][0] ] replisome_parent_domain_mask = ( replisome_domain_indexes == parent_domain_index ) replisome_coordinates_parent_domain = replisome_coordinates[ replisome_parent_domain_mask ] replisome_molecule_indexes_parent_domain = replisome_unique_indexes[ replisome_parent_domain_mask ] # If one or more replisomes was removed in the last time step, # use the last known location and molecule index. if len(replisome_molecule_indexes_parent_domain) != 2: assert len(replisome_molecule_indexes_parent_domain) < 2 # Parse attributes of segments in parent domain parent_segments_domain_mask = ( segment_domain_indexes == parent_domain_index ) boundary_molecule_indexes_parent_domain = ( boundary_molecule_indexes[parent_segments_domain_mask, :] ) boundary_coordinates_parent_domain = boundary_coordinates[ parent_segments_domain_mask, : ] ( replisome_coordinates_parent_domain, replisome_molecule_indexes_parent_domain, ) = get_last_known_replisome_data( boundary_coordinates_parent_domain, boundary_molecule_indexes_parent_domain, replisome_coordinates_parent_domain, replisome_molecule_indexes_parent_domain, ) # Assume that RNAPs that run into a replication fork # are removed even if there is no replisome RNAPs_on_forks = np.isin( RNAP_coordinates_this_domain, replisome_coordinates_parent_domain, ) domain_remaining_RNAPs_mask = np.logical_and( domain_remaining_RNAPs_mask, ~RNAPs_on_forks ) full_removed_RNAPs_mask = np.full_like( removed_RNAPs_mask, False ) full_removed_RNAPs_mask[RNAPs_domain_mask] = RNAPs_on_forks removed_RNAP_masks_all_domains = np.logical_or( removed_RNAP_masks_all_domains, full_removed_RNAPs_mask ) new_molecule_coordinates_this_domain = np.concatenate( ( new_molecule_coordinates_this_domain, replisome_coordinates_parent_domain, ) ) new_molecule_indexes_this_domain = np.concatenate( ( new_molecule_indexes_this_domain, replisome_molecule_indexes_parent_domain, ) ) # Add remaining RNAPs in this domain after accounting for removals # due to collisions with replication forks new_molecule_coordinates_this_domain = np.concatenate( ( new_molecule_coordinates_this_domain, RNAP_coordinates_this_domain[domain_remaining_RNAPs_mask], ) ) new_molecule_indexes_this_domain = np.concatenate( ( new_molecule_indexes_this_domain, RNAP_unique_indexes_this_domain[domain_remaining_RNAPs_mask], ) ) # If there are no molecules left on this domain, continue if len(new_molecule_indexes_this_domain) == 0: continue # Calculate attributes of new segments new_segment_attrs = self._compute_new_segment_attributes( boundary_molecule_indexes_this_domain, boundary_coordinates_this_domain, linking_numbers_this_domain, new_molecule_indexes_this_domain, new_molecule_coordinates_this_domain, domain_spans_oriC, domain_spans_terC, ) # Append to existing array of new segment attributes all_new_boundary_molecule_indexes = np.vstack( ( all_new_boundary_molecule_indexes, new_segment_attrs["boundary_molecule_indexes"], ) ) all_new_boundary_coordinates = np.vstack( ( all_new_boundary_coordinates, new_segment_attrs["boundary_coordinates"], ) ) all_new_segment_domain_indexes = np.concatenate( ( all_new_segment_domain_indexes, np.full( len(new_segment_attrs["linking_numbers"]), domain_index, dtype=np.int32, ), ) ) all_new_linking_numbers = np.concatenate( (all_new_linking_numbers, new_segment_attrs["linking_numbers"]) ) # Delete all existing chromosomal segments if len(boundary_molecule_indexes) > 0: update["chromosomal_segments"].update( {"delete": np.arange(len(boundary_molecule_indexes))} ) # Add new chromosomal segments update["chromosomal_segments"].update( { "add": { "boundary_molecule_indexes": all_new_boundary_molecule_indexes, "boundary_coordinates": all_new_boundary_coordinates, "domain_index": all_new_segment_domain_indexes, "linking_number": all_new_linking_numbers, } } ) # Figure out if any additional RNAPs were removed due to collisions with # replication forks where there were no replisomes empty_fork_RNAP_collision_mask = np.logical_and( removed_RNAP_masks_all_domains, np.logical_not( np.logical_or( RNAP_headon_collision_mask, RNAP_codirectional_collision_mask ) ), ) update["listeners"]["rnap_data"].update( { "n_empty_fork_collisions": empty_fork_RNAP_collision_mask.sum(), "empty_fork_collision_coordinates": RNAP_coordinates[ empty_fork_RNAP_collision_mask ], } ) # Get mask for RNAs that are transcribed from removed RNAPs removed_RNAs_mask = np.isin( RNA_RNAP_indexes, RNAP_unique_indexes[removed_RNAPs_mask] ) # Initialize counts of incomplete transcription events incomplete_transcription_event = np.zeros(self.n_TUs) # Remove RNAPs and RNAs that have collided with replisomes if n_total_collisions > 0: if removed_RNAPs_mask.sum() > 0: update["active_RNAPs"].update( {"delete": np.where(removed_RNAPs_mask)[0]} ) if removed_RNAs_mask.sum() > 0: update["RNAs"].update({"delete": np.where(removed_RNAs_mask)[0]}) # Increment counts of inactive RNAPs update["bulk"].append((self.inactive_RNAPs_idx, n_total_collisions)) # Get sequences of incomplete transcripts incomplete_sequence_lengths = transcript_lengths[removed_RNAs_mask] n_initiated_sequences = np.count_nonzero(incomplete_sequence_lengths) n_ppi_added = n_initiated_sequences if n_initiated_sequences > 0: incomplete_rna_indexes = RNA_TU_indexes[removed_RNAs_mask] incomplete_transcription_event = np.bincount( incomplete_rna_indexes, minlength=self.n_TUs ) incomplete_sequences = buildSequences( self.rna_sequences, incomplete_rna_indexes, np.zeros(n_total_collisions, dtype=np.int64), np.full(n_total_collisions, incomplete_sequence_lengths.max()), ) mature_rna_counts = np.zeros(self.n_mature_rnas, dtype=np.int64) base_counts = np.zeros(self.n_fragment_bases, dtype=np.int64) for ri, sl, seq in zip( incomplete_rna_indexes, incomplete_sequence_lengths, incomplete_sequences, ): # Check if incomplete RNA is an unprocessed RNA if ri in self.unprocessed_rna_index_mapping: # Find mature RNA molecules that would need to be added # given the length of the incomplete RNA mature_rna_end_pos = self.mature_rna_end_positions[ :, self.unprocessed_rna_index_mapping[ri] ] mature_rnas_produced = np.logical_and( mature_rna_end_pos != 0, mature_rna_end_pos < sl ) # Increment counts of mature RNAs mature_rna_counts += mature_rnas_produced # Increment counts of fragment NTPs, but exclude bases # that are part of the mature RNAs generated base_counts += np.bincount( seq[:sl], minlength=self.n_fragment_bases ) - self.mature_rna_nt_counts[mature_rnas_produced, :].sum( axis=0 ) # Exclude ppi molecules that are part of mature RNAs n_ppi_added -= mature_rnas_produced.sum() else: base_counts += np.bincount( seq[:sl], minlength=self.n_fragment_bases ) base_counts += np.bincount( seq[:sl], minlength=self.n_fragment_bases ) # Increment counts of mature RNAs, fragment NTPs and phosphates update["bulk"].append((self.mature_rna_idx, mature_rna_counts)) update["bulk"].append((self.fragmentBasesIdx, base_counts)) update["bulk"].append((self.ppi_idx, n_ppi_added)) assert n_initiated_sequences == incomplete_transcription_event.sum() update["listeners"]["rnap_data"]["incomplete_transcription_event"] = ( incomplete_transcription_event ) # Get mask for ribosomes that are bound to nonexisting mRNAs remaining_RNA_unique_indexes = RNA_unique_indexes[ np.logical_not(removed_RNAs_mask) ] removed_ribosomes_mask = np.logical_not( np.isin(ribosome_mRNA_indexes, remaining_RNA_unique_indexes) ) n_removed_ribosomes = np.count_nonzero(removed_ribosomes_mask) # Remove ribosomes that are bound to missing RNA molecules. This # includes both RNAs removed by this function and RNAs removed # by other processes (e.g. RNA degradation). if n_removed_ribosomes > 0: update["active_ribosome"].update( {"delete": np.where(removed_ribosomes_mask)[0]} ) # Increment counts of inactive ribosomal subunits update["bulk"].extend( [ (self.ribosome_30S_subunit_idx, n_removed_ribosomes), (self.ribosome_50S_subunit_idx, n_removed_ribosomes), ] ) # Get amino acid sequences of incomplete polypeptides incomplete_sequence_lengths = ribosome_peptide_lengths[ removed_ribosomes_mask ] n_initiated_sequences = np.count_nonzero(incomplete_sequence_lengths) if n_initiated_sequences > 0: incomplete_sequences = buildSequences( self.protein_sequences, ribosome_protein_indexes[removed_ribosomes_mask], np.zeros(n_removed_ribosomes, dtype=np.int64), np.full(n_removed_ribosomes, incomplete_sequence_lengths.max()), ) amino_acid_counts = np.zeros(self.n_amino_acids, dtype=np.int64) for sl, seq in zip(incomplete_sequence_lengths, incomplete_sequences): amino_acid_counts += np.bincount( seq[:sl], minlength=self.n_amino_acids ) # Increment counts of free amino acids and decrease counts of # free water molecules update["bulk"].append((self.amino_acids_idx, amino_acid_counts)) update["bulk"].append( ( self.water_idx, (n_initiated_sequences - incomplete_sequence_lengths.sum()), ) ) # Write to listener update["listeners"]["rnap_data"]["n_removed_ribosomes"] = n_removed_ribosomes def get_replicated_motif_attributes(old_coordinates, old_domain_indexes): """ Computes the attributes of replicated motifs on the chromosome, given the old coordinates and domain indexes of the original motifs. """ # Coordinates are simply repeated new_coordinates = np.repeat(old_coordinates, 2) # Domain indexes are set to the child indexes of the original index new_domain_indexes = child_domains[ np.array( [ np.where(all_chromosome_domain_indexes == idx)[0][0] for idx in old_domain_indexes ] ), :, ].flatten() return new_coordinates, new_domain_indexes ####################### # Replicate promoters # ####################### n_new_promoters = 2 * np.count_nonzero(removed_promoters_mask) if n_new_promoters > 0: # Delete original promoters update["promoters"].update({"delete": np.where(removed_promoters_mask)[0]}) # Add freed active tfs update["bulk"].append( ( self.active_tfs_idx, promoter_bound_TFs[removed_promoters_mask, :].sum(axis=0), ) ) # Set up attributes for the replicated promoters promoter_TU_indexes_new = np.repeat( promoter_TU_indexes[removed_promoters_mask], 2 ) (promoter_coordinates_new, promoter_domain_indexes_new) = ( get_replicated_motif_attributes( promoter_coordinates[removed_promoters_mask], promoter_domain_indexes[removed_promoters_mask], ) ) # Add new promoters with new domain indexes update["promoters"].update( { "add": { "TU_index": promoter_TU_indexes_new, "coordinates": promoter_coordinates_new, "domain_index": promoter_domain_indexes_new, "bound_TF": np.zeros( (n_new_promoters, self.n_TFs), dtype=np.bool_ ), } } ) # Replicate genes n_new_genes = 2 * np.count_nonzero(removed_genes_mask) if n_new_genes > 0: # Delete original genes update["genes"].update({"delete": np.where(removed_genes_mask)[0]}) # Set up attributes for the replicated genes gene_cistron_indexes_new = np.repeat( gene_cistron_indexes[removed_genes_mask], 2 ) gene_coordinates_new, gene_domain_indexes_new = ( get_replicated_motif_attributes( gene_coordinates[removed_genes_mask], gene_domain_indexes[removed_genes_mask], ) ) # Add new genes with new domain indexes update["genes"].update( { "add": { "cistron_index": gene_cistron_indexes_new, "coordinates": gene_coordinates_new, "domain_index": gene_domain_indexes_new, } } ) ######################## # Replicate DnaA boxes # ######################## n_new_DnaA_boxes = 2 * np.count_nonzero(removed_DnaA_boxes_mask) if n_new_DnaA_boxes > 0: # Delete original DnaA boxes if removed_DnaA_boxes_mask.sum() > 0: update["DnaA_boxes"].update( {"delete": np.where(removed_DnaA_boxes_mask)[0]} ) # Set up attributes for the replicated boxes (DnaA_box_coordinates_new, DnaA_box_domain_indexes_new) = ( get_replicated_motif_attributes( DnaA_box_coordinates[removed_DnaA_boxes_mask], DnaA_box_domain_indexes[removed_DnaA_boxes_mask], ) ) # Add new DnaA boxes with new domain indexes dict_dna = { "add": { "coordinates": DnaA_box_coordinates_new, "domain_index": DnaA_box_domain_indexes_new, "DnaA_bound": np.zeros(n_new_DnaA_boxes, dtype=np.bool_), } } update["DnaA_boxes"].update(dict_dna) update["next_update_time"] = states["global_time"] + states["timestep"] return update
[docs] def _compute_new_segment_attributes( self, old_boundary_molecule_indexes: npt.NDArray[np.int64], old_boundary_coordinates: npt.NDArray[np.int64], old_linking_numbers: npt.NDArray[np.int64], new_molecule_indexes: npt.NDArray[np.int64], new_molecule_coordinates: npt.NDArray[np.int64], spans_oriC: bool, spans_terC: bool, ) -> dict[str, npt.NDArray[np.int64]]: """ Calculates the updated attributes of chromosomal segments belonging to a specific chromosomal domain, given the previous and current coordinates of molecules bound to the chromosome. Args: old_boundary_molecule_indexes: (N, 2) array of unique indexes of molecules that formed the boundaries of each chromosomal segment in the previous timestep. old_boundary_coordinates: (N, 2) array of chromosomal coordinates of molecules that formed the boundaries of each chromosomal segment in the previous timestep. old_linking_numbers: (N,) array of linking numbers of each chromosomal segment in the previous timestep. new_molecule_indexes: (N,) array of unique indexes of all molecules bound to the domain at the current timestep. new_molecule_coordinates: (N,) array of chromosomal coordinates of all molecules bound to the domain at the current timestep. spans_oriC: True if the domain spans the origin. spans_terC: True if the domain spans the terminus. Returns: Dictionary of the following format:: { 'boundary_molecule_indexes': (M, 2) array of unique indexes of molecules that form the boundaries of new chromosomal segments, 'boundary_coordinates': (M, 2) array of chromosomal coordinates of molecules that form the boundaries of new chromosomal segments, 'linking_numbers': (M,) array of linking numbers of new chromosomal segments } """ # Sort old segment arrays by coordinates of left boundary old_coordinates_argsort = np.argsort(old_boundary_coordinates[:, 0]) old_boundary_coordinates_sorted = old_boundary_coordinates[ old_coordinates_argsort, : ] old_boundary_molecule_indexes_sorted = old_boundary_molecule_indexes[ old_coordinates_argsort, : ] old_linking_numbers_sorted = old_linking_numbers[old_coordinates_argsort] # Sort new segment arrays by molecular coordinates new_coordinates_argsort = np.argsort(new_molecule_coordinates) new_molecule_coordinates_sorted = new_molecule_coordinates[ new_coordinates_argsort ] new_molecule_indexes_sorted = new_molecule_indexes[new_coordinates_argsort] # Domain does not span the origin if not spans_oriC: # A fragment spans oriC if two boundaries have opposite signs, # or both are equal to zero oriC_fragment_counts = np.count_nonzero( np.logical_not( np.logical_xor( old_boundary_coordinates_sorted[:, 0] < 0, old_boundary_coordinates_sorted[:, 1] > 0, ) ) ) # if oriC fragment did not exist in the domain in the previous # timestep, add a dummy fragment that covers the origin with # linking number zero. This is done to generalize the # implementation of this method. if oriC_fragment_counts == 0: # Index of first segment where left boundary is nonnegative oriC_fragment_index = np.argmax( old_boundary_coordinates_sorted[:, 0] >= 0 ) # Get indexes of boundary molecules for this dummy segment oriC_fragment_boundary_molecule_indexes = np.array( [ old_boundary_molecule_indexes_sorted[ oriC_fragment_index - 1, 1 ], old_boundary_molecule_indexes_sorted[oriC_fragment_index, 0], ] ) # Insert dummy segment to array old_boundary_molecule_indexes_sorted = np.insert( old_boundary_molecule_indexes_sorted, oriC_fragment_index, oriC_fragment_boundary_molecule_indexes, axis=0, ) old_linking_numbers_sorted = np.insert( old_linking_numbers_sorted, oriC_fragment_index, 0 ) else: # There should not be more than one fragment that spans oriC assert oriC_fragment_counts == 1 # Domain spans the terminus if spans_terC: # If the domain spans the terminus, dummy molecules are added to # each end of the chromosome s.t. the segment that spans terC is # split to two segments and we can maintain a linear representation # for the circular chromosome. These two segments are later # adjusted to have the same superhelical densities. new_molecule_coordinates_sorted = np.insert( new_molecule_coordinates_sorted, [0, len(new_molecule_coordinates_sorted)], [self.min_coordinates, self.max_coordinates], ) new_molecule_indexes_sorted = np.insert( new_molecule_indexes_sorted, [0, len(new_molecule_indexes_sorted)], self.terC_index, ) # Add dummy molecule to old segments if they do not already exist if old_boundary_molecule_indexes_sorted[0, 0] != self.terC_index: old_boundary_molecule_indexes_sorted = np.vstack( ( np.array( [ self.terC_index, old_boundary_molecule_indexes_sorted[0, 0], ] ), old_boundary_molecule_indexes_sorted, np.array( [ old_boundary_molecule_indexes_sorted[-1, 1], self.terC_index, ] ), ) ) old_linking_numbers_sorted = np.insert( old_linking_numbers_sorted, [0, len(old_linking_numbers_sorted)], 0 ) # Recalculate linking numbers of each segment after accounting for # boundary molecules that were removed in the current timestep linking_numbers_after_removal = [] right_boundaries_retained = np.isin( old_boundary_molecule_indexes_sorted[:, 1], new_molecule_indexes_sorted ) # Add up linking numbers of each segment until each retained boundary ln_this_fragment = 0.0 for retained, ln in zip(right_boundaries_retained, old_linking_numbers_sorted): ln_this_fragment += ln if retained: linking_numbers_after_removal.append(ln_this_fragment) ln_this_fragment = 0.0 # Number of segments should be equal to number of retained boundaries assert len(linking_numbers_after_removal) == right_boundaries_retained.sum() # Redistribute linking numbers of the two terC segments such that the # segments have same superhelical densities if spans_terC and np.count_nonzero(right_boundaries_retained) > 1: # Get molecule indexes of the boundaries of the two terC segments # left and right of terC retained_boundary_indexes = np.where(right_boundaries_retained)[0] left_segment_boundary_index = old_boundary_molecule_indexes_sorted[ retained_boundary_indexes[0], 1 ] right_segment_boundary_index = old_boundary_molecule_indexes_sorted[ retained_boundary_indexes[-2], 1 ] # Get mapping from molecule index to chromosomal coordinates molecule_index_to_coordinates = { index: coordinates for index, coordinates in zip( new_molecule_indexes_sorted, new_molecule_coordinates_sorted ) } # Distribute linking number between two segments proportional to # the length of each segment left_segment_length = ( molecule_index_to_coordinates[left_segment_boundary_index] - self.min_coordinates ) right_segment_length = ( self.max_coordinates - molecule_index_to_coordinates[right_segment_boundary_index] ) full_segment_length = left_segment_length + right_segment_length full_linking_number = ( linking_numbers_after_removal[0] + linking_numbers_after_removal[-1] ) linking_numbers_after_removal[0] = ( full_linking_number * left_segment_length / full_segment_length ) linking_numbers_after_removal[-1] = ( full_linking_number * right_segment_length / full_segment_length ) # Get mask for molecules that already existed in the previous timestep existing_molecules_mask = np.isin( new_molecule_indexes_sorted, old_boundary_molecule_indexes_sorted ) # Get numbers and lengths of new segments that each segment will be # split into segment_split_sizes = np.diff(np.where(existing_molecules_mask)[0]) segment_lengths = np.diff(new_molecule_coordinates_sorted) assert len(segment_split_sizes) == len(linking_numbers_after_removal) # Calculate linking numbers of each segment after accounting for new # boundaries that were added new_linking_numbers = [] i = 0 for ln, size in zip(linking_numbers_after_removal, segment_split_sizes): if size == 1: new_linking_numbers.append(ln) else: # Split linking number proportional to length of segment total_length = segment_lengths[i : i + size].sum() new_linking_numbers.extend( list(ln * segment_lengths[i : i + size] / total_length) ) i += size # Handle edge case where a domain was just initialized, and two # replisomes are bound to the origin if len(new_linking_numbers) == 0: new_linking_numbers = [0] # Build Mx2 array for boundary indexes and coordinates new_boundary_molecule_indexes = np.hstack( ( new_molecule_indexes_sorted[:-1, np.newaxis], new_molecule_indexes_sorted[1:, np.newaxis], ) ) new_boundary_coordinates = np.hstack( ( new_molecule_coordinates_sorted[:-1, np.newaxis], new_molecule_coordinates_sorted[1:, np.newaxis], ) ) new_linking_numbers = np.array(new_linking_numbers) # If domain does not span oriC, remove new segment that spans origin if not spans_oriC: oriC_fragment_mask = np.logical_not( np.logical_xor( new_boundary_coordinates[:, 0] < 0, new_boundary_coordinates[:, 1] > 0, ) ) assert oriC_fragment_mask.sum() == 1 new_boundary_molecule_indexes = new_boundary_molecule_indexes[ np.logical_not(oriC_fragment_mask), : ] new_boundary_coordinates = new_boundary_coordinates[ np.logical_not(oriC_fragment_mask), : ] new_linking_numbers = new_linking_numbers[ np.logical_not(oriC_fragment_mask) ] return { "boundary_molecule_indexes": new_boundary_molecule_indexes, "boundary_coordinates": new_boundary_coordinates, "linking_numbers": new_linking_numbers, }
[docs] def get_last_known_replisome_data( boundary_coordinates: np.ndarray, boundary_molecule_indexes: np.ndarray, replisome_coordinates: np.ndarray, replisome_molecule_indexes: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: """ Gets the last known coordinates and molecule indexes of both replisomes for a chromosome domain. Args: boundary_coordinates: (N, 2) array of chromosomal coordinates of all boundary molecules in the domain during the last time step. boundary_molecule_indexes: (N, 2) array of unique indexes of all boundary molecules in the domain during the last time step. replisome_coordinates: (1,) or (0,) array of chromosomal coordinates of the replisomes in the domain in the current time step. replisome_molecule_indexes: (1,) or (0,) array of unique indexes of the replisomes in the domain in the current time step. Returns: Tuple of the following format:: ( (2,) array of last known replisome coordinates in domain, (2,) array of last known replisome molecule indexes in domain ) """ # Sort old boundary coordinates and molecule indexes to find first index # where left boundary is non-negative boundary_coordinates_argsort = np.argsort(boundary_coordinates[:, 0]) boundary_coordinates_sorted = boundary_coordinates[boundary_coordinates_argsort] boundary_molecule_indexes_sorted = boundary_molecule_indexes[ boundary_coordinates_argsort ] replisome_index = np.argmax(boundary_coordinates_sorted[:, 0] >= 0) # If positive coordinate replisome still exists, add # last known info on negative coordinate replisome. if np.any(replisome_coordinates > 0): replisome_coordinates = np.insert( replisome_coordinates, 0, boundary_coordinates_sorted[replisome_index - 1, 1], ) replisome_molecule_indexes = np.insert( replisome_molecule_indexes, 0, boundary_molecule_indexes_sorted[replisome_index - 1, 1], ) # If negative coordinate replisome still exists, add # last known info on positive coordinate replisome. elif np.any(replisome_coordinates < 0): replisome_coordinates = np.insert( replisome_coordinates, 0, boundary_coordinates_sorted[replisome_index, 0] ) replisome_molecule_indexes = np.insert( replisome_molecule_indexes, 0, boundary_molecule_indexes_sorted[replisome_index, 0], ) # If neither replisomes exist, use last known info on both. else: replisome_coordinates = np.array( [ boundary_coordinates_sorted[replisome_index - 1, 1], boundary_coordinates_sorted[replisome_index, 0], ] ) replisome_molecule_indexes = np.array( [ boundary_molecule_indexes_sorted[replisome_index - 1, 1], boundary_molecule_indexes_sorted[replisome_index, 0], ] ) return replisome_coordinates, replisome_molecule_indexes
def test_superhelical_removal_sim(): """ Run a single time step simulation of :py:class:`~.ChromosomeStructure` that tests some edge cases in superhelical density calculations. Start with a chromosome that has four active replication forks for a total of 4 replisomes and 5 chromosome domains. There are 4 RNAPs per domain and 1 to 2 of those will be intentionally placed at the same coordinates as a replisome per domain. They should be removed, causing some segment boundaries to be redefined. Additionally, 3 of the replisomes will be removed. This should be detected and superhelical density calculations should still work using the last known information for the removed replisomes. """ # Get topology for UniqueUpdate Steps unique_topology = TOPOLOGY.copy() for non_unique in [ "bulk", "listeners", "global_time", "timestep", "next_update_time", ]: unique_topology.pop(non_unique) class TestComposer(Composer): def generate_processes(self, config): return { "chromosome_structure": ChromosomeStructure( { "inactive_RNAPs": "APORNAP-CPLX[c]", "ppi": "PPI[c]", "active_tfs": ["CPLX-125[c]"], "ribosome_30S_subunit": "CPLX0-3953[c]", "ribosome_50S_subunit": "CPLX0-3962[c]", "amino_acids": ["L-ALPHA-ALANINE[c]"], "water": "WATER[c]", "mature_rna_ids": ["alaT-tRNA[c]"], "fragmentBases": ["polymerized_ATP[c]"], "replichore_lengths": [100000, 100000], "calculate_superhelical_densities": True, } ), "unique_update": UniqueUpdate({"unique_topo": unique_topology}), "global_clock": GlobalClock(), } def generate_topology(self, config): return { "chromosome_structure": TOPOLOGY, "unique_update": unique_topology, "global_clock": { "global_time": ("global_time",), "next_update_time": ("next_update_time",), }, } def generate_flow(self, config): return { "chromosome_structure": [], "unique_update": [("chromosome_structure",)], } composer = TestComposer() template_initial_state = get_state_from_file() # Zero out all unique molecules for unique_mol in template_initial_state["unique"].values(): unique_mol.flags.writeable = True unique_mol["_entryState"] = 0 unique_mol.flags.writeable = False # Set up a single full chromosome full_chromosomes = template_initial_state["unique"]["full_chromosome"] full_chromosomes.flags.writeable = True full_chromosomes["_entryState"][0] = 1 full_chromosomes["domain_index"][0] = 0 full_chromosomes["unique_index"][0] = 0 full_chromosomes.flags.writeable = False # Set up chromosome domains chromosome_domains, replisome_idx = get_free_indices( template_initial_state["unique"]["chromosome_domain"], 5 ) chromosome_domains.flags.writeable = True chromosome_domains["_entryState"][replisome_idx] = 1 chromosome_domains["domain_index"][replisome_idx] = np.arange(5) chromosome_domains["unique_index"][replisome_idx] = np.arange(5) chromosome_domains["child_domains"][replisome_idx] = [ [1, 2], [3, 4], [-1, -1], [-1, -1], [-1, -1], ] chromosome_domains.flags.writeable = False template_initial_state["unique"]["chromosome_domain"] = chromosome_domains # Set up 1 oriC per domain that is not actively replicating oriCs, oriC_idx = get_free_indices(template_initial_state["unique"]["oriC"], 3) oriCs.flags.writeable = True oriCs["_entryState"][oriC_idx] = 1 oriCs["domain_index"][oriC_idx] = [2, 3, 4] oriCs["unique_index"][oriC_idx] = np.arange(3) oriCs.flags.writeable = False template_initial_state["unique"]["oriC"] = oriCs # Set up replisome for actively replicating domain # Notice that the replisomes previously at 45000, 20000, and -20000 # when the chromosomal segment data below was tabulated are removed active_replisomes, replisome_idx = get_free_indices( template_initial_state["unique"]["active_replisome"], 1 ) active_replisomes.flags.writeable = True active_replisomes["_entryState"][replisome_idx] = 1 active_replisomes["domain_index"][replisome_idx] = 0 active_replisomes["coordinates"][replisome_idx] = -50000 active_replisomes["unique_index"][replisome_idx] = 0 active_replisomes.flags.writeable = False template_initial_state["unique"]["active_replisome"] = active_replisomes # Set up 4 RNAPs per domain, some of which will intentionally have # the same coordinates as replisomes (either active or removed) active_RNAP = template_initial_state["unique"]["active_RNAP"] active_RNAP.flags.writeable = True coordinates_per_domain = [ [-65000, -50000, 60000, 75000], [-40000, -26000, 25000, 35000], [-30000, -10000, 20000, 40000], [-20000, -15000, 10000, 15000], ] for i in range(4): active_RNAP["_entryState"][i * 4 : (i + 1) * 4] = 1 active_RNAP["domain_index"][i * 4 : (i + 1) * 4] = i active_RNAP["is_forward"][i * 4 : (i + 1) * 4] = True active_RNAP["coordinates"][i * 4 : (i + 1) * 4] = coordinates_per_domain[i] active_RNAP["unique_index"][i * 4 : (i + 1) * 4] = np.arange( 4 + i * 4, 4 + (i + 1) * 4 ) # Special domain 4 that will be left with no molecules active_RNAP["_entryState"][16:18] = 1 active_RNAP["domain_index"][16:18] = 4 active_RNAP["is_forward"][16:18] = True active_RNAP["coordinates"][16:18] = [-20000, 20000] active_RNAP["unique_index"][16:18] = np.arange(20, 22) active_RNAP.flags.writeable = False # Construct chromosomal segments by domain # Assume that RNAPs have advanced 1000 bp from their initial coordinates # and replisomes have advanced 5000 bp boundary_coordinates = [] boundary_molecule_indexes = [] domain_index = [] linking_number = [] # Segments for domain 0 # - Includes replisome at 45000 (index 1) that will be removed # - Includes RNAP (index 5) that will conflict with replisome at -50000 (index 0) boundary_coordinates.extend( [[-64000, -49000], [-49000, -45000], [45000, 59000], [59000, 74000]] ) boundary_molecule_indexes.extend([[4, 5], [5, 0], [1, 6], [6, 7]]) domain_index.extend([0, 0, 0, 0]) linking_number.extend([1, 1, 1, 1]) # Segments for domain 1 # - Includes replisome at 20000 (index 2) that will be removed # - Includes replisome at -20000 (index 3) that will be removed # - Parent domain replisome at 45000 (index 1) will be removed boundary_coordinates.extend( [ [-45000, -39000], [-39000, -25000], [-25000, -20000], [20000, 24000], [24000, 34000], [34000, 45000], ] ) boundary_molecule_indexes.extend( [[0, 8], [8, 9], [9, 3], [2, 10], [10, 11], [11, 1]] ) domain_index.extend([1, 1, 1, 1, 1, 1]) linking_number.extend([1, 1, 1, 1, 1, 1]) # Segments for domain 2 # - Parent domain replisome at 45000 (index 1) will be removed boundary_coordinates.extend( [ [-45000, -29000], [-29000, -9000], [-9000, 19000], [19000, 39000], [39000, 45000], ] ) boundary_molecule_indexes.extend([[0, 12], [12, 13], [13, 14], [14, 15], [15, 1]]) domain_index.extend([2, 2, 2, 2, 2]) linking_number.extend([1, 1, 1, 1, 1]) # Segments for domain 3 # - Includes RNAP (index 16) that will conflict with replisome that will # be removed at -20000 (index 3) # - Includes replisome at 20000 (index 2) that will be removed boundary_coordinates.extend( [ [-20000, -19000], [-19000, -14000], [-14000, 9000], [9000, 14000], [14000, 20000], ] ) boundary_molecule_indexes.extend([[3, 16], [16, 17], [17, 18], [18, 19], [19, 2]]) domain_index.extend([3, 3, 3, 3, 3]) linking_number.extend([1, 1, 1, 1, 1]) # Segments for domain 4 # - Includes RNAP (index 20) that will conflict with replisome that will # be removed at -20000 (index 3) # - Includes RNAP (index 23) that will conflict with replisome that will # be removed at 20000 (index 2) boundary_coordinates.extend([[-20000, -19000], [-19000, 19000], [19000, 20000]]) boundary_molecule_indexes.extend([[3, 20], [20, 21], [21, 2]]) domain_index.extend([4, 4, 4]) linking_number.extend([1, 1, 1]) chromosomal_segments, segment_idx = get_free_indices( template_initial_state["unique"]["chromosomal_segment"], len(linking_number) ) chromosomal_segments.flags.writeable = True chromosomal_segments["_entryState"][segment_idx] = 1 chromosomal_segments["boundary_coordinates"][segment_idx] = boundary_coordinates chromosomal_segments["boundary_molecule_indexes"][segment_idx] = ( boundary_molecule_indexes ) chromosomal_segments["domain_index"][segment_idx] = domain_index chromosomal_segments["linking_number"][segment_idx] = linking_number chromosomal_segments["unique_index"][segment_idx] = np.arange(len(linking_number)) chromosomal_segments.flags.writeable = False template_initial_state["unique"]["chromosomal_segment"] = chromosomal_segments # Since unique numpy updater is an class method, internal # deepcopying in vivarium-core causes this warning to appear warnings.filterwarnings( "ignore", message="Incompatible schema " "assignment at .+ Trying to assign the value <bound method " r"UniqueNumpyUpdater\.updater .+ to key updater, which already " r"has the value <bound method UniqueNumpyUpdater\.updater", ) engine = Engine( composite=composer.generate(), initial_state=template_initial_state, ) engine.update(1) state = engine.state.get_value() # Check that right number of collisions happened at right coordinates rnap_data = state["listeners"]["rnap_data"] assert rnap_data["n_total_collisions"] == 1 assert rnap_data["n_headon_collisions"] == 1 assert rnap_data["n_codirectional_collisions"] == 0 assert np.array_equal( rnap_data["headon_collision_coordinates"], np.array([-50000], dtype=int) ) assert rnap_data["n_empty_fork_collisions"] == 3 assert np.array_equal( rnap_data["empty_fork_collision_coordinates"], np.array([-20000, -20000, 20000], dtype=int), ) # Check chromosomal segments chromosomal_segments = state["unique"]["chromosomal_segment"][ state["unique"]["chromosomal_segment"]["_entryState"].view(np.bool_) ] assert np.array_equal( chromosomal_segments["boundary_coordinates"], np.array( [ # Domain 0 [-100000, -65000], [-65000, -50000], [45000, 60000], [60000, 75000], [75000, 100000], # Domain 1 [-50000, -40000], [-40000, -26000], [-26000, -20000], [20000, 25000], [25000, 35000], [35000, 45000], # Domain 2 [-50000, -30000], [-30000, -10000], [-10000, 20000], [20000, 40000], [40000, 45000], # Domain 3 [-20000, -15000], [-15000, 10000], [10000, 15000], [15000, 20000], # Domain 4 [-20000, 20000], ], dtype=int, ), ) assert np.array_equal( chromosomal_segments["boundary_molecule_indexes"], np.array( [ # Domain 0: index 5 is gone due to conflict with replisome, # segments were added flanking terC (-1 placeholder index), # index 1 replisome is gone but still exists as placeholder # for replication fork in our superhelical density calculations [-1, 4], [4, 0], [1, 6], [6, 7], [7, -1], # Domain 1: index 1, 2, and 3 replisomes are removed but still exist # here as placeholders [0, 8], [8, 9], [9, 3], [2, 10], [10, 11], [11, 1], # Domain 2: index 1 replisome was removed but still exists here # as a placeholder [0, 12], [12, 13], [13, 14], [14, 15], [15, 1], # Domain 3: index 2 and 3 replisomes are removed but still exist # here as placeholders, index 16 RNAP was removed due to conflict # with replisome placeholder index 3 [3, 17], [17, 18], [18, 19], [19, 2], # Domain 4: index 2 and 3 replisomes were removed but still exist # here as placeholders, index 20 and 21 RNAPs were removed due to # conflicts with replisome placeholders [3, 2], ], dtype=int, ), ) assert np.array_equal( chromosomal_segments["domain_index"], np.array( [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4], dtype=int ), ) assert np.array_equal( chromosomal_segments["linking_number"], np.array( [ # Domain 0: terC segments have 0 linking number, second segment # combines two original segments due to removed RNAP index 5 0, 2, 1, 1, 0, # Domain 1: No molecule boundaries were changed 1, 1, 1, 1, 1, 1, # Domain 2: No molecule boundaries were changed 1, 1, 1, 1, 1, # Domain 3: First segment combines two original segments due to # removed RNAP index 16 2, 1, 1, 1, # Domain 4: Sole segment combines all three original segments # due to removed RNAP index 20 and 21, leaving only a single # segment between two unoccupied replication forks 3, ], dtype=float, ), ) if __name__ == "__main__": test_superhelical_removal_sim()