"""
====================
Chromosome Structure
====================
- Resolve collisions between molecules and replication forks on the chromosome.
- Remove and replicate promoters and motifs that are traversed by replisomes.
- Reset the boundaries and linking numbers of chromosomal segments.
"""
import numpy as np
import numpy.typing as npt
import warnings
from vivarium.core.process import Step
from ecoli.processes.registries import topology_registry
from ecoli.library.schema import (
create_unique_indexes,
listener_schema,
numpy_schema,
attrs,
bulk_name_to_idx,
)
from wholecell.utils.polymerize import buildSequences
# Register default topology for this process, associating it with process name
NAME = "ecoli-chromosome-structure"
TOPOLOGY = {
"bulk": ("bulk",),
"listeners": ("listeners",),
"active_replisomes": (
"unique",
"active_replisome",
),
"oriCs": (
"unique",
"oriC",
),
"chromosome_domains": (
"unique",
"chromosome_domain",
),
"active_RNAPs": ("unique", "active_RNAP"),
"RNAs": ("unique", "RNA"),
"active_ribosome": ("unique", "active_ribosome"),
"full_chromosomes": (
"unique",
"full_chromosome",
),
"promoters": ("unique", "promoter"),
"DnaA_boxes": ("unique", "DnaA_box"),
"genes": ("unique", "gene"),
# TODO(vivarium): Only include if superhelical density flag is passed
# "chromosomal_segments": ("unique", "chromosomal_segment")
"global_time": ("global_time",),
"timestep": ("timestep",),
"next_update_time": ("next_update_time", "chromosome_structure"),
}
topology_registry.register(NAME, TOPOLOGY)
[docs]
class ChromosomeStructure(Step):
"""Chromosome Structure Process"""
name = NAME
topology = TOPOLOGY
defaults = {
# Load parameters
"RNA_sequences": [],
"protein_sequences": [],
"n_TUs": 1,
"n_TFs": 1,
"n_amino_acids": 1,
"n_fragment_bases": 1,
"replichore_lengths": [0, 0],
"relaxed_DNA_base_pairs_per_turn": 1,
"terC_index": 1,
"calculate_superhelical_densities": False,
# Get placeholder value for chromosome domains without children
"no_child_place_holder": -1,
# Load bulk molecule views
"inactive_RNAPs": [],
"fragmentBases": [],
"ppi": "ppi",
"active_tfs": [],
"ribosome_30S_subunit": "30S",
"ribosome_50S_subunit": "50S",
"amino_acids": [],
"water": "water",
"seed": 0,
"emit_unique": False,
}
# Constructor
def __init__(self, parameters=None):
super().__init__(parameters)
self.rna_sequences = self.parameters["rna_sequences"]
self.protein_sequences = self.parameters["protein_sequences"]
self.n_TUs = self.parameters["n_TUs"]
self.n_TFs = self.parameters["n_TFs"]
self.rna_ids = self.parameters["rna_ids"]
self.n_amino_acids = self.parameters["n_amino_acids"]
self.n_fragment_bases = self.parameters["n_fragment_bases"]
replichore_lengths = self.parameters["replichore_lengths"]
self.min_coordinates = -replichore_lengths[1]
self.max_coordinates = replichore_lengths[0]
self.relaxed_DNA_base_pairs_per_turn = self.parameters[
"relaxed_DNA_base_pairs_per_turn"
]
self.terC_index = self.parameters["terC_index"]
self.n_mature_rnas = self.parameters["n_mature_rnas"]
self.mature_rna_ids = self.parameters["mature_rna_ids"]
self.mature_rna_end_positions = self.parameters["mature_rna_end_positions"]
self.mature_rna_nt_counts = self.parameters["mature_rna_nt_counts"]
self.unprocessed_rna_index_mapping = self.parameters[
"unprocessed_rna_index_mapping"
]
# Load sim options
self.calculate_superhelical_densities = self.parameters[
"calculate_superhelical_densities"
]
# Get placeholder value for chromosome domains without children
self.no_child_place_holder = self.parameters["no_child_place_holder"]
self.inactive_RNAPs = self.parameters["inactive_RNAPs"]
self.fragmentBases = self.parameters["fragmentBases"]
self.ppi = self.parameters["ppi"]
self.active_tfs = self.parameters["active_tfs"]
self.ribosome_30S_subunit = self.parameters["ribosome_30S_subunit"]
self.ribosome_50S_subunit = self.parameters["ribosome_50S_subunit"]
self.amino_acids = self.parameters["amino_acids"]
self.water = self.parameters["water"]
self.inactive_RNAPs_idx = None
self.emit_unique = self.parameters.get("emit_unique", True)
self.chromosome_segment_index = 0
self.promoter_index = 60000
self.DnaA_box_index = 60000
self.random_state = np.random.RandomState(seed=self.parameters["seed"])
[docs]
def ports_schema(self):
ports = {
"listeners": {
"rnap_data": listener_schema(
{
"n_total_collisions": 0,
"n_headon_collisions": 0,
"n_codirectional_collisions": 0,
"headon_collision_coordinates": [],
"codirectional_collision_coordinates": [],
"n_removed_ribosomes": 0,
"incomplete_transcription_events": (
np.zeros(self.n_TUs, np.int64),
self.rna_ids,
),
}
)
},
"bulk": numpy_schema("bulk"),
# Unique molecules
"active_replisomes": numpy_schema(
"active_replisomes", emit=self.parameters["emit_unique"]
),
"oriCs": numpy_schema("oriCs", emit=self.parameters["emit_unique"]),
"chromosome_domains": numpy_schema(
"chromosome_domains", emit=self.parameters["emit_unique"]
),
"active_RNAPs": numpy_schema(
"active_RNAPs", emit=self.parameters["emit_unique"]
),
"RNAs": numpy_schema("RNAs", emit=self.parameters["emit_unique"]),
"active_ribosome": numpy_schema(
"active_ribosome", emit=self.parameters["emit_unique"]
),
"full_chromosomes": numpy_schema(
"full_chromosomes", emit=self.parameters["emit_unique"]
),
"promoters": numpy_schema("promoters", emit=self.parameters["emit_unique"]),
"DnaA_boxes": numpy_schema(
"DnaA_boxes", emit=self.parameters["emit_unique"]
),
"genes": numpy_schema("genes", emit=self.parameters["emit_unique"]),
"global_time": {"_default": 0.0},
"timestep": {"_default": self.parameters["time_step"]},
"next_update_time": {
"_default": self.parameters["time_step"],
"_updater": "set",
"_divider": "set",
},
}
# TODO: Work on this functionality
if self.calculate_superhelical_densities:
ports["chromosomal_segments"] = {
"*": {
"boundary_molecule_indexes": {
"_default": np.empty((0, 2), dtype=np.int64)
},
"boundary_coordinates": {
"_default": np.empty((0, 2), dtype=np.int64)
},
"domain_index": {"_default": 0},
"linking_number": {"_default": 0},
}
}
return ports
[docs]
def update_condition(self, timestep, states):
"""
See :py:meth:`~ecoli.processes.partition.Requester.update_condition`.
"""
if states["next_update_time"] <= states["global_time"]:
if states["next_update_time"] < states["global_time"]:
warnings.warn(
f"{self.name} updated at t="
f"{states['global_time']} instead of t="
f"{states['next_update_time']}. Decrease the "
"timestep for the global clock process for more "
"accurate timekeeping."
)
return True
return False
[docs]
def next_update(self, timestep, states):
# At t=0, convert all strings to indices
if self.inactive_RNAPs_idx is None:
self.fragmentBasesIdx = bulk_name_to_idx(
self.fragmentBases, states["bulk"]["id"]
)
self.active_tfs_idx = bulk_name_to_idx(
self.active_tfs, states["bulk"]["id"]
)
self.ribosome_30S_subunit_idx = bulk_name_to_idx(
self.ribosome_30S_subunit, states["bulk"]["id"]
)
self.ribosome_50S_subunit_idx = bulk_name_to_idx(
self.ribosome_50S_subunit, states["bulk"]["id"]
)
self.amino_acids_idx = bulk_name_to_idx(
self.amino_acids, states["bulk"]["id"]
)
self.water_idx = bulk_name_to_idx(self.water, states["bulk"]["id"])
self.ppi_idx = bulk_name_to_idx(self.ppi, states["bulk"]["id"])
self.inactive_RNAPs_idx = bulk_name_to_idx(
self.inactive_RNAPs, states["bulk"]["id"]
)
self.mature_rna_idx = bulk_name_to_idx(
self.mature_rna_ids, states["bulk"]["id"]
)
# Read unique molecule attributes
(replisome_domain_indexes, replisome_coordinates, replisome_unique_indexes) = (
attrs(
states["active_replisomes"],
["domain_index", "coordinates", "unique_index"],
)
)
(all_chromosome_domain_indexes, child_domains) = attrs(
states["chromosome_domains"], ["domain_index", "child_domains"]
)
(
RNAP_domain_indexes,
RNAP_coordinates,
RNAP_is_forward,
RNAP_unique_indexes,
) = attrs(
states["active_RNAPs"],
["domain_index", "coordinates", "is_forward", "unique_index"],
)
(origin_domain_indexes,) = attrs(states["oriCs"], ["domain_index"])
(mother_domain_indexes,) = attrs(states["full_chromosomes"], ["domain_index"])
(RNA_TU_indexes, transcript_lengths, RNA_RNAP_indexes, RNA_unique_indexes) = (
attrs(
states["RNAs"],
["TU_index", "transcript_length", "RNAP_index", "unique_index"],
)
)
(ribosome_protein_indexes, ribosome_peptide_lengths, ribosome_mRNA_indexes) = (
attrs(
states["active_ribosome"],
["protein_index", "peptide_length", "mRNA_index"],
)
)
(
promoter_TU_indexes,
promoter_domain_indexes,
promoter_coordinates,
promoter_bound_TFs,
) = attrs(
states["promoters"], ["TU_index", "domain_index", "coordinates", "bound_TF"]
)
(gene_cistron_indexes, gene_domain_indexes, gene_coordinates) = attrs(
states["genes"], ["cistron_index", "domain_index", "coordinates"]
)
(DnaA_box_domain_indexes, DnaA_box_coordinates, DnaA_box_bound) = attrs(
states["DnaA_boxes"], ["domain_index", "coordinates", "DnaA_bound"]
)
# Build dictionary of replisome coordinates with domain indexes as keys
replisome_coordinates_from_domains = {
domain_index: replisome_coordinates[
replisome_domain_indexes == domain_index
]
for domain_index in np.unique(replisome_domain_indexes)
}
def get_removed_molecules_mask(domain_indexes, coordinates):
"""
Computes the boolean mask of unique molecules that should be
removed based on the progression of the replication forks.
"""
mask = np.zeros_like(domain_indexes, dtype=np.bool_)
# Loop through all domains
for domain_index in np.unique(domain_indexes):
# Domain has active replisomes
if domain_index in replisome_coordinates_from_domains:
domain_replisome_coordinates = replisome_coordinates_from_domains[
domain_index
]
# Get mask for molecules on this domain that are out of range
domain_mask = np.logical_and.reduce(
(
domain_indexes == domain_index,
coordinates > domain_replisome_coordinates.min(),
coordinates < domain_replisome_coordinates.max(),
)
)
# Domain has no active replisomes
else:
# Domain has child domains (has finished replicating)
if (
child_domains[all_chromosome_domain_indexes == domain_index, 0]
!= self.no_child_place_holder
):
# Remove all molecules on this domain
domain_mask = domain_indexes == domain_index
# Domain has not started replication
else:
continue
mask[domain_mask] = True
return mask
# Build mask for molecules that should be removed
removed_RNAPs_mask = get_removed_molecules_mask(
RNAP_domain_indexes, RNAP_coordinates
)
removed_promoters_mask = get_removed_molecules_mask(
promoter_domain_indexes, promoter_coordinates
)
removed_genes_mask = get_removed_molecules_mask(
gene_domain_indexes, gene_coordinates
)
removed_DnaA_boxes_mask = get_removed_molecules_mask(
DnaA_box_domain_indexes, DnaA_box_coordinates
)
# Get attribute arrays of remaining RNAPs
remaining_RNAPs_mask = np.logical_not(removed_RNAPs_mask)
remaining_RNAP_domain_indexes = RNAP_domain_indexes[remaining_RNAPs_mask]
remaining_RNAP_coordinates = RNAP_coordinates[remaining_RNAPs_mask]
remaining_RNAP_unique_indexes = RNAP_unique_indexes[remaining_RNAPs_mask]
# Build masks for head-on and co-directional collisions between RNAPs
# and replication forks
RNAP_headon_collision_mask = np.logical_and(
removed_RNAPs_mask, np.logical_xor(RNAP_is_forward, RNAP_coordinates > 0)
)
RNAP_codirectional_collision_mask = np.logical_and(
removed_RNAPs_mask, np.logical_not(RNAP_headon_collision_mask)
)
n_total_collisions = np.count_nonzero(removed_RNAPs_mask)
n_headon_collisions = np.count_nonzero(RNAP_headon_collision_mask)
n_codirectional_collisions = np.count_nonzero(RNAP_codirectional_collision_mask)
# Write values to listeners
update = {
"listeners": {
"rnap_data": {
"n_total_collisions": n_total_collisions,
"n_headon_collisions": n_headon_collisions,
"n_codirectional_collisions": n_codirectional_collisions,
"headon_collision_coordinates": RNAP_coordinates[
RNAP_headon_collision_mask
],
"codirectional_collision_coordinates": RNAP_coordinates[
RNAP_codirectional_collision_mask
],
}
},
"bulk": [],
"active_replisomes": {},
"oriCs": {},
"chromosome_domains": {},
"active_RNAPs": {},
"RNAs": {},
"active_ribosome": {},
"full_chromosomes": {},
"promoters": {},
"genes": {},
"DnaA_boxes": {},
}
if self.calculate_superhelical_densities:
# Get attributes of existing segments
(
boundary_molecule_indexes,
boundary_coordinates,
segment_domain_indexes,
linking_numbers,
) = attrs(
states["chromosomal_segments"],
[
"boundary_molecule_indexes",
"boundary_coordinates",
"domain_index",
"linking_number",
],
)
# Initialize new attributes of chromosomal segments
all_new_boundary_molecule_indexes = np.empty((0, 2), dtype=np.int64)
all_new_boundary_coordinates = np.empty((0, 2), dtype=np.int64)
all_new_segment_domain_indexes = np.array([], dtype=np.int32)
all_new_linking_numbers = np.array([], dtype=np.float64)
for domain_index in np.unique(all_chromosome_domain_indexes):
# Skip domains that have completed replication
if np.all(domain_index < mother_domain_indexes):
continue
domain_spans_oriC = domain_index in origin_domain_indexes
domain_spans_terC = domain_index in mother_domain_indexes
# Get masks for segments and RNAPs in this domain
segments_domain_mask = segment_domain_indexes == domain_index
RNAP_domain_mask = remaining_RNAP_domain_indexes == domain_index
# Parse attributes of segments in this domain
boundary_molecule_indexes_this_domain = boundary_molecule_indexes[
segments_domain_mask, :
]
boundary_coordinates_this_domain = boundary_coordinates[
segments_domain_mask, :
]
linking_numbers_this_domain = linking_numbers[segments_domain_mask]
# Parse attributes of remaining RNAPs in this domain
new_molecule_coordinates_this_domain = remaining_RNAP_coordinates[
RNAP_domain_mask
]
new_molecule_indexes_this_domain = remaining_RNAP_unique_indexes[
RNAP_domain_mask
]
# Append coordinates and indexes of replisomes on this domain,
# if any
if not domain_spans_oriC:
replisome_domain_mask = replisome_domain_indexes == domain_index
new_molecule_coordinates_this_domain = np.concatenate(
(
new_molecule_coordinates_this_domain,
replisome_coordinates[replisome_domain_mask],
)
)
new_molecule_indexes_this_domain = np.concatenate(
(
new_molecule_indexes_this_domain,
replisome_unique_indexes[replisome_domain_mask],
)
)
# Append coordinates and indexes of parent domain replisomes,
# if any
if not domain_spans_terC:
parent_domain_index = all_chromosome_domain_indexes[
np.where(child_domains == domain_index)[0][0]
]
replisome_parent_domain_mask = (
replisome_domain_indexes == parent_domain_index
)
new_molecule_coordinates_this_domain = np.concatenate(
(
new_molecule_coordinates_this_domain,
replisome_coordinates[replisome_parent_domain_mask],
)
)
new_molecule_indexes_this_domain = np.concatenate(
(
new_molecule_indexes_this_domain,
replisome_unique_indexes[replisome_parent_domain_mask],
)
)
# If there are no molecules left on this domain, continue
if len(new_molecule_indexes_this_domain) == 0:
continue
# Calculate attributes of new segments
new_segment_attrs = self._compute_new_segment_attributes(
boundary_molecule_indexes_this_domain,
boundary_coordinates_this_domain,
linking_numbers_this_domain,
new_molecule_indexes_this_domain,
new_molecule_coordinates_this_domain,
domain_spans_oriC,
domain_spans_terC,
)
# Append to existing array of new segment attributes
all_new_boundary_molecule_indexes = np.vstack(
(
all_new_boundary_molecule_indexes,
new_segment_attrs["boundary_molecule_indexes"],
)
)
all_new_boundary_coordinates = np.vstack(
(
all_new_boundary_coordinates,
new_segment_attrs["boundary_coordinates"],
)
)
all_new_segment_domain_indexes = np.concatenate(
(
all_new_segment_domain_indexes,
np.full(
len(new_segment_attrs["linking_numbers"]),
domain_index,
dtype=np.int32,
),
)
)
all_new_linking_numbers = np.concatenate(
(all_new_linking_numbers, new_segment_attrs["linking_numbers"])
)
# Delete all existing chromosomal segments
if len(boundary_molecule_indexes) > 0:
update["chromosomal_segments"].update(
{"delete": np.arange(len(boundary_molecule_indexes))}
)
# Add new chromosomal segments
n_segments = len(all_new_linking_numbers)
if "chromosomal_segments" in states and states["chromosomal_segments"]:
self.chromosome_segment_index = (
int(
max(
[
int(index)
for index in list(states["chromosomal_segments"].keys())
]
)
)
+ 1
)
update["chromosomal_segments"].update(
{
"add": {
"unique_index": np.arange(
self.chromosome_segment_index,
self.chromosome_segment_index + n_segments,
),
"boundary_molecule_indexes": all_new_boundary_molecule_indexes,
"boundary_coordinates": all_new_boundary_coordinates,
"domain_index": all_new_segment_domain_indexes,
"linking_number": all_new_linking_numbers,
}
}
)
# Get mask for RNAs that are transcribed from removed RNAPs
removed_RNAs_mask = np.isin(
RNA_RNAP_indexes, RNAP_unique_indexes[removed_RNAPs_mask]
)
# Initialize counts of incomplete transcription events
incomplete_transcription_event = np.zeros(self.n_TUs)
# Remove RNAPs and RNAs that have collided with replisomes
if n_total_collisions > 0:
if removed_RNAPs_mask.sum() > 0:
update["active_RNAPs"].update(
{"delete": np.where(removed_RNAPs_mask)[0]}
)
if removed_RNAs_mask.sum() > 0:
update["RNAs"].update({"delete": np.where(removed_RNAs_mask)[0]})
# Increment counts of inactive RNAPs
update["bulk"].append((self.inactive_RNAPs_idx, n_total_collisions))
# Get sequences of incomplete transcripts
incomplete_sequence_lengths = transcript_lengths[removed_RNAs_mask]
n_initiated_sequences = np.count_nonzero(incomplete_sequence_lengths)
n_ppi_added = n_initiated_sequences
if n_initiated_sequences > 0:
incomplete_rna_indexes = RNA_TU_indexes[removed_RNAs_mask]
incomplete_transcription_event = np.bincount(
incomplete_rna_indexes, minlength=self.n_TUs
)
incomplete_sequences = buildSequences(
self.rna_sequences,
incomplete_rna_indexes,
np.zeros(n_total_collisions, dtype=np.int64),
np.full(n_total_collisions, incomplete_sequence_lengths.max()),
)
mature_rna_counts = np.zeros(self.n_mature_rnas, dtype=np.int64)
base_counts = np.zeros(self.n_fragment_bases, dtype=np.int64)
for ri, sl, seq in zip(
incomplete_rna_indexes,
incomplete_sequence_lengths,
incomplete_sequences,
):
# Check if incomplete RNA is an unprocessed RNA
if ri in self.unprocessed_rna_index_mapping:
# Find mature RNA molecules that would need to be added
# given the length of the incomplete RNA
mature_rna_end_pos = self.mature_rna_end_positions[
:, self.unprocessed_rna_index_mapping[ri]
]
mature_rnas_produced = np.logical_and(
mature_rna_end_pos != 0, mature_rna_end_pos < sl
)
# Increment counts of mature RNAs
mature_rna_counts += mature_rnas_produced
# Increment counts of fragment NTPs, but exclude bases
# that are part of the mature RNAs generated
base_counts += np.bincount(
seq[:sl], minlength=self.n_fragment_bases
) - self.mature_rna_nt_counts[mature_rnas_produced, :].sum(
axis=0
)
# Exclude ppi molecules that are part of mature RNAs
n_ppi_added -= mature_rnas_produced.sum()
else:
base_counts += np.bincount(
seq[:sl], minlength=self.n_fragment_bases
)
base_counts += np.bincount(
seq[:sl], minlength=self.n_fragment_bases
)
# Increment counts of mature RNAs, fragment NTPs and phosphates
update["bulk"].append((self.mature_rna_idx, mature_rna_counts))
update["bulk"].append((self.fragmentBasesIdx, base_counts))
update["bulk"].append((self.ppi_idx, n_ppi_added))
assert n_initiated_sequences == incomplete_transcription_event.sum()
update["listeners"]["rnap_data"]["incomplete_transcription_event"] = (
incomplete_transcription_event
)
# Get mask for ribosomes that are bound to nonexisting mRNAs
remaining_RNA_unique_indexes = RNA_unique_indexes[
np.logical_not(removed_RNAs_mask)
]
removed_ribosomes_mask = np.logical_not(
np.isin(ribosome_mRNA_indexes, remaining_RNA_unique_indexes)
)
n_removed_ribosomes = np.count_nonzero(removed_ribosomes_mask)
# Remove ribosomes that are bound to missing RNA molecules. This
# includes both RNAs removed by this function and RNAs removed
# by other processes (e.g. RNA degradation).
if n_removed_ribosomes > 0:
update["active_ribosome"].update(
{"delete": np.where(removed_ribosomes_mask)[0]}
)
# Increment counts of inactive ribosomal subunits
update["bulk"].extend(
[
(self.ribosome_30S_subunit_idx, n_removed_ribosomes),
(self.ribosome_50S_subunit_idx, n_removed_ribosomes),
]
)
# Get amino acid sequences of incomplete polypeptides
incomplete_sequence_lengths = ribosome_peptide_lengths[
removed_ribosomes_mask
]
n_initiated_sequences = np.count_nonzero(incomplete_sequence_lengths)
if n_initiated_sequences > 0:
incomplete_sequences = buildSequences(
self.protein_sequences,
ribosome_protein_indexes[removed_ribosomes_mask],
np.zeros(n_removed_ribosomes, dtype=np.int64),
np.full(n_removed_ribosomes, incomplete_sequence_lengths.max()),
)
amino_acid_counts = np.zeros(self.n_amino_acids, dtype=np.int64)
for sl, seq in zip(incomplete_sequence_lengths, incomplete_sequences):
amino_acid_counts += np.bincount(
seq[:sl], minlength=self.n_amino_acids
)
# Increment counts of free amino acids and decrease counts of
# free water molecules
update["bulk"].append((self.amino_acids_idx, amino_acid_counts))
update["bulk"].append(
(
self.water_idx,
(n_initiated_sequences - incomplete_sequence_lengths.sum()),
)
)
# Write to listener
update["listeners"]["rnap_data"]["n_removed_ribosomes"] = n_removed_ribosomes
def get_replicated_motif_attributes(old_coordinates, old_domain_indexes):
"""
Computes the attributes of replicated motifs on the chromosome,
given the old coordinates and domain indexes of the original motifs.
"""
# Coordinates are simply repeated
new_coordinates = np.repeat(old_coordinates, 2)
# Domain indexes are set to the child indexes of the original index
new_domain_indexes = child_domains[
np.array(
[
np.where(all_chromosome_domain_indexes == idx)[0][0]
for idx in old_domain_indexes
]
),
:,
].flatten()
return new_coordinates, new_domain_indexes
#######################
# Replicate promoters #
#######################
n_new_promoters = 2 * np.count_nonzero(removed_promoters_mask)
if n_new_promoters > 0:
# Delete original promoters
update["promoters"].update({"delete": np.where(removed_promoters_mask)[0]})
# Add freed active tfs
update["bulk"].append(
(
self.active_tfs_idx,
promoter_bound_TFs[removed_promoters_mask, :].sum(axis=0),
)
)
# Set up attributes for the replicated promoters
promoter_TU_indexes_new = np.repeat(
promoter_TU_indexes[removed_promoters_mask], 2
)
(promoter_coordinates_new, promoter_domain_indexes_new) = (
get_replicated_motif_attributes(
promoter_coordinates[removed_promoters_mask],
promoter_domain_indexes[removed_promoters_mask],
)
)
# Add new promoters with new domain indexes
promoter_indices = create_unique_indexes(n_new_promoters, self.random_state)
update["promoters"].update(
{
"add": {
"unique_index": promoter_indices,
"TU_index": promoter_TU_indexes_new,
"coordinates": promoter_coordinates_new,
"domain_index": promoter_domain_indexes_new,
"bound_TF": np.zeros(
(n_new_promoters, self.n_TFs), dtype=np.bool_
),
}
}
)
# Replicate genes
n_new_genes = 2 * np.count_nonzero(removed_genes_mask)
if n_new_genes > 0:
# Delete original genes
update["genes"].update({"delete": np.where(removed_genes_mask)[0]})
# Set up attributes for the replicated genes
gene_cistron_indexes_new = np.repeat(
gene_cistron_indexes[removed_genes_mask], 2
)
gene_coordinates_new, gene_domain_indexes_new = (
get_replicated_motif_attributes(
gene_coordinates[removed_genes_mask],
gene_domain_indexes[removed_genes_mask],
)
)
# Add new genes with new domain indexes
gene_indices = create_unique_indexes(n_new_genes, self.random_state)
update["genes"].update(
{
"add": {
"unique_index": gene_indices,
"cistron_index": gene_cistron_indexes_new,
"coordinates": gene_coordinates_new,
"domain_index": gene_domain_indexes_new,
}
}
)
########################
# Replicate DnaA boxes #
########################
n_new_DnaA_boxes = 2 * np.count_nonzero(removed_DnaA_boxes_mask)
if n_new_DnaA_boxes > 0:
# Delete original DnaA boxes
if removed_DnaA_boxes_mask.sum() > 0:
update["DnaA_boxes"].update(
{"delete": np.where(removed_DnaA_boxes_mask)[0]}
)
# Set up attributes for the replicated boxes
(DnaA_box_coordinates_new, DnaA_box_domain_indexes_new) = (
get_replicated_motif_attributes(
DnaA_box_coordinates[removed_DnaA_boxes_mask],
DnaA_box_domain_indexes[removed_DnaA_boxes_mask],
)
)
# Add new DnaA boxes with new domain indexes
DnaA_box_indices = create_unique_indexes(
n_new_DnaA_boxes, self.random_state
)
dict_dna = {
"add": {
"unique_index": DnaA_box_indices,
"coordinates": DnaA_box_coordinates_new,
"domain_index": DnaA_box_domain_indexes_new,
"DnaA_bound": np.zeros(n_new_DnaA_boxes, dtype=np.bool_),
}
}
update["DnaA_boxes"].update(dict_dna)
update["next_update_time"] = states["global_time"] + states["timestep"]
return update
[docs]
def _compute_new_segment_attributes(
self,
old_boundary_molecule_indexes: npt.NDArray[np.int64],
old_boundary_coordinates: npt.NDArray[np.int64],
old_linking_numbers: npt.NDArray[np.int64],
new_molecule_indexes: npt.NDArray[np.int64],
new_molecule_coordinates: npt.NDArray[np.int64],
spans_oriC: bool,
spans_terC: bool,
) -> dict[str, npt.NDArray[np.int64]]:
"""
Calculates the updated attributes of chromosomal segments belonging to
a specific chromosomal domain, given the previous and current
coordinates of molecules bound to the chromosome.
Args:
old_boundary_molecule_indexes: (N, 2) array of unique
indexes of molecules that formed the boundaries of each
chromosomal segment in the previous timestep.
old_boundary_coordinates: (N, 2) array of chromosomal
coordinates of molecules that formed the boundaries of each
chromosomal segment in the previous timestep.
old_linking_numbers: (N,) array of linking numbers of each
chromosomal segment in the previous timestep.
new_molecule_indexes: (N,) array of unique indexes of all
molecules bound to the domain at the current timestep.
new_molecule_coordinates: (N,) array of chromosomal
coordinates of all molecules bound to the domain at the current
timestep.
spans_oriC: True if the domain spans the origin.
spans_terC: True if the domain spans the terminus.
Returns:
Dictionary of the following format::
{
'boundary_molecule_indexes': (M, 2) array of unique
indexes of molecules that form the boundaries of new
chromosomal segments,
'boundary_coordinates': (M, 2) array of chromosomal
coordinates of molecules that form the boundaries of
new chromosomal segments,
'linking_numbers': (M,) array of linking numbers of new
chromosomal segments
}
"""
# Sort old segment arrays by coordinates of left boundary
old_coordinates_argsort = np.argsort(old_boundary_coordinates[:, 0])
old_boundary_coordinates_sorted = old_boundary_coordinates[
old_coordinates_argsort, :
]
old_boundary_molecule_indexes_sorted = old_boundary_molecule_indexes[
old_coordinates_argsort, :
]
old_linking_numbers_sorted = old_linking_numbers[old_coordinates_argsort]
# Sort new segment arrays by molecular coordinates
new_coordinates_argsort = np.argsort(new_molecule_coordinates)
new_molecule_coordinates_sorted = new_molecule_coordinates[
new_coordinates_argsort
]
new_molecule_indexes_sorted = new_molecule_indexes[new_coordinates_argsort]
# Domain does not span the origin
if not spans_oriC:
# A fragment spans oriC if two boundaries have opposite signs,
# or both are equal to zero
oriC_fragment_counts = np.count_nonzero(
np.logical_not(
np.logical_xor(
old_boundary_coordinates_sorted[:, 0] < 0,
old_boundary_coordinates_sorted[:, 1] > 0,
)
)
)
# if oriC fragment did not exist in the domain in the previous
# timestep, add a dummy fragment that covers the origin with
# linking number zero. This is done to generalize the
# implementation of this method.
if oriC_fragment_counts == 0:
# Index of first segment where left boundary is nonnegative
oriC_fragment_index = np.argmax(
old_boundary_coordinates_sorted[:, 0] >= 0
)
# Get indexes of boundary molecules for this dummy segment
oriC_fragment_boundary_molecule_indexes = np.array(
[
old_boundary_molecule_indexes_sorted[
oriC_fragment_index - 1, 1
],
old_boundary_molecule_indexes_sorted[oriC_fragment_index, 0],
]
)
# Insert dummy segment to array
old_boundary_molecule_indexes_sorted = np.insert(
old_boundary_molecule_indexes_sorted,
oriC_fragment_index,
oriC_fragment_boundary_molecule_indexes,
axis=0,
)
old_linking_numbers_sorted = np.insert(
old_linking_numbers_sorted, oriC_fragment_index, 0
)
else:
# There should not be more than one fragment that spans oriC
assert oriC_fragment_counts == 1
# Domain spans the terminus
if spans_terC:
# If the domain spans the terminus, dummy molecules are added to
# each end of the chromosome s.t. the segment that spans terC is
# split to two segments and we can maintain a linear representation
# for the circular chromosome. These two segments are later
# adjusted to have the same superhelical densities.
new_molecule_coordinates_sorted = np.insert(
new_molecule_coordinates_sorted,
[0, len(new_molecule_coordinates_sorted)],
[self.min_coordinates, self.max_coordinates],
)
new_molecule_indexes_sorted = np.insert(
new_molecule_indexes_sorted,
[0, len(new_molecule_indexes_sorted)],
self.terC_index,
)
# Add dummy molecule to old segments if they do not already exist
if old_boundary_molecule_indexes_sorted[0, 0] != self.terC_index:
old_boundary_molecule_indexes_sorted = np.vstack(
(
np.array(
[
self.terC_index,
old_boundary_molecule_indexes_sorted[0, 0],
]
),
old_boundary_molecule_indexes_sorted,
np.array(
[
old_boundary_molecule_indexes_sorted[-1, 1],
self.terC_index,
]
),
)
)
old_linking_numbers_sorted = np.insert(
old_linking_numbers_sorted, [0, len(old_linking_numbers_sorted)], 0
)
# Recalculate linking numbers of each segment after accounting for
# boundary molecules that were removed in the current timestep
linking_numbers_after_removal = []
right_boundaries_retained = np.isin(
old_boundary_molecule_indexes_sorted[:, 1], new_molecule_indexes_sorted
)
# Add up linking numbers of each segment until each retained boundary
ln_this_fragment = 0.0
for retained, ln in zip(right_boundaries_retained, old_linking_numbers_sorted):
ln_this_fragment += ln
if retained:
linking_numbers_after_removal.append(ln_this_fragment)
ln_this_fragment = 0.0
# Number of segments should be equal to number of retained boundaries
assert len(linking_numbers_after_removal) == right_boundaries_retained.sum()
# Redistribute linking numbers of the two terC segments such that the
# segments have same superhelical densities
if spans_terC and np.count_nonzero(right_boundaries_retained) > 1:
# Get molecule indexes of the boundaries of the two terC segments
# left and right of terC
retained_boundary_indexes = np.where(right_boundaries_retained)[0]
left_segment_boundary_index = old_boundary_molecule_indexes_sorted[
retained_boundary_indexes[0], 1
]
right_segment_boundary_index = old_boundary_molecule_indexes_sorted[
retained_boundary_indexes[-2], 1
]
# Get mapping from molecule index to chromosomal coordinates
molecule_index_to_coordinates = {
index: coordinates
for index, coordinates in zip(
new_molecule_indexes_sorted, new_molecule_coordinates_sorted
)
}
# Distribute linking number between two segments proportional to
# the length of each segment
left_segment_length = (
molecule_index_to_coordinates[left_segment_boundary_index]
- self.min_coordinates
)
right_segment_length = (
self.max_coordinates
- molecule_index_to_coordinates[right_segment_boundary_index]
)
full_segment_length = left_segment_length + right_segment_length
full_linking_number = (
linking_numbers_after_removal[0] + linking_numbers_after_removal[-1]
)
linking_numbers_after_removal[0] = (
full_linking_number * left_segment_length / full_segment_length
)
linking_numbers_after_removal[-1] = (
full_linking_number * right_segment_length / full_segment_length
)
# Get mask for molecules that already existed in the previous timestep
existing_molecules_mask = np.isin(
new_molecule_indexes_sorted, old_boundary_molecule_indexes_sorted
)
# Get numbers and lengths of new segments that each segment will be
# split into
segment_split_sizes = np.diff(np.where(existing_molecules_mask)[0])
segment_lengths = np.diff(new_molecule_coordinates_sorted)
assert len(segment_split_sizes) == len(linking_numbers_after_removal)
# Calculate linking numbers of each segment after accounting for new
# boundaries that were added
new_linking_numbers = []
i = 0
for ln, size in zip(linking_numbers_after_removal, segment_split_sizes):
if size == 1:
new_linking_numbers.append(ln)
else:
# Split linking number proportional to length of segment
total_length = segment_lengths[i : i + size].sum()
new_linking_numbers.extend(
list(ln * segment_lengths[i : i + size] / total_length)
)
i += size
# Handle edge case where a domain was just initialized, and two
# replisomes are bound to the origin
if len(new_linking_numbers) == 0:
new_linking_numbers = [0]
# Build Mx2 array for boundary indexes and coordinates
new_boundary_molecule_indexes = np.hstack(
(
new_molecule_indexes_sorted[:-1, np.newaxis],
new_molecule_indexes_sorted[1:, np.newaxis],
)
)
new_boundary_coordinates = np.hstack(
(
new_molecule_coordinates_sorted[:-1, np.newaxis],
new_molecule_coordinates_sorted[1:, np.newaxis],
)
)
new_linking_numbers = np.array(new_linking_numbers)
# If domain does not span oriC, remove new segment that spans origin
if not spans_oriC:
oriC_fragment_mask = np.logical_not(
np.logical_xor(
new_boundary_coordinates[:, 0] < 0,
new_boundary_coordinates[:, 1] > 0,
)
)
assert oriC_fragment_mask.sum() == 1
new_boundary_molecule_indexes = new_boundary_molecule_indexes[
np.logical_not(oriC_fragment_mask), :
]
new_boundary_coordinates = new_boundary_coordinates[
np.logical_not(oriC_fragment_mask), :
]
new_linking_numbers = new_linking_numbers[
np.logical_not(oriC_fragment_mask)
]
return {
"boundary_molecule_indexes": new_boundary_molecule_indexes,
"boundary_coordinates": new_boundary_coordinates,
"linking_numbers": new_linking_numbers,
}