Source code for ecoli.library.initial_conditions

"""
Functions to initialize molecule states from sim_data.
"""

import numpy as np
import numpy.typing as npt
from numpy.lib import recfunctions as rfn
from typing import Any
from unum import Unum

from ecoli.library.schema import (
    attrs,
    bulk_name_to_idx,
    counts,
    MetadataArray,
)
from ecoli.processes.polypeptide_elongation import (
    calculate_trna_charging,
    REMOVED_FROM_CHARGING,
    MICROMOLAR_UNITS,
)
from wholecell.utils import units
from wholecell.utils.fitting import (
    countsFromMassAndExpression,
    masses_and_counts_for_homeostatic_target,
    normalize,
)

try:
    from wholecell.utils.mc_complexation import mccFormComplexesWithPrebuiltMatrices
except ImportError as exc:
    raise RuntimeError(
        "Failed to import Cython module. Try running 'make clean compile'."
    ) from exc
from wholecell.utils.polymerize import computeMassIncrease
from wholecell.utils.random import stochasticRound

RAND_MAX = 2**31


def create_bulk_container(
    sim_data,
    n_seeds=1,
    condition=None,
    seed=0,
    ppgpp_regulation=True,
    trna_attenuation=True,
    mass_coeff=1,
    form_complexes=True,
):
    """
    Build a bulk molecule container whose counts are the mean over
    ``n_seeds`` independently seeded initializations.

    Args:
        sim_data: Simulation data object. Its ``condition`` attribute is
            temporarily overridden when ``condition`` is given and is always
            restored before this function returns or raises.
        n_seeds: Number of initializations to average over. The n-th
            initialization uses ``np.random.RandomState(seed + n)``.
        condition: Condition to initialize for. If None, the current
            ``sim_data.condition`` is used.
        seed: Base seed for the random states.
        ppgpp_regulation: Passed through to ``initialize_bulk_counts``.
        trna_attenuation: Passed through to ``initialize_bulk_counts``.
        mass_coeff: Passed through to ``initialize_bulk_counts``.
        form_complexes: Passed through to ``initialize_bulk_counts``.

    Returns:
        Structured Numpy array with fields ``id`` and ``count`` (float64),
        where ``count`` is the sum of the initialized counts divided by
        ``n_seeds``.

    Raises:
        RuntimeError: If any step that reads ``sim_data`` or initializes
            counts fails. The underlying exception is chained as the cause.
    """
    try:
        old_condition = sim_data.condition
        try:
            if condition is not None:
                sim_data.condition = condition
            media_id = sim_data.conditions[sim_data.condition]["nutrients"]
            exchange_data = sim_data.external_state.exchange_data_from_media(media_id)
            import_molecules = set(
                exchange_data["importUnconstrainedExchangeMolecules"]
            ) | set(exchange_data["importConstrainedExchangeMolecules"])

            # Construct bulk container with all counts starting at zero
            ids_molecules = sim_data.internal_state.bulk_molecules.bulk_data["id"]
            average_container = np.zeros(
                len(ids_molecules),
                dtype=[("id", ids_molecules.dtype), ("count", np.float64)],
            )
            average_container["id"] = ids_molecules

            # Accumulate counts over independently seeded initializations
            for n in range(n_seeds):
                random_state = np.random.RandomState(seed=seed + n)
                average_container["count"] += initialize_bulk_counts(
                    sim_data,
                    media_id,
                    import_molecules,
                    random_state,
                    mass_coeff,
                    ppgpp_regulation,
                    trna_attenuation,
                    form_complexes=form_complexes,
                )["count"]
        finally:
            # Undo the temporary condition override even when initialization
            # fails, so the caller's sim_data is never left modified
            sim_data.condition = old_condition
    except Exception as exc:
        raise RuntimeError(
            "sim_data might not be fully initialized. "
            "Make sure all attributes have been set before "
            "using this function."
        ) from exc

    average_container["count"] = average_container["count"] / n_seeds
    return average_container
def initialize_bulk_counts(
    sim_data,
    media_id,
    import_molecules,
    random_state,
    mass_coeff,
    ppgpp_regulation,
    trna_attenuation,
    form_complexes=True,
):
    """
    Initialize counts for every bulk molecule and package them, together
    with per-molecule submasses, into a structured array.

    Counts are filled in by the helper initializers in a fixed order: protein
    monomers, RNAs, mature RNAs, small molecules and, if ``form_complexes``
    is set, complexation.

    Returns:
        Structured Numpy array with fields ``id``, ``count`` and one
        ``{submass}_submass`` field (float64) for each entry of
        ``sim_data.submass_name_to_index``.
    """
    bulk_data = sim_data.internal_state.bulk_molecules.bulk_data

    # Count array that the initializers below fill in place
    bulk_counts = np.zeros(len(bulk_data["id"]), dtype=int)

    # Protein and RNA counts come from expression
    initialize_protein_monomers(
        bulk_counts,
        sim_data,
        random_state,
        mass_coeff,
        ppgpp_regulation,
        trna_attenuation,
    )
    initialize_rna(
        bulk_counts,
        sim_data,
        random_state,
        mass_coeff,
        ppgpp_regulation,
        trna_attenuation,
    )

    # Mature RNA counts
    initialize_mature_RNA(bulk_counts, sim_data)

    # Other biomass components
    set_small_molecule_counts(
        bulk_counts, sim_data, media_id, import_molecules, mass_coeff
    )

    # Complexes are optional
    if form_complexes:
        initialize_complexation(bulk_counts, sim_data, random_state)

    # Convert masses from fg/mol to fg per molecule
    per_molecule_masses = bulk_data["mass"].asNumber(
        units.fg / units.mol
    ) / sim_data.constants.n_avogadro.asNumber(1 / units.mol)

    # One column and one dtype entry per submass
    submass_items = list(sim_data.submass_name_to_index.items())
    submass_columns = [per_molecule_masses[:, idx] for _, idx in submass_items]
    submass_dtypes = [
        (f"{submass}_submass", np.float64) for submass, _ in submass_items
    ]

    bulk_ids = bulk_data.struct_array["id"]
    rows = list(zip(bulk_ids, bulk_counts, *submass_columns))
    return np.array(
        rows,
        dtype=[("id", bulk_ids.dtype), ("count", int)] + submass_dtypes,
    )
def initialize_unique_molecules(
    bulk_state,
    sim_data,
    cell_mass,
    random_state,
    unique_id_rng,
    superhelical_density,
    ppgpp_regulation,
    trna_attenuation,
    mechanistic_replisome,
):
    """
    Create the initial unique molecule states.

    The individual initializers are run in a fixed order because each one
    fills entries of the shared dictionary (and some take ``bulk_state``).

    Returns:
        Dictionary mapping unique molecule names to the arrays created by
        the initializers.
    """
    molecules = {}

    # Full chromosomes
    initialize_full_chromosome(molecules, sim_data, unique_id_rng)

    # Unique molecules relevant to replication
    initialize_replication(
        bulk_state,
        molecules,
        sim_data,
        cell_mass,
        mechanistic_replisome,
        unique_id_rng,
    )

    # Bound transcription factors
    initialize_transcription_factors(bulk_state, molecules, sim_data, random_state)

    # Active RNAPs and unique molecule representations of RNAs
    initialize_transcription(
        bulk_state,
        molecules,
        sim_data,
        random_state,
        unique_id_rng,
        ppgpp_regulation,
        trna_attenuation,
    )

    # Linking numbers of chromosomal segments; without superhelical density
    # an empty array with the expected fields is created instead
    if not superhelical_density:
        molecules["chromosomal_segment"] = create_new_unique_molecules(
            "chromosomal_segment", 0, sim_data, unique_id_rng
        )
    else:
        initialize_chromosomal_segments(molecules, sim_data, unique_id_rng)

    # Active ribosomes
    initialize_translation(
        bulk_state, molecules, sim_data, random_state, unique_id_rng
    )

    return molecules
def create_new_unique_molecules(name, n_mols, sim_data, random_state, **attrs):
    """
    Build a new structured array holding ``n_mols`` instances of the unique
    molecule called ``name`` and wrap it in a ``MetadataArray``.

    Keyword arguments give initial values for the named attributes of the
    new molecules. ``unique_index`` and ``_entryState`` are assigned after
    the keyword values. ``random_state`` is accepted but not used here.
    """
    definitions = sim_data.internal_state.unique_molecule.unique_molecule_definitions

    # Fields: molecule-specific attributes, one massDiff per submass, then
    # the two bookkeeping fields
    field_dtypes = [
        *definitions[name].items(),
        *(
            (f"massDiff_{submass}", "<f8")
            for submass in sim_data.submass_name_to_index
        ),
        ("_entryState", "i1"),
        ("unique_index", "<i8"),
    ]
    new_mols = np.zeros(n_mols, dtype=field_dtypes)

    for field, value in attrs.items():
        new_mols[field] = value

    # Each unique molecule type gets its own index prefix to prevent
    # conflicts between types
    index_start = list(definitions).index(name) << 59
    index_stop = index_start + n_mols
    new_mols["unique_index"] = np.arange(index_start, index_stop)
    new_mols["_entryState"] = 1

    return MetadataArray(new_mols, index_stop)
def initialize_protein_monomers(
    bulk_counts, sim_data, random_state, mass_coeff, ppgpp_regulation, trna_attenuation
):
    """
    Set counts of protein monomers in ``bulk_counts`` from expression.

    The total number of monomers is derived from the protein mass of the
    initial cell, and individual counts are drawn from a multinomial
    distribution over the normalized expected monomer expression.

    Args:
        bulk_counts: Count array for all bulk molecules; monomer entries are
            overwritten in place.
        sim_data: Simulation data object. It is only read; the stored RNA
            expression array is copied before being adjusted.
        random_state: Random state used for the multinomial draw.
        mass_coeff: Multiplier applied to the protein mass.
        ppgpp_regulation: If True, RNA expression comes from
            ``sim_data.calculate_ppgpp_expression``; otherwise from
            ``transcription.rna_expression`` for the current condition.
        trna_attenuation: If True, expression of attenuated RNAs is rescaled
            by the ratio of condition to basal readthrough.
    """
    transcription = sim_data.process.transcription
    translation = sim_data.process.translation
    doubling_time = sim_data.condition_to_doubling_time[sim_data.condition]

    monomer_mass = (
        mass_coeff
        * sim_data.mass.get_component_masses(doubling_time)["proteinMass"]
        / sim_data.mass.avg_cell_to_initial_cell_conversion_factor
    )
    # TODO: unify this logic with the parca so it doesn't fall out of step
    # again (look at the calProteinCounts function)
    if ppgpp_regulation:
        rna_expression = sim_data.calculate_ppgpp_expression(sim_data.condition)
    else:
        # Copy so that the in-place attenuation adjustment below does not
        # modify the expression array stored on sim_data; otherwise the
        # adjustment would accumulate across repeated calls
        rna_expression = transcription.rna_expression[sim_data.condition].copy()

    if trna_attenuation:
        # Need to adjust expression (calculated without attenuation) by basal_adjustment
        # to get the expected expression without any attenuation and then multiply
        # by the condition readthrough probability to get the condition specific expression
        readthrough = transcription.attenuation_readthrough[sim_data.condition]
        basal_adjustment = transcription.attenuation_readthrough["basal"]
        rna_expression[transcription.attenuated_rna_indices] *= (
            readthrough / basal_adjustment
        )

    # Expression mapped to monomers, weighted by translation efficiency and
    # divided by the sum of the dilution rate (ln 2 / doubling time) and the
    # degradation rate
    monomer_expression = normalize(
        transcription.cistron_tu_mapping_matrix.dot(rna_expression)[
            sim_data.relation.cistron_to_monomer_mapping
        ]
        * translation.translation_efficiencies_by_monomer
        / (
            np.log(2) / doubling_time.asNumber(units.s)
            + translation.monomer_data["deg_rate"].asNumber(1 / units.s)
        )
    )

    n_monomers = countsFromMassAndExpression(
        monomer_mass.asNumber(units.g),
        translation.monomer_data["mw"].asNumber(units.g / units.mol),
        monomer_expression,
        sim_data.constants.n_avogadro.asNumber(1 / units.mol),
    )

    # Get indices for monomers in bulk counts array
    monomer_ids = translation.monomer_data["id"]
    bulk_ids = sim_data.internal_state.bulk_molecules.bulk_data["id"]
    monomer_idx = bulk_name_to_idx(monomer_ids, bulk_ids)

    # Calculate initial counts of each monomer from multinomial distribution
    bulk_counts[monomer_idx] = random_state.multinomial(n_monomers, monomer_expression)
def initialize_rna(
    bulk_counts, sim_data, random_state, mass_coeff, ppgpp_regulation, trna_attenuation
):
    """
    Populate ``bulk_counts`` with RNA counts derived from RNA expression.

    mRNA counts are set here as well; they are reset to zero later, when
    mRNAs are moved to their unique molecule representations.
    """
    condition = sim_data.condition
    transcription = sim_data.process.transcription

    # RNA mass of the initial cell, scaled by mass_coeff
    total_rna_mass = (
        mass_coeff
        * sim_data.mass.get_component_masses(
            sim_data.condition_to_doubling_time[condition]
        )["rnaMass"]
        / sim_data.mass.avg_cell_to_initial_cell_conversion_factor
    )

    expression = (
        sim_data.calculate_ppgpp_expression(condition)
        if ppgpp_regulation
        else normalize(transcription.rna_expression[condition])
    )

    if trna_attenuation:
        # Expression was calculated without attenuation: divide by the basal
        # readthrough to undo the basal adjustment, multiply by the
        # readthrough of this condition, then renormalize
        readthrough_by_condition = transcription.attenuation_readthrough
        condition_readthrough = readthrough_by_condition[condition]
        basal_readthrough = readthrough_by_condition["basal"]
        expression[transcription.attenuated_rna_indices] *= (
            condition_readthrough / basal_readthrough
        )
        expression /= expression.sum()

    total_rna_count = countsFromMassAndExpression(
        total_rna_mass.asNumber(units.g),
        transcription.rna_data["mw"].asNumber(units.g / units.mol),
        expression,
        sim_data.constants.n_avogadro.asNumber(1 / units.mol),
    )

    # Positions of the RNAs in the bulk counts array
    rna_idx = bulk_name_to_idx(
        transcription.rna_data["id"],
        sim_data.internal_state.bulk_molecules.bulk_data["id"],
    )

    # Draw the count of each RNA from a multinomial distribution
    bulk_counts[rna_idx] = random_state.multinomial(total_rna_count, expression)
def initialize_mature_RNA(bulk_counts, sim_data):
    """
    Convert counts of unprocessed RNAs in ``bulk_counts`` into counts of
    mature RNAs, then fold the counts of every rRNA variant into the main
    rRNA of its type (23S, 16S and 5S).

    Nothing is changed when no RNA is flagged as unprocessed.
    """
    transcription = sim_data.process.transcription
    all_rna = transcription.rna_data
    bulk_ids = sim_data.internal_state.bulk_molecules.bulk_data["id"]

    unprocessed_ids = all_rna["id"][all_rna["is_unprocessed"]]
    unprocessed_idx = bulk_name_to_idx(unprocessed_ids, bulk_ids)

    # Skip if there are no unprocessed RNAs represented
    if len(unprocessed_ids) == 0:
        return

    mature_idx = bulk_name_to_idx(transcription.mature_rna_data["id"], bulk_ids)
    stoich = transcription.rna_maturation_stoich_matrix

    # Read the unprocessed counts before zeroing them: all unprocessed RNAs
    # are assumed to be converted to mature RNAs
    unprocessed_counts = bulk_counts[unprocessed_idx]
    bulk_counts[unprocessed_idx] = 0
    bulk_counts[mature_idx] += stoich.dot(unprocessed_counts)

    # For each rRNA group the first ID is the main type and the remaining
    # IDs are its variants
    groups = sim_data.molecule_groups
    rrna_index_pairs = [
        (
            bulk_name_to_idx(group[0], bulk_ids),
            bulk_name_to_idx(group[1:], bulk_ids),
        )
        for group in (groups.s50_23s_rRNA, groups.s30_16s_rRNA, groups.s50_5s_rRNA)
    ]

    # Move all variant counts to the main types, then clear the variants
    for main_idx, variant_idx in rrna_index_pairs:
        bulk_counts[main_idx] += bulk_counts[variant_idx].sum()
    for _, variant_idx in rrna_index_pairs:
        bulk_counts[variant_idx] = 0
# TODO: remove checks for zero concentrations (change to assertion)
# TODO: move any rescaling logic to KB/fitting
def set_small_molecule_counts(
    bulk_counts, sim_data, media_id, import_molecules, mass_coeff, cell_mass=None
):
    """
    Set counts of molecules that have a target concentration.

    Target concentrations are gathered from the nutrient-based metabolite
    concentrations, the biomass concentrations and the ppGpp concentration
    for the current doubling time. Counts are then computed by
    ``masses_and_counts_for_homeostatic_target`` and written into
    ``bulk_counts`` in place.

    If ``cell_mass`` is None, the remaining dry mass is the mass-coefficient
    scaled sum of protein, RNA and DNA mass of the initial cell. Otherwise
    it is ``cell_mass`` minus the mass of the targeted molecules currently
    in ``bulk_counts``.
    """
    doubling_time = sim_data.condition_to_doubling_time[sim_data.condition]
    n_avogadro = sim_data.constants.n_avogadro
    concentration_unit = units.mol / units.L

    # Collect target concentrations; later sources override earlier ones
    target_conc = sim_data.process.metabolism.concentration_updates.concentrations_based_on_nutrients(
        media_id=media_id, imports=import_molecules
    )
    target_conc.update(sim_data.mass.getBiomassAsConcentrations(doubling_time))
    target_conc[sim_data.molecule_ids.ppGpp] = (
        sim_data.growth_rate_parameters.get_ppGpp_conc(doubling_time)
    )

    molecule_ids = sorted(target_conc)
    bulk_ids = sim_data.internal_state.bulk_molecules.bulk_data["id"]
    conc_values = [
        target_conc[mol_id].asNumber(concentration_unit) for mol_id in molecule_ids
    ]
    molecule_concentrations = concentration_unit * np.array(conc_values)

    if cell_mass is not None:
        # Subtract the mass of the targeted molecules already counted
        small_molecule_mass = 0 * units.fg
        for mol_id in target_conc:
            idx = bulk_name_to_idx(mol_id, bulk_ids)
            small_molecule_mass += (
                bulk_counts[idx] * sim_data.getter.get_mass(mol_id) / n_avogadro
            )
        other_dry_mass = cell_mass - small_molecule_mass
    else:
        component_masses = sim_data.mass.get_component_masses(doubling_time)
        other_dry_mass = (
            mass_coeff
            * (
                component_masses["proteinMass"]
                + component_masses["rnaMass"]
                + component_masses["dnaMass"]
            )
            / sim_data.mass.avg_cell_to_initial_cell_conversion_factor
        )

    # Only the counts are needed; the returned masses are discarded
    _, counts_to_add = masses_and_counts_for_homeostatic_target(
        other_dry_mass,
        molecule_concentrations,
        sim_data.getter.get_masses(molecule_ids),
        sim_data.constants.cell_density,
        n_avogadro,
    )

    bulk_counts[bulk_name_to_idx(molecule_ids, bulk_ids)] = counts_to_add
def initialize_complexation(bulk_counts, sim_data, random_state):
    """
    Form complexes from the counts of complexation molecules and write the
    updated counts back into ``bulk_counts``.

    Raises:
        ValueError: If any updated count is negative. The counts have
            already been written to ``bulk_counts`` when this is raised.
    """
    complexation = sim_data.process.complexation
    molecule_idx = bulk_name_to_idx(
        complexation.molecule_names,
        sim_data.internal_state.bulk_molecules.bulk_data["id"],
    )
    stoich = complexation.stoich_matrix().astype(np.int64, order="F")
    counts_before = bulk_counts[molecule_idx]

    # The second argument is drawn from random_state; the number of
    # complexation events is returned as well but not used here
    counts_after, _ = mccFormComplexesWithPrebuiltMatrices(
        counts_before,
        random_state.randint(1000),
        stoich,
        *complexation.prebuilt_matrices,
    )

    bulk_counts[molecule_idx] = counts_after

    if np.any(counts_after < 0):
        raise ValueError("Negative counts after complexation")
def initialize_full_chromosome(unique_molecules, sim_data, unique_id_rng):
    """
    Create a single full chromosome and store it under
    ``unique_molecules["full_chromosome"]``.

    The division_time of this initial chromosome is set to zero for
    consistency.
    """
    initial_attributes = {
        "division_time": 0.0,
        "has_triggered_division": True,
        "domain_index": 0,
    }
    unique_molecules["full_chromosome"] = create_new_unique_molecules(
        "full_chromosome", 1, sim_data, unique_id_rng, **initial_attributes
    )
def initialize_replication(
    bulk_state,
    unique_molecules,
    sim_data,
    cell_mass,
    mechanistic_replisome,
    unique_id_rng,
):
    """
    Initializes replication by creating an appropriate number of replication
    forks given the cell growth rate, then creates the unique molecules whose
    attributes depend on the initial fork positions.

    Entries written to ``unique_molecules``: ``oriC``, ``chromosome_domain``,
    ``active_replisome``, ``promoter``, ``gene`` and ``DnaA_box``.

    Args:
        bulk_state: Bulk molecule array indexed by ``"id"`` and ``"count"``.
            When ``mechanistic_replisome`` is set and replisomes are created,
            counts of the replisome subunits are decremented in place.
        unique_molecules: Dictionary that receives the new unique molecule
            arrays.
        sim_data: Simulation data object.
        cell_mass: Passed to ``determine_chromosome_state``.
        mechanistic_replisome: If True, the number of replisomes is limited
            by the available subunit counts and the subunit masses are
            assigned to the replisomes.
        unique_id_rng: Passed to ``create_new_unique_molecules``.
    """
    # Determine the number and location of replication forks at the start of
    # the cell cycle
    # Get growth rate constants
    tau = sim_data.condition_to_doubling_time[sim_data.condition].asUnit(units.min)
    critical_mass = sim_data.mass.get_dna_critical_mass(tau)
    replication_rate = sim_data.process.replication.basal_elongation_rate

    # Calculate length of replichore
    genome_length = sim_data.process.replication.genome_length
    replichore_length = np.ceil(0.5 * genome_length) * units.nt

    # Calculate the maximum number of replisomes that could be formed with
    # the existing counts of replisome subunits. If mechanistic_replisome option
    # is off, set to an arbitrary high number.
    replisome_trimer_idx = bulk_name_to_idx(
        sim_data.molecule_groups.replisome_trimer_subunits, bulk_state["id"]
    )
    replisome_monomer_idx = bulk_name_to_idx(
        sim_data.molecule_groups.replisome_monomer_subunits, bulk_state["id"]
    )
    if mechanistic_replisome:
        # Three copies of each trimer subunit and one copy of each monomer
        # subunit are consumed per replisome, so the scarcest subunit limits
        # the number of replisomes
        n_max_replisomes = np.min(
            np.concatenate(
                (
                    bulk_state["count"][replisome_trimer_idx] // 3,
                    bulk_state["count"][replisome_monomer_idx],
                )
            )
        )
    else:
        n_max_replisomes = 1000

    # Generate arrays specifying appropriate initial replication conditions
    oric_state, replisome_state, domain_state = determine_chromosome_state(
        tau,
        replichore_length,
        n_max_replisomes,
        sim_data.process.replication.no_child_place_holder,
        cell_mass,
        critical_mass,
        replication_rate,
    )

    n_oric = oric_state["domain_index"].size
    n_replisome = replisome_state["domain_index"].size
    n_domain = domain_state["domain_index"].size

    # Add OriC molecules with the proposed attributes
    unique_molecules["oriC"] = create_new_unique_molecules(
        "oriC", n_oric, sim_data, unique_id_rng, domain_index=oric_state["domain_index"]
    )

    # Add chromosome domain molecules with the proposed attributes
    unique_molecules["chromosome_domain"] = create_new_unique_molecules(
        "chromosome_domain",
        n_domain,
        sim_data,
        unique_id_rng,
        domain_index=domain_state["domain_index"],
        child_domains=domain_state["child_domains"],
    )

    if n_replisome != 0:
        # Update mass of replisomes if the mechanistic replisome option is set
        if mechanistic_replisome:
            replisome_trimer_subunit_masses = np.vstack(
                [
                    sim_data.getter.get_submass_array(x).asNumber(
                        units.fg / units.count
                    )
                    for x in sim_data.molecule_groups.replisome_trimer_subunits
                ]
            )
            replisome_monomer_subunit_masses = np.vstack(
                [
                    sim_data.getter.get_submass_array(x).asNumber(
                        units.fg / units.count
                    )
                    for x in sim_data.molecule_groups.replisome_monomer_subunits
                ]
            )
            # Submass array of one replisome: three of each trimer subunit
            # plus one of each monomer subunit
            replisome_mass_array = 3 * replisome_trimer_subunit_masses.sum(
                axis=0
            ) + replisome_monomer_subunit_masses.sum(axis=0)
            # All submasses are summed into a single value, which is stored
            # as the protein massDiff below
            replisome_protein_mass = replisome_mass_array.sum()
        else:
            replisome_protein_mass = 0.0

        # Update mass to account for DNA strands that have already been
        # elongated.
        sequences = sim_data.process.replication.replication_sequences
        fork_coordinates = replisome_state["coordinates"]
        # Each fork coordinate is used twice, i.e. two sequences are
        # elongated per fork (presumably the two strands - TODO confirm)
        sequence_elongations = np.abs(np.repeat(fork_coordinates, 2))

        # NOTE(review): tiling once per pair of replisomes assumes that
        # `sequences` has one row per (replichore, strand) combination -
        # confirm against sim_data
        mass_increase_dna = computeMassIncrease(
            np.tile(sequences, (n_replisome // 2, 1)),
            sequence_elongations,
            sim_data.process.replication.replication_monomer_weights.asNumber(units.fg),
        )

        # Add active replisomes as unique molecules and set attributes
        unique_molecules["active_replisome"] = create_new_unique_molecules(
            "active_replisome",
            n_replisome,
            sim_data,
            unique_id_rng,
            domain_index=replisome_state["domain_index"],
            coordinates=replisome_state["coordinates"],
            right_replichore=replisome_state["right_replichore"],
            # Sum the mass increases of the two sequences of each fork
            massDiff_DNA=mass_increase_dna[0::2] + mass_increase_dna[1::2],
            massDiff_protein=replisome_protein_mass,
        )

        if mechanistic_replisome:
            # Remove replisome subunits from bulk molecules
            bulk_state["count"][replisome_trimer_idx] -= 3 * n_replisome
            bulk_state["count"][replisome_monomer_idx] -= n_replisome
    else:
        # For n_replisome = 0, still create an empty structured array with
        # the expected fields
        unique_molecules["active_replisome"] = create_new_unique_molecules(
            "active_replisome", n_replisome, sim_data, unique_id_rng
        )

    # Get coordinates of all genes, promoters and DnaA boxes
    all_gene_coordinates = sim_data.process.transcription.cistron_data[
        "replication_coordinate"
    ]
    all_promoter_coordinates = sim_data.process.transcription.rna_data[
        "replication_coordinate"
    ]
    all_DnaA_box_coordinates = sim_data.process.replication.motif_coordinates[
        "DnaA_box"
    ]

    # Define function that initializes attributes of sequence motifs given the
    # initial state of the chromosome
    def get_motif_attributes(all_motif_coordinates):
        """
        Using the initial positions of replication forks, calculate
        attributes of unique molecules representing DNA motifs, given their
        positions on the genome.

        Reads ``domain_state``, ``replisome_state``, ``oric_state`` and
        ``n_replisome`` from the enclosing function.

        Args:
            all_motif_coordinates (ndarray): Genomic coordinates of DNA
                motifs, represented in a specific order

        Returns:
            motif_index: Indices of all motif copies, in the case where
                different indexes imply a different functional role
            motif_coordinates: Genomic coordinates of all motif copies
            motif_domain_index: Domain indexes of the chromosome domain
                that each motif copy belongs to
        """
        motif_index, motif_coordinates, motif_domain_index = [], [], []

        def in_bounds(coordinates, lb, ub):
            # Strictly between lb and ub (both bounds excluded)
            return np.logical_and(coordinates < ub, coordinates > lb)

        # Loop through all chromosome domains
        for domain_idx in domain_state["domain_index"]:
            # If the domain is the mother domain of the initial chromosome,
            if domain_idx == 0:
                if n_replisome == 0:
                    # No replisomes - all motifs should fall in this domain
                    motif_mask = np.ones_like(all_motif_coordinates, dtype=bool)

                else:
                    # Get domain boundaries
                    domain_boundaries = replisome_state["coordinates"][
                        replisome_state["domain_index"] == 0
                    ]

                    # Add motifs outside of this boundary
                    motif_mask = np.logical_or(
                        all_motif_coordinates > domain_boundaries.max(),
                        all_motif_coordinates < domain_boundaries.min(),
                    )

            # If the domain contains the origin,
            elif np.isin(domain_idx, oric_state["domain_index"]):
                # Get index of the parent domain
                parent_domain_idx = domain_state["domain_index"][
                    np.where(domain_state["child_domains"] == domain_idx)[0]
                ]

                # Get domain boundaries of the parent domain
                parent_domain_boundaries = replisome_state["coordinates"][
                    replisome_state["domain_index"] == parent_domain_idx
                ]

                # Add motifs inside this boundary
                motif_mask = in_bounds(
                    all_motif_coordinates,
                    parent_domain_boundaries.min(),
                    parent_domain_boundaries.max(),
                )

            # If the domain neither contains the origin nor the terminus,
            else:
                # Get index of the parent domain
                parent_domain_idx = domain_state["domain_index"][
                    np.where(domain_state["child_domains"] == domain_idx)[0]
                ]

                # Get domain boundaries of the parent domain
                parent_domain_boundaries = replisome_state["coordinates"][
                    replisome_state["domain_index"] == parent_domain_idx
                ]

                # Get domain boundaries of this domain
                domain_boundaries = replisome_state["coordinates"][
                    replisome_state["domain_index"] == domain_idx
                ]

                # Add motifs between the boundaries
                motif_mask = np.logical_or(
                    in_bounds(
                        all_motif_coordinates,
                        domain_boundaries.max(),
                        parent_domain_boundaries.max(),
                    ),
                    in_bounds(
                        all_motif_coordinates,
                        parent_domain_boundaries.min(),
                        domain_boundaries.min(),
                    ),
                )

            # Append attributes to existing list
            motif_index.extend(np.nonzero(motif_mask)[0])
            motif_coordinates.extend(all_motif_coordinates[motif_mask])
            motif_domain_index.extend(np.full(motif_mask.sum(), domain_idx))

        return motif_index, motif_coordinates, motif_domain_index

    # Use function to get attributes for promoters, genes and DnaA boxes
    TU_index, promoter_coordinates, promoter_domain_index = get_motif_attributes(
        all_promoter_coordinates
    )
    cistron_index, gene_coordinates, gene_domain_index = get_motif_attributes(
        all_gene_coordinates
    )
    _, DnaA_box_coordinates, DnaA_box_domain_index = get_motif_attributes(
        all_DnaA_box_coordinates
    )

    # Add promoters as unique molecules and set attributes
    # Note: the bound_TF attribute is properly initialized in the function
    # initialize_transcription_factors
    n_promoter = len(TU_index)
    n_tf = len(sim_data.process.transcription_regulation.tf_ids)

    unique_molecules["promoter"] = create_new_unique_molecules(
        "promoter",
        n_promoter,
        sim_data,
        unique_id_rng,
        domain_index=promoter_domain_index,
        coordinates=promoter_coordinates,
        TU_index=TU_index,
        bound_TF=np.zeros((n_promoter, n_tf), dtype=bool),
    )

    # Add genes as unique molecules and set attributes
    n_gene = len(cistron_index)

    unique_molecules["gene"] = create_new_unique_molecules(
        "gene",
        n_gene,
        sim_data,
        unique_id_rng,
        cistron_index=cistron_index,
        coordinates=gene_coordinates,
        domain_index=gene_domain_index,
    )

    # Add DnaA boxes as unique molecules and set attributes
    n_DnaA_box = len(DnaA_box_coordinates)

    unique_molecules["DnaA_box"] = create_new_unique_molecules(
        "DnaA_box",
        n_DnaA_box,
        sim_data,
        unique_id_rng,
        domain_index=DnaA_box_domain_index,
        coordinates=DnaA_box_coordinates,
        DnaA_bound=np.zeros(n_DnaA_box, dtype=bool),
    )
def initialize_transcription_factors(
    bulk_state, unique_molecules, sim_data, random_state
):
    """
    Initialize transcription factors that are bound to the chromosome. For
    each type of transcription factor, this function calculates the total
    number of transcription factors that should be bound to the chromosome
    using the binding probabilities of each transcription factor and the
    number of available promoter sites. The calculated number of
    transcription factors are then distributed randomly to promoters, whose
    bound_TF attributes and submasses are updated correspondingly.

    Args:
        bulk_state: Bulk molecule array indexed by ``"id"`` and ``"count"``.
            Counts of active transcription factors are decremented in place
            by the number of promoters they are bound to.
        unique_molecules: Dictionary of unique molecule arrays. The
            ``"promoter"`` entry must already exist; its ``bound_TF`` and
            ``massDiff_*`` fields are overwritten.
        sim_data: Simulation data object.
        random_state: Random state used for the stochastic rounding and for
            choosing which promoters are bound.
    """
    # Get transcription factor properties from sim_data
    tf_ids = sim_data.process.transcription_regulation.tf_ids
    tf_to_tf_type = sim_data.process.transcription_regulation.tf_to_tf_type
    p_promoter_bound_TF = sim_data.process.transcription_regulation.p_promoter_bound_tf

    # Build dict that maps TFs to transcription units they regulate
    delta_prob = sim_data.process.transcription_regulation.delta_prob
    TF_to_TU_idx = {}

    for i, tf in enumerate(tf_ids):
        TF_to_TU_idx[tf] = delta_prob["deltaI"][delta_prob["deltaJ"] == i]

    # Get counts and bulk indices of active and inactive transcription factors
    # NOTE(review): despite the "view" names, the values stored here are read
    # once at this point; later updates to bulk_state do not appear to be
    # reflected in them - confirm
    active_tf_view = {}
    inactive_tf_view = {}
    active_tf_view_idx = {}
    # NOTE(review): inactive_tf_view_idx is filled below but never read in
    # this function
    inactive_tf_view_idx = {}

    for tf in tf_ids:
        tf_idx = bulk_name_to_idx(tf + "[c]", bulk_state["id"])
        active_tf_view[tf] = bulk_state["count"][tf_idx]
        active_tf_view_idx[tf] = tf_idx

        # The inactive form is looked up differently for each TF type; TFs
        # of any other type (e.g. "0CS") get no inactive entry
        if tf_to_tf_type[tf] == "1CS":
            if tf == sim_data.process.transcription_regulation.active_to_bound[tf]:
                inactive_tf_idx = bulk_name_to_idx(
                    sim_data.process.equilibrium.get_unbound(tf + "[c]"),
                    bulk_state["id"],
                )
                inactive_tf_view[tf] = bulk_state["count"][inactive_tf_idx]
            else:
                inactive_tf_idx = bulk_name_to_idx(
                    sim_data.process.transcription_regulation.active_to_bound[tf]
                    + "[c]",
                    bulk_state["id"],
                )
                inactive_tf_view[tf] = bulk_state["count"][inactive_tf_idx]
        elif tf_to_tf_type[tf] == "2CS":
            inactive_tf_idx = bulk_name_to_idx(
                sim_data.process.two_component_system.active_to_inactive_tf[tf + "[c]"],
                bulk_state["id"],
            )
            inactive_tf_view[tf] = bulk_state["count"][inactive_tf_idx]
            inactive_tf_view_idx[tf] = inactive_tf_idx

    # Get masses of active transcription factors
    tf_indexes = [np.where(bulk_state["id"] == tf_id + "[c]")[0][0] for tf_id in tf_ids]
    active_tf_masses = (
        sim_data.internal_state.bulk_molecules.bulk_data["mass"][tf_indexes]
        / sim_data.constants.n_avogadro
    ).asNumber(units.fg)

    # Get TU indices of promoters
    TU_index = unique_molecules["promoter"]["TU_index"]

    # Initialize bound_TF array (one row per promoter, one column per TF)
    bound_TF = np.zeros((len(TU_index), len(tf_ids)), dtype=bool)

    for tf_idx, tf_id in enumerate(tf_ids):
        # Get counts of transcription factors
        active_tf_counts = active_tf_view[tf_id]

        # If there are no active transcription factors at initialization,
        # continue to the next transcription factor
        if active_tf_counts == 0:
            continue

        # Compute probability of binding the promoter
        if tf_to_tf_type[tf_id] == "0CS":
            p_promoter_bound = 1.0
        else:
            inactive_tf_counts = inactive_tf_view[tf_id]
            p_promoter_bound = p_promoter_bound_TF(active_tf_counts, inactive_tf_counts)

        # Determine the number of available promoter sites
        available_promoters = np.isin(TU_index, TF_to_TU_idx[tf_id])
        n_available_promoters = available_promoters.sum()

        # Calculate the number of promoters that should be bound
        n_to_bind = int(
            stochasticRound(
                random_state, np.full(n_available_promoters, p_promoter_bound)
            ).sum()
        )

        bound_locs = np.zeros(n_available_promoters, dtype=bool)
        if n_to_bind > 0:
            # Determine randomly which DNA targets to bind based on which of
            # the following is more limiting:
            # number of promoter sites to bind, or number of active
            # transcription factors
            bound_locs[
                random_state.choice(
                    n_available_promoters,
                    size=min(n_to_bind, active_tf_view[tf_id]),
                    replace=False,
                )
            ] = True

            # Update count of free transcription factors
            bulk_state["count"][active_tf_view_idx[tf_id]] -= bound_locs.sum()

            # Update bound_TF array
            bound_TF[available_promoters, tf_idx] = bound_locs

    # Calculate masses of bound TFs (per promoter, per submass)
    mass_diffs = bound_TF.dot(active_tf_masses)

    # Reset bound_TF attribute of promoters
    unique_molecules["promoter"]["bound_TF"] = bound_TF

    # Add mass_diffs array to promoter submass
    for submass, i in sim_data.submass_name_to_index.items():
        unique_molecules["promoter"][f"massDiff_{submass}"] = mass_diffs[:, i]
def initialize_transcription(
    bulk_state,
    unique_molecules,
    sim_data,
    random_state,
    unique_id_rng,
    ppgpp_regulation,
    trna_attenuation,
):
    """
    Activate RNA polymerases as unique molecules, and distribute them along
    lengths of transcription units, while decreasing counts of unactivated RNA
    polymerases (APORNAP-CPLX[c]). Also initialize unique molecule
    representations of fully transcribed mRNAs and partially transcribed RNAs,
    using counts of mRNAs initialized as bulk molecules, and the attributes of
    initialized RNA polymerases. The counts of full mRNAs represented as bulk
    molecules are reset to zero.

    RNA polymerases are placed randomly across the length of each transcription
    unit, with the synthesis probabilities for each TU determining the number of
    RNA polymerases placed at each gene.

    Args:
        bulk_state: Structured array of bulk molecules. The ``id`` and
            ``count`` fields are read; ``count`` is modified in place (the
            inactive RNAP count is decremented and bulk mRNA counts are set
            to zero).
        unique_molecules: Mapping of unique molecule names to their arrays.
            The ``promoter`` entry is read; the ``active_RNAP`` and ``RNA``
            entries are written.
        sim_data: Simulation data object providing transcription,
            replication and regulation parameters.
        random_state: Random number generator exposing ``multinomial`` and
            ``rand`` (presumably a ``numpy.random.RandomState`` - confirm
            against caller).
        unique_id_rng: Passed through unchanged to
            ``create_new_unique_molecules``.
        ppgpp_regulation: If truthy, basal probabilities come from
            ``synth_prob_from_ppgpp`` evaluated at the ppGpp concentration
            for the current condition; otherwise a copy of
            ``transcription_regulation.basal_prob`` is used.
        trna_attenuation: If truthy, attenuation adjustments are added to
            the basal probabilities and initiation probabilities are scaled
            by the attenuation readthrough values.
    """
    # Load parameters
    rna_lengths = sim_data.process.transcription.rna_data["length"].asNumber()
    rna_masses = (
        sim_data.process.transcription.rna_data["mw"] / sim_data.constants.n_avogadro
    ).asNumber(units.fg)
    current_media_id = sim_data.conditions[sim_data.condition]["nutrients"]
    frac_active_rnap = sim_data.process.transcription.rnapFractionActiveDict[
        current_media_id
    ]
    inactive_rnap_idx = bulk_name_to_idx(
        sim_data.molecule_ids.full_RNAP, bulk_state["id"]
    )
    inactive_RNAP_counts = bulk_state["count"][inactive_rnap_idx]
    rna_sequences = sim_data.process.transcription.transcription_sequences
    nt_weights = sim_data.process.transcription.transcription_monomer_weights
    end_weight = sim_data.process.transcription.transcription_end_weight
    replichore_lengths = sim_data.process.replication.replichore_lengths
    chromosome_length = replichore_lengths.sum()

    # Number of rnaPoly to activate
    # (np.int64 truncates the fractional part of the product)
    n_RNAPs_to_activate = np.int64(frac_active_rnap * inactive_RNAP_counts)

    # Get attributes of promoters
    TU_index, bound_TF, domain_index_promoters = attrs(
        unique_molecules["promoter"], ["TU_index", "bound_TF", "domain_index"]
    )

    # Parameters for rnaSynthProb
    if ppgpp_regulation:
        doubling_time = sim_data.condition_to_doubling_time[sim_data.condition]
        ppgpp_conc = sim_data.growth_rate_parameters.get_ppGpp_conc(doubling_time)
        basal_prob, _ = sim_data.process.transcription.synth_prob_from_ppgpp(
            ppgpp_conc, sim_data.process.replication.get_average_copy_number
        )
        ppgpp_scale = basal_prob[TU_index]
        # Use original delta prob if no ppGpp basal prob
        ppgpp_scale[ppgpp_scale == 0] = 1
    else:
        basal_prob = sim_data.process.transcription_regulation.basal_prob.copy()
        ppgpp_scale = 1

    if trna_attenuation:
        basal_prob[sim_data.process.transcription.attenuated_rna_indices] += (
            sim_data.process.transcription.attenuation_basal_prob_adjustments
        )
    n_TUs = len(basal_prob)
    delta_prob_matrix = sim_data.process.transcription_regulation.get_delta_prob_matrix(
        dense=True, ppgpp=ppgpp_regulation
    )

    # Synthesis probabilities for different categories of genes
    rna_synth_prob_fractions = sim_data.process.transcription.rnaSynthProbFraction
    rna_synth_prob_R_protein = sim_data.process.transcription.rnaSynthProbRProtein
    rna_synth_prob_rna_polymerase = (
        sim_data.process.transcription.rnaSynthProbRnaPolymerase
    )

    # Get coordinates and transcription directions of transcription units
    replication_coordinate = sim_data.process.transcription.rna_data[
        "replication_coordinate"
    ]
    transcription_direction = sim_data.process.transcription.rna_data["is_forward"]

    # Determine changes from genetic perturbations
    genetic_perturbations = {}
    # sim_data may not define genetic_perturbations at all; treat as empty
    perturbations = getattr(sim_data, "genetic_perturbations", {})

    if len(perturbations) > 0:
        probability_indexes = [
            (index, sim_data.genetic_perturbations[rna_data["id"]])
            for index, rna_data in enumerate(sim_data.process.transcription.rna_data)
            if rna_data["id"] in sim_data.genetic_perturbations
        ]

        genetic_perturbations = {
            "fixedRnaIdxs": [pair[0] for pair in probability_indexes],
            "fixedSynthProbs": [pair[1] for pair in probability_indexes],
        }

    # ID Groups
    idx_rRNA = np.where(sim_data.process.transcription.rna_data["is_rRNA"])[0]
    idx_mRNA = np.where(sim_data.process.transcription.rna_data["is_mRNA"])[0]
    idx_tRNA = np.where(sim_data.process.transcription.rna_data["is_tRNA"])[0]
    idx_rprotein = np.where(
        sim_data.process.transcription.rna_data["includes_ribosomal_protein"]
    )[0]
    idx_rnap = np.where(sim_data.process.transcription.rna_data["includes_RNAP"])[0]

    # Calculate probabilities of the RNAP binding to the promoters
    promoter_init_probs = basal_prob[TU_index] + ppgpp_scale * np.multiply(
        delta_prob_matrix[TU_index, :], bound_TF
    ).sum(axis=1)

    if len(genetic_perturbations) > 0:
        rescale_initiation_probs(
            promoter_init_probs,
            TU_index,
            genetic_perturbations["fixedSynthProbs"],
            genetic_perturbations["fixedRnaIdxs"],
        )

    # Adjust probabilities to not be negative
    promoter_init_probs[promoter_init_probs < 0] = 0.0
    promoter_init_probs /= promoter_init_probs.sum()
    if np.any(promoter_init_probs < 0):
        raise Exception("Have negative RNA synthesis probabilities")

    # Adjust synthesis probabilities depending on environment
    synth_prob_fractions = rna_synth_prob_fractions[current_media_id]

    # Create masks for different types of RNAs
    is_mRNA = np.isin(TU_index, idx_mRNA)
    is_tRNA = np.isin(TU_index, idx_tRNA)
    is_rRNA = np.isin(TU_index, idx_rRNA)
    is_rprotein = np.isin(TU_index, idx_rprotein)
    is_rnap = np.isin(TU_index, idx_rnap)
    is_fixed = is_tRNA | is_rRNA | is_rprotein | is_rnap

    # Rescale initiation probabilities based on type of RNA
    promoter_init_probs[is_mRNA] *= (
        synth_prob_fractions["mRna"] / promoter_init_probs[is_mRNA].sum()
    )
    promoter_init_probs[is_tRNA] *= (
        synth_prob_fractions["tRna"] / promoter_init_probs[is_tRNA].sum()
    )
    promoter_init_probs[is_rRNA] *= (
        synth_prob_fractions["rRna"] / promoter_init_probs[is_rRNA].sum()
    )

    # Set fixed synthesis probabilities for RProteins and RNAPs
    rescale_initiation_probs(
        promoter_init_probs,
        TU_index,
        np.concatenate(
            (
                rna_synth_prob_R_protein[current_media_id],
                rna_synth_prob_rna_polymerase[current_media_id],
            )
        ),
        np.concatenate((idx_rprotein, idx_rnap)),
    )

    assert promoter_init_probs[is_fixed].sum() < 1.0

    # Adjust for attenuation that will stop transcription after initiation
    if trna_attenuation:
        attenuation_readthrough = {
            idx: prob
            for idx, prob in zip(
                sim_data.process.transcription.attenuated_rna_indices,
                sim_data.process.transcription.attenuation_readthrough[
                    sim_data.condition
                ],
            )
        }
        # Promoters of TUs that are not attenuated keep a factor of 1
        readthrough_adjustment = np.array(
            [attenuation_readthrough.get(idx, 1) for idx in TU_index]
        )
        promoter_init_probs *= readthrough_adjustment

    # Rescale the non-fixed promoters so that all probabilities sum to one
    scale_the_rest_by = (
        1.0 - promoter_init_probs[is_fixed].sum()
    ) / promoter_init_probs[~is_fixed].sum()
    promoter_init_probs[~is_fixed] *= scale_the_rest_by

    # normalize to length of rna
    init_prob_length_adjusted = promoter_init_probs * rna_lengths[TU_index]
    init_prob_normalized = init_prob_length_adjusted / init_prob_length_adjusted.sum()

    # Sample a multinomial distribution of synthesis probabilities to determine
    # what RNA are initialized
    n_initiations = random_state.multinomial(n_RNAPs_to_activate, init_prob_normalized)

    # Build array of transcription unit indexes for partially transcribed mRNAs
    # and domain indexes for RNAPs
    TU_index_partial_RNAs = np.repeat(TU_index, n_initiations)
    domain_index_rnap = np.repeat(domain_index_promoters, n_initiations)

    # Build arrays of starting coordinates and transcription directions
    starting_coordinates = replication_coordinate[TU_index_partial_RNAs]
    is_forward = transcription_direction[TU_index_partial_RNAs]

    # Randomly advance RNAPs along the transcription units
    # TODO (Eran): make sure there aren't any RNAPs at same location on same TU
    updated_lengths = np.array(
        random_state.rand(n_RNAPs_to_activate) * rna_lengths[TU_index_partial_RNAs],
        dtype=int,
    )

    # Rescale boolean array of directions to an array of 1's and -1's.
    direction_rescaled = (2 * (is_forward - 0.5)).astype(np.int64)

    # Compute the updated coordinates of RNAPs. Coordinates of RNAPs moving in
    # the positive direction are increased, whereas coordinates of RNAPs moving
    # in the negative direction are decreased.
    updated_coordinates = starting_coordinates + np.multiply(
        direction_rescaled, updated_lengths
    )

    # Reset coordinates of RNAPs that cross the boundaries between right and
    # left replichores
    updated_coordinates[updated_coordinates > replichore_lengths[0]] -= (
        chromosome_length
    )
    updated_coordinates[updated_coordinates < -replichore_lengths[1]] += (
        chromosome_length
    )

    # Update mass
    sequences = rna_sequences[TU_index_partial_RNAs]
    added_mass = computeMassIncrease(sequences, updated_lengths, nt_weights)
    added_mass[updated_lengths != 0] += end_weight  # add endWeight to all new Rna

    # Masses of partial mRNAs are counted as mRNA mass as they are already
    # functional, but the masses of other types of partial RNAs are counted as
    # generic RNA mass.
    added_RNA_mass = added_mass.copy()
    added_mRNA_mass = added_mass.copy()

    is_mRNA_partial_RNAs = np.isin(TU_index_partial_RNAs, idx_mRNA)
    added_RNA_mass[is_mRNA_partial_RNAs] = 0
    added_mRNA_mass[np.logical_not(is_mRNA_partial_RNAs)] = 0

    # Add active RNAPs and get their unique indexes
    unique_molecules["active_RNAP"] = create_new_unique_molecules(
        "active_RNAP",
        n_RNAPs_to_activate,
        sim_data,
        unique_id_rng,
        domain_index=domain_index_rnap,
        coordinates=updated_coordinates,
        is_forward=is_forward,
    )

    # Decrement counts of bulk inactive RNAPs
    rnap_idx = bulk_name_to_idx(sim_data.molecule_ids.full_RNAP, bulk_state["id"])
    bulk_state["count"][rnap_idx] = inactive_RNAP_counts - n_RNAPs_to_activate

    # Add partially transcribed RNAs
    partial_rnas = create_new_unique_molecules(
        "RNA",
        n_RNAPs_to_activate,
        sim_data,
        unique_id_rng,
        TU_index=TU_index_partial_RNAs,
        transcript_length=updated_lengths,
        is_mRNA=is_mRNA_partial_RNAs,
        is_full_transcript=np.zeros(n_RNAPs_to_activate, dtype=bool),
        can_translate=is_mRNA_partial_RNAs,
        RNAP_index=unique_molecules["active_RNAP"]["unique_index"],
        massDiff_nonspecific_RNA=added_RNA_mass,
        massDiff_mRNA=added_mRNA_mass,
    )

    # Get counts of mRNAs initialized as bulk molecules
    mRNA_ids = sim_data.process.transcription.rna_data["id"][
        sim_data.process.transcription.rna_data["is_mRNA"]
    ]
    mRNA_idx = bulk_name_to_idx(mRNA_ids, bulk_state["id"])
    mRNA_counts = bulk_state["count"][mRNA_idx]

    # Subtract number of partially transcribed mRNAs that were initialized.
    # Note: some mRNAs with high degradation rates have more partial mRNAs than
    # the expected total number of mRNAs - for these mRNAs we simply set the
    # initial full mRNA counts to be zero.
    partial_mRNA_counts = np.bincount(
        TU_index_partial_RNAs[is_mRNA_partial_RNAs], minlength=n_TUs
    )[idx_mRNA]
    full_mRNA_counts = (mRNA_counts - partial_mRNA_counts).clip(min=0)

    # Get array of TU indexes for each full mRNA
    TU_index_full_mRNAs = np.repeat(idx_mRNA, full_mRNA_counts)

    # Add fully transcribed mRNAs. The RNAP_index attribute of these molecules
    # are set to -1.
    full_rnas = create_new_unique_molecules(
        "RNA",
        len(TU_index_full_mRNAs),
        sim_data,
        unique_id_rng,
        TU_index=TU_index_full_mRNAs,
        transcript_length=rna_lengths[TU_index_full_mRNAs],
        is_mRNA=np.ones_like(TU_index_full_mRNAs, dtype=bool),
        is_full_transcript=np.ones_like(TU_index_full_mRNAs, dtype=bool),
        can_translate=np.ones_like(TU_index_full_mRNAs, dtype=bool),
        RNAP_index=np.full(TU_index_full_mRNAs.shape, -1, dtype=np.int64),
        massDiff_mRNA=rna_masses[TU_index_full_mRNAs],
    )
    unique_molecules["RNA"] = np.concatenate((partial_rnas, full_rnas))
    # Have to recreate unique indices or else there will be conflicts between
    # full and partial RNAs
    unique_prefix = np.min(unique_molecules["RNA"]["unique_index"])
    unique_molecules["RNA"]["unique_index"] = np.arange(
        unique_prefix, unique_prefix + len(unique_molecules["RNA"])
    )
    # Re-wrap as a MetadataArray; the second argument is one past the largest
    # unique index assigned above
    unique_molecules["RNA"] = MetadataArray(
        unique_molecules["RNA"],
        unique_prefix + len(unique_molecules["RNA"]),
    )

    # Reset counts of bulk mRNAs to zero
    bulk_state["count"][mRNA_idx] = 0
def initialize_chromosomal_segments(unique_molecules, sim_data, unique_id_rng):
    """
    Build the ``chromosomal_segment`` unique molecules.

    A segment is the stretch of DNA between two neighbouring boundary
    molecules (active RNAPs, replisomes, or the dummy terC molecule) on one
    chromosome domain. Every segment is created in its relaxed state, i.e.
    its linking number equals its length divided by the relaxed number of
    base pairs per turn.

    Args:
        unique_molecules: Mapping of unique molecule names to their arrays.
            The ``active_replisome``, ``active_RNAP``, ``chromosome_domain``,
            ``full_chromosome`` and ``oriC`` entries are read and the
            ``chromosomal_segment`` entry is written.
        sim_data: Simulation data object providing chromosome structure and
            replication parameters.
        unique_id_rng: Passed through unchanged to
            ``create_new_unique_molecules``.
    """
    # Load parameters
    chromosome_structure = sim_data.process.chromosome_structure
    bp_per_turn = chromosome_structure.relaxed_DNA_base_pairs_per_turn
    terC_index = chromosome_structure.terC_dummy_molecule_index
    replichore_lengths = sim_data.process.replication.replichore_lengths
    left_end = -replichore_lengths[1]
    right_end = replichore_lengths[0]

    # Attributes of the molecules that bound or define the segments
    replisome_coords, replisome_domains, replisome_ids = attrs(
        unique_molecules["active_replisome"],
        ["coordinates", "domain_index", "unique_index"],
    )
    rnap_coords, rnap_domains, rnap_ids = attrs(
        unique_molecules["active_RNAP"], ["coordinates", "domain_index", "unique_index"]
    )
    domain_indexes, child_domains = attrs(
        unique_molecules["chromosome_domain"], ["domain_index", "child_domains"]
    )
    (full_chromosome_domains,) = attrs(
        unique_molecules["full_chromosome"], ["domain_index"]
    )
    (oriC_domains,) = attrs(unique_molecules["oriC"], ["domain_index"])

    def segments_on_stretch(positions, molecule_ids, spans_oriC, spans_terC):
        """
        Return (boundary molecule indexes, boundary coordinates, linking
        numbers) for the segments of one continuous stretch of DNA, given
        the coordinates and unique indexes of its boundary molecules.
        """
        order = np.argsort(positions)
        positions = positions[order]
        molecule_ids = molecule_ids[order]

        # A stretch that spans terC is capped on both ends by the dummy
        # terC molecule
        if spans_terC:
            n_molecules = len(positions)
            positions = np.insert(
                positions, [0, n_molecules], [left_end, right_end]
            )
            molecule_ids = np.insert(molecule_ids, [0, n_molecules], terC_index)

        # Pair each molecule with its right-hand neighbour
        pair_ids = np.column_stack((molecule_ids[:-1], molecule_ids[1:]))
        pair_coords = np.column_stack((positions[:-1], positions[1:]))

        # A stretch that does not span oriC has no segment across the
        # origin: drop the single pair whose coordinates have opposite signs
        if not spans_oriC:
            oriC_rows = np.flatnonzero(np.sign(pair_coords).sum(axis=1) == 0)
            assert len(oriC_rows) == 1
            pair_ids = np.delete(pair_ids, oriC_rows, 0)
            pair_coords = np.delete(pair_coords, oriC_rows, 0)

        # Relaxed state: linking number is proportional to segment length
        relaxed_linking = (pair_coords[:, 1] - pair_coords[:, 0]) / bp_per_turn

        return pair_ids, pair_coords, relaxed_linking

    # Per-domain results, seeded with empty arrays so that the stacked
    # outputs keep their shape and dtype even when there are no segments
    id_chunks = [np.empty((0, 2), dtype=np.int64)]
    coord_chunks = [np.empty((0, 2), dtype=np.int64)]
    domain_chunks = [np.array([], dtype=np.int32)]
    linking_chunks = [np.array([], dtype=np.float64)]

    for domain_index in domain_indexes:
        has_oriC = domain_index in oriC_domains
        has_terC = domain_index in full_chromosome_domains

        # RNAPs on this domain
        on_domain = rnap_domains == domain_index
        coord_pieces = [rnap_coords[on_domain]]
        id_pieces = [rnap_ids[on_domain]]

        # Replisomes on this domain, if any
        if not has_oriC:
            own_replisomes = replisome_domains == domain_index
            coord_pieces.append(replisome_coords[own_replisomes])
            id_pieces.append(replisome_ids[own_replisomes])

        # Replisomes on the parent domain, if any
        if not has_terC:
            parent_row = np.where(child_domains == domain_index)[0][0]
            parent_replisomes = replisome_domains == domain_indexes[parent_row]
            coord_pieces.append(replisome_coords[parent_replisomes])
            id_pieces.append(replisome_ids[parent_replisomes])

        pair_ids, pair_coords, relaxed_linking = segments_on_stretch(
            np.concatenate(coord_pieces),
            np.concatenate(id_pieces),
            has_oriC,
            has_terC,
        )

        id_chunks.append(pair_ids)
        coord_chunks.append(pair_coords)
        domain_chunks.append(
            np.full(len(relaxed_linking), domain_index, dtype=np.int32)
        )
        linking_chunks.append(relaxed_linking)

    all_boundary_molecule_indexes = np.vstack(id_chunks)
    all_boundary_coordinates = np.vstack(coord_chunks)
    all_segment_domain_indexes = np.concatenate(domain_chunks)
    all_linking_numbers = np.concatenate(linking_chunks)

    # Confirm total counts of all segments
    n_segments = len(all_linking_numbers)
    assert (
        n_segments
        == len(rnap_ids) + 1.5 * len(replisome_ids) + 1
    )

    # Add chromosomal segments
    unique_molecules["chromosomal_segment"] = create_new_unique_molecules(
        "chromosomal_segment",
        n_segments,
        sim_data,
        unique_id_rng,
        boundary_molecule_indexes=all_boundary_molecule_indexes,
        boundary_coordinates=all_boundary_coordinates,
        domain_index=all_segment_domain_indexes,
        linking_number=all_linking_numbers,
    )
def initialize_translation(
    bulk_state, unique_molecules, sim_data, random_state, unique_id_rng
):
    """
    Activate ribosomes as unique molecules, and distribute them along lengths
    of mRNAs, while decreasing counts of unactivated ribosomal subunits (30S
    and 50S).

    Ribosomes are placed randomly across the lengths of each mRNA.

    Args:
        bulk_state: Structured array of bulk molecules. The ``id`` and
            ``count`` fields are read; the counts of the 30S and 50S
            subunits are decremented in place.
        unique_molecules: Mapping of unique molecule names to their arrays.
            The ``RNA`` entry is read and the ``active_ribosome`` entry is
            written.
        sim_data: Simulation data object providing translation and
            transcription parameters.
        random_state: Random number generator exposing ``multinomial`` and
            ``rand`` (presumably a ``numpy.random.RandomState`` - confirm
            against caller).
        unique_id_rng: Passed through unchanged to
            ``create_new_unique_molecules``.
    """
    # Load translation parameters
    current_nutrients = sim_data.conditions[sim_data.condition]["nutrients"]
    frac_active_ribosome = sim_data.process.translation.ribosomeFractionActiveDict[
        current_nutrients
    ]
    protein_sequences = sim_data.process.translation.translation_sequences
    protein_lengths = sim_data.process.translation.monomer_data["length"].asNumber()
    translation_efficiencies = normalize(
        sim_data.process.translation.translation_efficiencies_by_monomer
    )
    aa_weights_incorporated = sim_data.process.translation.translation_monomer_weights
    end_weight = sim_data.process.translation.translation_end_weight
    cistron_lengths = sim_data.process.transcription.cistron_data["length"].asNumber(
        units.nt
    )
    TU_ids = sim_data.process.transcription.rna_data["id"]
    monomer_index_to_tu_indexes = sim_data.relation.monomer_index_to_tu_indexes
    # Map each monomer's position in monomer_data to the index of its cistron
    monomer_index_to_cistron_index = {
        i: sim_data.process.transcription._cistron_id_to_index[monomer["cistron_id"]]
        for (i, monomer) in enumerate(sim_data.process.translation.monomer_data)
    }

    # Get attributes of RNAs
    (
        TU_index_all_RNAs,
        length_all_RNAs,
        is_mRNA,
        is_full_transcript_all_RNAs,
        unique_index_all_RNAs,
    ) = attrs(
        unique_molecules["RNA"],
        [
            "TU_index",
            "transcript_length",
            "is_mRNA",
            "is_full_transcript",
            "unique_index",
        ],
    )
    TU_index_mRNAs = TU_index_all_RNAs[is_mRNA]
    length_mRNAs = length_all_RNAs[is_mRNA]
    is_full_transcript_mRNAs = is_full_transcript_all_RNAs[is_mRNA]
    unique_index_mRNAs = unique_index_all_RNAs[is_mRNA]

    # Calculate available template lengths of each mRNA cistron from fully
    # transcribed mRNA transcription units
    TU_index_full_mRNAs = TU_index_mRNAs[is_full_transcript_mRNAs]
    TU_counts_full_mRNAs = np.bincount(TU_index_full_mRNAs, minlength=len(TU_ids))
    cistron_counts_full_mRNAs = (
        sim_data.process.transcription.cistron_tu_mapping_matrix.dot(
            TU_counts_full_mRNAs
        )
    )
    available_cistron_lengths = np.multiply(cistron_counts_full_mRNAs, cistron_lengths)

    # Add available template lengths from each partially transcribed mRNAs
    TU_index_incomplete_mRNAs = TU_index_mRNAs[np.logical_not(is_full_transcript_mRNAs)]
    length_incomplete_mRNAs = length_mRNAs[np.logical_not(is_full_transcript_mRNAs)]

    # Group the lengths of the partial transcripts by transcription unit
    TU_index_to_mRNA_lengths = {}
    for TU_index, length in zip(TU_index_incomplete_mRNAs, length_incomplete_mRNAs):
        TU_index_to_mRNA_lengths.setdefault(TU_index, []).append(length)

    for TU_index, available_lengths in TU_index_to_mRNA_lengths.items():
        cistron_indexes = sim_data.process.transcription.rna_id_to_cistron_indexes(
            TU_ids[TU_index]
        )
        cistron_start_positions = np.array(
            [
                sim_data.process.transcription.cistron_start_end_pos_in_tu[
                    (cistron_index, TU_index)
                ][0]
                for cistron_index in cistron_indexes
            ]
        )

        # Each partial transcript contributes the part of every cistron that
        # has been transcribed so far, between 0 and the full cistron length
        for length in available_lengths:
            available_cistron_lengths[cistron_indexes] += np.clip(
                length - cistron_start_positions, 0, cistron_lengths[cistron_indexes]
            )

    # Find number of ribosomes to activate
    ribosome30S_idx = bulk_name_to_idx(
        sim_data.molecule_ids.s30_full_complex, bulk_state["id"]
    )
    ribosome30S = bulk_state["count"][ribosome30S_idx]
    ribosome50S_idx = bulk_name_to_idx(
        sim_data.molecule_ids.s50_full_complex, bulk_state["id"]
    )
    ribosome50S = bulk_state["count"][ribosome50S_idx]
    # A ribosome needs one subunit of each kind, so the scarcer one limits
    inactive_ribosome_count = np.minimum(ribosome30S, ribosome50S)
    n_ribosomes_to_activate = np.int64(frac_active_ribosome * inactive_ribosome_count)

    # Add total available template lengths as weights and normalize
    protein_init_probs = normalize(
        available_cistron_lengths[sim_data.relation.cistron_to_monomer_mapping]
        * translation_efficiencies
    )

    # Sample a multinomial distribution of synthesis probabilities to determine
    # which types of mRNAs are initialized
    n_new_proteins = random_state.multinomial(
        n_ribosomes_to_activate, protein_init_probs
    )

    # Build attributes for active ribosomes
    protein_indexes = np.empty(n_ribosomes_to_activate, np.int64)
    cistron_start_positions_on_mRNA = np.empty(n_ribosomes_to_activate, np.int64)
    positions_on_mRNA_from_cistron_start_site = np.empty(
        n_ribosomes_to_activate, np.int64
    )
    mRNA_indexes = np.empty(n_ribosomes_to_activate, np.int64)
    # Running offset into the per-ribosome arrays allocated above
    start_index = 0
    nonzero_count = n_new_proteins > 0

    for protein_index, protein_counts in zip(
        np.arange(n_new_proteins.size)[nonzero_count], n_new_proteins[nonzero_count]
    ):
        # Set protein index
        protein_indexes[start_index : start_index + protein_counts] = protein_index

        # Get index of cistron corresponding to this protein
        cistron_index = monomer_index_to_cistron_index[protein_index]

        # Initialize list of available lengths for each transcript and the
        # indexes of each transcript in the list of mRNA attributes
        available_lengths = []
        attribute_indexes = []
        cistron_start_positions = []

        # Distribute ribosomes among mRNAs that produce this protein, weighted
        # by their lengths
        for TU_index in monomer_index_to_tu_indexes[protein_index]:
            attribute_indexes_this_TU = np.where(TU_index_mRNAs == TU_index)[0]
            cistron_start_position = (
                sim_data.process.transcription.cistron_start_end_pos_in_tu[
                    (cistron_index, TU_index)
                ][0]
            )
            available_lengths.extend(
                np.clip(
                    length_mRNAs[attribute_indexes_this_TU] - cistron_start_position,
                    0,
                    cistron_lengths[cistron_index],
                )
            )
            attribute_indexes.extend(attribute_indexes_this_TU)
            cistron_start_positions.extend(
                [cistron_start_position] * len(attribute_indexes_this_TU)
            )

        available_lengths = np.array(available_lengths)
        attribute_indexes = np.array(attribute_indexes)
        cistron_start_positions = np.array(cistron_start_positions)

        n_ribosomes_per_RNA = random_state.multinomial(
            protein_counts, normalize(available_lengths)
        )

        # Get unique indexes of each mRNA
        mRNA_indexes[start_index : start_index + protein_counts] = np.repeat(
            unique_index_mRNAs[attribute_indexes], n_ribosomes_per_RNA
        )

        # Get full length of this polypeptide
        peptide_full_length = protein_lengths[protein_index]

        # Randomly place ribosomes along the length of each mRNA, capped by the
        # mRNA length expected from the full polypeptide length to prevent
        # ribosomes from overshooting full peptide lengths
        cistron_start_positions_on_mRNA[start_index : start_index + protein_counts] = (
            np.repeat(cistron_start_positions, n_ribosomes_per_RNA)
        )
        positions_on_mRNA_from_cistron_start_site[
            start_index : start_index + protein_counts
        ] = np.floor(
            random_state.rand(protein_counts)
            * np.repeat(
                np.minimum(available_lengths, peptide_full_length * 3),
                n_ribosomes_per_RNA,
            )
        )

        start_index += protein_counts

    # Calculate the lengths of the partial polypeptide, and rescale position on
    # mRNA to be a multiple of three using this peptide length
    peptide_lengths = np.floor_divide(positions_on_mRNA_from_cistron_start_site, 3)
    positions_on_mRNA = cistron_start_positions_on_mRNA + 3 * peptide_lengths

    # Update masses of partially translated proteins
    sequences = protein_sequences[protein_indexes]
    mass_increase_protein = computeMassIncrease(
        sequences, peptide_lengths, aa_weights_incorporated
    )

    # Add end weight
    mass_increase_protein[peptide_lengths != 0] += end_weight

    # Add active ribosomes
    unique_molecules["active_ribosome"] = create_new_unique_molecules(
        "active_ribosome",
        n_ribosomes_to_activate,
        sim_data,
        unique_id_rng,
        protein_index=protein_indexes,
        peptide_length=peptide_lengths,
        mRNA_index=mRNA_indexes,
        pos_on_mRNA=positions_on_mRNA,
        massDiff_protein=mass_increase_protein,
    )

    # Decrease counts of free 30S and 50S ribosomal subunits
    bulk_state["count"][ribosome30S_idx] = ribosome30S - n_ribosomes_to_activate
    bulk_state["count"][ribosome50S_idx] = ribosome50S - n_ribosomes_to_activate
def determine_chromosome_state(
    tau: Unum,
    replichore_length: Unum,
    n_max_replisomes: int,
    place_holder: int,
    cell_mass: Unum,
    critical_mass: Unum,
    replication_rate: float,
) -> tuple[
    dict[str, npt.NDArray[np.int32]],
    dict[str, npt.NDArray[Any]],
    dict[str, npt.NDArray[np.int32]],
]:
    """
    Compute the initial attributes of oriC's, replisomes and chromosome
    domains for a cell at the start of its cell cycle.

    Replication round ``r`` (``r = 0`` being the oldest) consists of ``2**r``
    initiation events, each of which creates two replisomes and splits one
    domain into two child domains. Domains are numbered breadth-first, so
    round ``r`` acts on domains ``2**r - 1`` to ``2**(r + 1) - 2``.

    Args:
        tau: the doubling time of the cell (with Unum time unit)
        replichore_length: the amount of DNA to be replicated per fork,
            usually half of the genome, in base-pairs (with Unum nucleotide
            unit)
        n_max_replisomes: the maximum number of replisomes that can be
            formed given the initial counts of replisome subunits
        place_holder: value stored in ``child_domains`` for chromosome
            domains without child domains
        cell_mass: total mass of the cell (with Unum mass unit)
        critical_mass: mass per oriC before replication is initiated (with
            Unum mass unit)
        replication_rate: rate of nucleotide elongation. It is used as a
            plain number multiplied by a time in seconds.

    Returns:
        Three dictionaries, each containing attributes of a unique molecule
        type.

        - ``oric_state``: dictionary of the following format::

            {'domain_index': a vector of integers indicating which
                chromosome domain the oriC sequence belongs to.}

        - ``replisome_state``: dictionary of the following format::

            {'coordinates': a vector of integers that indicates where the
                replisomes are located on the chromosome relative to the
                origin in base pairs,
            'right_replichore': a vector of boolean values that indicates
                whether the replisome is on the right replichore (True) or
                the left replichore (False),
            'domain_index': a vector of integers indicating which
                chromosome domain the replisomes belong to. The index of
                the "mother" domain of the replication fork is assigned to
                the replisome}

        - ``domain_state``: dictionary of the following format::

            {'domain_index': the indexes of the domains,
            'child_domains': the (n_domain X 2) array of the domain indexes
                of the two children domains that are connected on the oriC
                side with the given domain.}
    """
    # Strip units and validate
    tau_s = tau.asNumber(units.s)
    replichore_nt = replichore_length.asNumber(units.nt)
    assert tau_s >= 0, "tau value can't be negative."
    assert replichore_nt > 0, "replichore_length must be positive."
    cell_mass_fg = cell_mass.asNumber(units.fg)
    critical_mass_fg = critical_mass.asNumber(units.fg)

    # Number of active replication rounds: limited both by the available
    # replisomes and by how many times the critical mass has been doubled
    rounds_allowed = int(np.log2(n_max_replisomes / 2 + 1))
    rounds_by_mass = int(np.ceil(np.log2(cell_mass_fg / critical_mass_fg)))
    n_rounds = min(rounds_allowed, max(0, rounds_by_mass))

    # Replisome arrays: two replisomes for each of the 2**n_rounds - 1 events
    n_replisomes = 2 * (2**n_rounds - 1)
    coordinates = np.zeros(n_replisomes, dtype=np.int64)
    right_replichore_replisome = np.zeros(n_replisomes, dtype=bool)
    domain_index_replisome = np.zeros(n_replisomes, dtype=np.int32)

    # Domain arrays: childless domains keep the placeholder value
    n_domains = 2 ** (n_rounds + 1) - 1
    child_domains = np.full((n_domains, 2), place_holder, dtype=np.int32)
    domain_index_domains = np.arange(0, n_domains, dtype=np.int32)

    # oriC's sit on the youngest generation of domains
    domain_index_oric = np.arange(
        2**n_rounds - 1, 2 ** (n_rounds + 1) - 1, dtype=np.int32
    )

    # Oldest round first; the loop body never runs when n_rounds == 0
    for round_idx in range(n_rounds):
        # Since 2**i events happen in round i, 2**j - 1 events precede round j
        events_before = 2**round_idx - 1
        events_through = 2 ** (round_idx + 1) - 1
        events_through_next = 2 ** (round_idx + 2) - 1
        n_events = 2**round_idx

        # How far the forks of this round have travelled since the cell
        # passed the critical mass that triggered the round
        round_critical_mass = 2**round_idx * critical_mass_fg
        growth_rate = np.log(2) / tau_s
        replication_time = np.log(cell_mass_fg / round_critical_mass) / growth_rate
        # TODO: this should handle completed replication (instead of taking min)
        # for accuracy but will likely never start with multiple chromosomes
        fork_location = min(
            np.floor(replication_time * replication_rate),
            replichore_nt - 1,
        )

        # Each event yields a (right, left) replisome pair on its mother domain
        replisome_slice = slice(2 * events_before, 2 * events_through)
        coordinates[replisome_slice] = np.tile(
            np.array([fork_location, -fork_location]), n_events
        )
        right_replichore_replisome[replisome_slice] = np.tile(
            np.array([True, False]), n_events
        )
        domain_index_replisome[replisome_slice] = np.repeat(
            np.arange(events_before, events_through), 2
        )

        # The domains of this round get consecutive pairs of next-round
        # domains as their children
        child_domains[events_before:events_through, :] = np.arange(
            events_through, events_through_next
        ).reshape(-1, 2)

    # Wrap into dictionaries
    oric_state = {"domain_index": domain_index_oric}
    replisome_state = {
        "coordinates": coordinates,
        "right_replichore": right_replichore_replisome,
        "domain_index": domain_index_replisome,
    }
    domain_state = {
        "child_domains": child_domains,
        "domain_index": domain_index_domains,
    }

    return oric_state, replisome_state, domain_state
def rescale_initiation_probs(init_probs, TU_index, fixed_synth_probs, fixed_TU_indexes):
    """
    Rescales the initiation probabilities of each promoter such that the total
    synthesis probabilities of certain types of RNAs are fixed to a
    predetermined value. For instance, if there are two copies of promoters for
    RNA A, whose synthesis probability should be fixed to 0.1, each promoter is
    given an initiation probability of 0.05.

    Args:
        init_probs: Array of per-promoter initiation probabilities. Modified
            in place.
        TU_index: Array giving the transcription unit index of each promoter
            (same length as ``init_probs``).
        fixed_synth_probs: Total synthesis probability to assign to each
            transcription unit in ``fixed_TU_indexes``.
        fixed_TU_indexes: Transcription unit indexes whose total synthesis
            probability is fixed; paired element-wise with
            ``fixed_synth_probs``.

    Returns:
        None. ``init_probs`` is updated in place.
    """
    for rna_idx, synth_prob in zip(fixed_TU_indexes, fixed_synth_probs):
        fixed_rna_mask = TU_index == rna_idx
        n_promoters = np.count_nonzero(fixed_rna_mask)

        # A transcription unit without any promoter copy has nothing to
        # rescale; skipping it avoids a division by zero (and the
        # accompanying NumPy RuntimeWarning) for what is a no-op assignment.
        if n_promoters == 0:
            continue

        # Split the fixed probability evenly among all promoter copies
        init_probs[fixed_rna_mask] = synth_prob / n_promoters
def calculate_cell_mass(bulk_state, unique_molecules, sim_data):
    """
    Calculates cell mass in femtograms.

    The bulk contribution is the bulk counts dotted with the per-molecule
    ``*_submass`` fields. For every unique molecule type that is listed in
    the unique molecule mass table and present in ``unique_molecules``, the
    sum of its ``_entryState`` field times the tabulated submasses is added,
    followed by the sum of all of its ``massDiff_*`` fields.

    Args:
        bulk_state: Structured array of bulk molecules with a ``count``
            field and one ``<submass>_submass`` field per submass.
        unique_molecules: Mapping of unique molecule names to structured
            arrays. May be empty, in which case only bulk mass is counted.
        sim_data: Simulation data object providing submass names, unique
            molecule masses and Avogadro's number.

    Returns:
        The total mass multiplied by ``units.fg``.
    """
    submass_names = list(sim_data.submass_name_to_index.keys())

    # Bulk molecules: counts (n,) dot submasses (n, n_submasses), then total
    bulk_fields = [f"{name}_submass" for name in submass_names]
    bulk_submasses = rfn.structured_to_unstructured(bulk_state[bulk_fields])
    total_mass = bulk_state["count"].dot(bulk_submasses).sum()

    if len(unique_molecules) > 0:
        mass_table = sim_data.internal_state.unique_molecule.unique_molecule_masses
        # Convert the tabulated masses from per-mole to per-molecule
        fg_per_molecule = mass_table["mass"].asNumber(
            units.fg / units.mol
        ) / sim_data.constants.n_avogadro.asNumber(1 / units.mol)
        mass_diff_fields = [f"massDiff_{name}" for name in submass_names]

        for molecule_name, base_submasses in zip(mass_table["id"], fg_per_molecule):
            if molecule_name not in unique_molecules:
                continue
            molecules = unique_molecules[molecule_name]
            # Base mass of the entries counted by the _entryState field
            total_mass += (molecules["_entryState"].sum() * base_submasses).sum()
            # Mass differences carried by the individual molecules
            total_mass += rfn.structured_to_unstructured(
                molecules[mass_diff_fields]
            ).sum()

    return units.fg * total_mass
def initialize_trna_charging(
    bulk_state: np.ndarray,
    unique_molecules: dict[str, np.ndarray],
    sim_data: Any,
    variable_elongation: bool,
):
    """
    Initializes charged tRNA from uncharged tRNA and amino acids

    The charged fraction is estimated with ``calculate_trna_charging`` from
    the current concentrations of synthetases, tRNAs, amino acids and active
    ribosomes. The bulk counts of charged and uncharged tRNAs are then
    overwritten in place so that their totals are unchanged.

    Args:
        bulk_state: Structured array with IDs and counts of all bulk molecules
        unique_molecules: Mapping of unique molecule names to structured
            arrays of their current simulation states
        sim_data: Simulation data loaded from pickle generated by ParCa
        variable_elongation: Sets max elongation higher if True

    .. note::
        Does not adjust for mass of amino acids on charged tRNA (~0.01% of cell mass)
    """
    constants = sim_data.constants
    transcription = sim_data.process.transcription
    metabolism = sim_data.process.metabolism

    # Calculate cell volume for concentrations
    cell_volume = (
        calculate_cell_mass(bulk_state, unique_molecules, sim_data)
        / constants.cell_density
    )
    counts_to_molar = 1 / (constants.n_avogadro * cell_volume)

    def lookup(names):
        """Return the bulk indexes of ``names`` and their current counts."""
        indexes = bulk_name_to_idx(names, bulk_state["id"])
        return indexes, counts(bulk_state, indexes)

    # Get molecule counts
    aa_from_synthetase = transcription.aa_from_synthetase
    aa_from_trna = transcription.aa_from_trna
    _, synthetases = lookup(transcription.synthetase_names)
    uncharged_trna_idx, uncharged_trna = lookup(transcription.uncharged_trna_names)
    charged_trna_idx, charged_trna = lookup(transcription.charged_trna_names)
    _, aas = lookup(sim_data.molecule_groups.amino_acids)
    ribosome_counts = unique_molecules["active_ribosome"]["_entryState"].sum()

    # Convert to concentrations, mapping synthetases and tRNAs to amino acids
    synthetase_conc = counts_to_molar * np.dot(aa_from_synthetase, synthetases)
    uncharged_trna_conc = counts_to_molar * np.dot(aa_from_trna, uncharged_trna)
    charged_trna_conc = counts_to_molar * np.dot(aa_from_trna, charged_trna)
    aa_conc = counts_to_molar * aas
    ribosome_conc = counts_to_molar * ribosome_counts

    # Estimate fraction of amino acids from sequences, excluding first index for padding of -1
    _, aas_in_sequences = np.unique(
        sim_data.process.translation.translation_sequences, return_counts=True
    )
    aa_totals = aas_in_sequences[1:]
    f = aa_totals / np.sum(aa_totals)

    # Estimate initial charging state
    if variable_elongation:
        elongation_max = constants.ribosome_elongation_rate_max
    else:
        elongation_max = constants.ribosome_elongation_rate_basal

    # Amino acids listed in REMOVED_FROM_CHARGING are masked out
    charging_mask = np.array(
        [
            aa not in REMOVED_FROM_CHARGING
            for aa in sim_data.molecule_groups.amino_acids
        ]
    )
    charging_params = {
        "kS": constants.synthetase_charging_rate.asNumber(1 / units.s),
        "KMaa": transcription.aa_kms.asNumber(MICROMOLAR_UNITS),
        "KMtf": transcription.trna_kms.asNumber(MICROMOLAR_UNITS),
        "krta": constants.Kdissociation_charged_trna_ribosome.asNumber(
            MICROMOLAR_UNITS
        ),
        "krtf": constants.Kdissociation_uncharged_trna_ribosome.asNumber(
            MICROMOLAR_UNITS
        ),
        "max_elong_rate": float(elongation_max.asNumber(units.aa / units.s)),
        "charging_mask": charging_mask,
        "unit_conversion": metabolism.get_amino_acid_conc_conversion(MICROMOLAR_UNITS),
    }
    fraction_charged, *_ = calculate_trna_charging(
        synthetase_conc,
        uncharged_trna_conc,
        charged_trna_conc,
        aa_conc,
        ribosome_conc,
        f,
        charging_params,
    )

    # Update counts of tRNA to match charging: the per-amino-acid charged
    # fraction is mapped back onto individual tRNAs and totals are preserved
    total_trna_counts = uncharged_trna + charged_trna
    charged_trna_counts = np.round(
        total_trna_counts * np.dot(fraction_charged, aa_from_trna)
    )
    uncharged_trna_counts = total_trna_counts - charged_trna_counts
    bulk_state["count"][charged_trna_idx] = charged_trna_counts
    bulk_state["count"][uncharged_trna_idx] = uncharged_trna_counts