Source code for reconstruction.ecoli.simulation_data

"""
SimulationData for Ecoli

Raw data processed into forms convenient for whole-cell modeling

"""

from __future__ import annotations

import collections

import numpy as np

# Data classes
from reconstruction.ecoli.dataclasses.getter_functions import (
    GetterFunctions,
    EXCLUDED_RNA_TYPES,
)
from reconstruction.ecoli.dataclasses.molecule_groups import MoleculeGroups
from reconstruction.ecoli.dataclasses.molecule_ids import MoleculeIds
from reconstruction.ecoli.dataclasses.constants import Constants
from reconstruction.ecoli.dataclasses.common_names import CommonNames
from reconstruction.ecoli.dataclasses.state.internal_state import InternalState
from reconstruction.ecoli.dataclasses.state.external_state import ExternalState
from reconstruction.ecoli.dataclasses.process.process import Process
from reconstruction.ecoli.dataclasses.growth_rate_dependent_parameters import (
    Mass,
    GrowthRateParameters,
)
from reconstruction.ecoli.dataclasses.relation import Relation
from reconstruction.ecoli.dataclasses.adjustments import Adjustments
from wholecell.utils.fitting import normalize


VERBOSE = False


[docs] class SimulationDataEcoli(object): """SimulationDataEcoli""" def __init__(self): # Doubling time (used in fitting) self.doubling_time = None
[docs] def initialize(self, raw_data, basal_expression_condition="M9 Glucose minus AAs"): self.operons_on = raw_data.operons_on self.stable_rrna = raw_data.stable_rrna self._add_condition_data(raw_data) self.condition = "basal" self.doubling_time = self.condition_to_doubling_time[self.condition] # TODO: Check that media condition is valid self.basal_expression_condition = basal_expression_condition self._add_molecular_weight_keys(raw_data) self._add_compartment_keys(raw_data) self._add_base_codes(raw_data) # General helper functions (have no dependencies) self.common_names = CommonNames(raw_data) self.constants = Constants(raw_data) self.adjustments = Adjustments(raw_data) # Reference helper function for molecule IDs (can depend on preceding # helper functions) self.molecule_ids = MoleculeIds(raw_data, self) # Reference helper function for molecule groups (can depend on preceding # helper functions) self.molecule_groups = MoleculeGroups(raw_data, self) # Getter functions (can depend on helper functions and reference classes) self.getter = GetterFunctions(raw_data, self) # Growth rate dependent parameters are set first self.growth_rate_parameters = GrowthRateParameters(raw_data, self) self.mass = Mass(raw_data, self) # Data classes (can depend on helper and getter functions) # Data classes cannot depend on each other self.external_state = ExternalState(raw_data, self) self.process = Process(raw_data, self) self.internal_state = InternalState(raw_data, self) # Relations between data classes (can depend on data classes) # Relations cannot depend on each other self.relation = Relation(raw_data, self) self.translation_supply_rate = {} self.pPromoterBound = {}
[docs] def _add_molecular_weight_keys(self, raw_data): self.submass_name_to_index = { mw_key["submass_name"]: mw_key["index"] for mw_key in raw_data.molecular_weight_keys }
[docs] def _add_compartment_keys(self, raw_data): self.compartment_abbrev_to_index = { compartment["abbrev"]: i for i, compartment in enumerate(raw_data.compartments) } self.compartment_id_to_index = { compartment["id"]: i for i, compartment in enumerate(raw_data.compartments) } self.compartment_abbrev_to_id = {} for compartment in raw_data.compartments: self.compartment_abbrev_to_id[compartment["abbrev"]] = compartment["id"]
[docs] def _add_base_codes(self, raw_data): self.amino_acid_code_to_id_ordered = collections.OrderedDict( tuple((row["code"], row["id"]) for row in raw_data.base_codes.amino_acids) ) self.ntp_code_to_id_ordered = collections.OrderedDict( tuple((row["code"], row["id"]) for row in raw_data.base_codes.ntp) ) self.nmp_code_to_id_ordered = collections.OrderedDict( tuple((row["code"], row["id"]) for row in raw_data.base_codes.nmp) ) self.dntp_code_to_id_ordered = collections.OrderedDict( tuple((row["code"], row["id"]) for row in raw_data.base_codes.dntp) )
[docs] def _add_condition_data(self, raw_data): abbrToActiveId = { x["TF"]: x["activeId"].split(", ") for x in raw_data.transcription_factors if len(x["activeId"]) > 0 } gene_id_to_rna_id = {gene["id"]: gene["rna_ids"][0] for gene in raw_data.genes} gene_symbol_to_rna_id = { gene["symbol"]: gene["rna_ids"][0] for gene in raw_data.genes } gene_symbol_to_rna_id.update( { x["name"]: gene_id_to_rna_id[x["geneId"]] for x in raw_data.translation_efficiency if x["geneId"] != "#N/A" } ) rna_ids_with_coordinates = { gene["rna_ids"][0] for gene in raw_data.genes if gene["left_end_pos"] is not None and gene["right_end_pos"] is not None } rna_id_to_rna_type = {rna["id"]: rna["type"] for rna in raw_data.rnas} self.tf_to_fold_change = {} self.tf_to_direction = {} for fc_file in ["fold_changes", "fold_changes_nca"]: gene_not_found = set() tf_not_found = set() gene_location_not_specified = set() gene_excluded = set() for row in getattr(raw_data, fc_file): FC = row["log2 FC mean"] # Skip fold changes that do not agree with curation if row["Regulation_direct"] != "" and row["Regulation_direct"] > 2: continue # Skip positive autoregulation if row["TF"] == row["Target"] and FC > 0: continue try: tf = abbrToActiveId[row["TF"]][0] except KeyError: tf_not_found.add(row["TF"]) continue try: target = gene_symbol_to_rna_id[row["Target"]] except KeyError: gene_not_found.add(row["Target"]) continue if target not in rna_ids_with_coordinates: gene_location_not_specified.add(row["Target"]) continue if rna_id_to_rna_type[target] in EXCLUDED_RNA_TYPES: gene_excluded.add(row["Target"]) continue if tf not in self.tf_to_fold_change: self.tf_to_fold_change[tf] = {} self.tf_to_direction[tf] = {} self.tf_to_direction[tf][target] = np.sign(FC) self.tf_to_fold_change[tf][target] = 2**FC if VERBOSE: if gene_not_found: print( f"The following target genes listed in {fc_file}.tsv" " have no corresponding entry in genes.tsv:" ) for item in gene_not_found: print(item) if tf_not_found: print( "The following transcription factors listed in" f" {fc_file}.tsv have no corresponding active entry in" " transcription_factors.tsv:" ) for tf in tf_not_found: print(tf) if gene_location_not_specified: print( f"The following target genes listed in {fc_file}.tsv" " have no chromosomal location specified in" " genes.tsv:" ) for item in gene_location_not_specified: print(item) if gene_excluded: print( f"The following target genes listed in {fc_file}.tsv" " have been excluded from the model:" ) for item in gene_excluded: print(item) self.tf_to_active_inactive_conditions = {} for row in raw_data.condition.tf_condition: tf = row["active TF"] if tf not in self.tf_to_fold_change: continue activeGenotype = row["active genotype perturbations"] activeNutrients = row["active nutrients"] inactiveGenotype = row["inactive genotype perturbations"] inactiveNutrients = row["inactive nutrients"] if tf not in self.tf_to_active_inactive_conditions: self.tf_to_active_inactive_conditions[tf] = {} else: print("Warning: overwriting TF fold change conditions for %s" % tf) self.tf_to_active_inactive_conditions[tf][ "active genotype perturbations" ] = activeGenotype self.tf_to_active_inactive_conditions[tf]["active nutrients"] = ( activeNutrients ) self.tf_to_active_inactive_conditions[tf][ "inactive genotype perturbations" ] = inactiveGenotype self.tf_to_active_inactive_conditions[tf]["inactive nutrients"] = ( inactiveNutrients ) # Populate combined conditions data from condition_defs self.conditions = {} self.condition_to_doubling_time = {} self.condition_active_tfs = {} self.condition_inactive_tfs = {} self.ordered_conditions = [] # order for variant to run for row in raw_data.condition.condition_defs: condition = row["condition"] self.ordered_conditions.append(condition) self.conditions[condition] = {} self.conditions[condition]["nutrients"] = row["nutrients"] self.conditions[condition]["perturbations"] = row["genotype perturbations"] self.condition_to_doubling_time[condition] = row["doubling time"] self.condition_active_tfs[condition] = row["active TFs"] self.condition_inactive_tfs[condition] = row["inactive TFs"] # Populate nutrientToDoubling for each set of combined conditions self.nutrient_to_doubling_time = {} for condition in self.condition_to_doubling_time: if len(self.conditions[condition]["perturbations"]) > 0: continue nutrientLabel = self.conditions[condition]["nutrients"] if ( nutrientLabel in self.nutrient_to_doubling_time and self.condition_to_doubling_time[condition] != self.nutrient_to_doubling_time[nutrientLabel] ): raise Exception( "Multiple doubling times correspond to the same media conditions" ) self.nutrient_to_doubling_time[nutrientLabel] = ( self.condition_to_doubling_time[condition] ) # Populate conditions and conditionToDboulingTime for active and inactive TF conditions basal_dt = self.condition_to_doubling_time["basal"] for tf in sorted(self.tf_to_active_inactive_conditions): for status in ["active", "inactive"]: condition = "{}__{}".format(tf, status) nutrients = self.tf_to_active_inactive_conditions[tf][ "{} nutrients".format(status) ] self.conditions[condition] = {} self.conditions[condition]["nutrients"] = nutrients self.conditions[condition]["perturbations"] = ( self.tf_to_active_inactive_conditions[ tf ]["{} genotype perturbations".format(status)] ) self.condition_to_doubling_time[condition] = ( self.nutrient_to_doubling_time.get(nutrients, basal_dt) )
[docs] def calculate_ppgpp_expression(self, condition: str): """ Calculates the expected expression of RNA based on ppGpp regulation in a given condition and the expected transcription factor effects in that condition. Relies on other values that are calculated in the fitting process so should only be called after the parca has been run. Args: condition: label for the desired condition to calculate the average expression for (eg. 'basal', 'with_aa', etc) """ ppgpp = self.growth_rate_parameters.get_ppGpp_conc( self.condition_to_doubling_time[condition] ) delta_prob = self.process.transcription_regulation.get_delta_prob_matrix( ppgpp=True ) p_promoter_bound = np.array( [ self.pPromoterBound[condition][tf] for tf in self.process.transcription_regulation.tf_ids ] ) delta = delta_prob @ p_promoter_bound prob, factor = self.process.transcription.synth_prob_from_ppgpp( ppgpp, self.process.replication.get_average_copy_number ) rna_expression = prob * (1 + delta) / factor # For cases with no basal ppGpp expression, assume the delta prob is the # same as without ppGpp control mask = prob == 0 rna_expression[mask] = delta[mask] / factor[mask] rna_expression[rna_expression < 0] = 0 return normalize(rna_expression)
[docs] def adjust_final_expression(self, gene_indices, factors): transcription = self.process.transcription transcription_regulation = self.process.transcription_regulation for gene_index, factor in zip(gene_indices, factors): recruitment_mask = np.array( [i == gene_index for i in transcription_regulation.delta_prob["deltaI"]] ) for synth_prob in transcription.rna_synth_prob.values(): synth_prob[gene_index] *= factor for exp in transcription.rna_expression.values(): exp[gene_index] *= factor transcription.exp_free[gene_index] *= factor transcription.exp_ppgpp[gene_index] *= factor transcription.attenuation_basal_prob_adjustments[ transcription.attenuated_rna_indices == gene_index ] *= factor transcription_regulation.basal_prob[gene_index] *= factor transcription_regulation.delta_prob["deltaV"][recruitment_mask] *= factor # Renormalize parameters for synth_prob in transcription.rna_synth_prob.values(): synth_prob /= synth_prob.sum() for exp in transcription.rna_expression.values(): exp /= exp.sum() transcription.exp_free /= transcription.exp_free.sum() transcription.exp_ppgpp /= transcription.exp_ppgpp.sum()
[docs] def adjust_new_gene_final_expression(self, gene_indices, factors): """ Adjusting the final expression values of new genes must be handled separately because the baseline new gene expression values need to be set to small non-zero values using data loaded from flat/new_gene_data/new_gene_baseline_expression_parameters.tsv, as new genes are knocked out by default. Args: gene_indices: Indices of new genes to adjust factors: Multiplicative factor to adjust by """ transcription = self.process.transcription transcription_regulation = self.process.transcription_regulation new_gene_rna_synth_prob_baseline = transcription.new_gene_expression_baselines[ "new_gene_rna_synth_prob_baseline" ] new_gene_rna_expression_baseline = transcription.new_gene_expression_baselines[ "new_gene_rna_expression_baseline" ] new_gene_exp_free_baseline = transcription.new_gene_expression_baselines[ "new_gene_exp_free_baseline" ] new_gene_exp_ppgpp_baseline = transcription.new_gene_expression_baselines[ "new_gene_exp_ppgpp_baseline" ] new_gene_reg_basal_prob_baseline = transcription.new_gene_expression_baselines[ "new_gene_reg_basal_prob_baseline" ] for gene_index, factor in zip(gene_indices, factors): recruitment_mask = np.array( [i == gene_index for i in transcription_regulation.delta_prob["deltaI"]] ) for synth_prob in transcription.rna_synth_prob.values(): synth_prob[gene_index] = new_gene_rna_synth_prob_baseline * factor for exp in transcription.rna_expression.values(): exp[gene_index] = new_gene_rna_expression_baseline * factor transcription.exp_free[gene_index] = new_gene_exp_free_baseline * factor transcription.exp_ppgpp[gene_index] = new_gene_exp_ppgpp_baseline * factor transcription_regulation.basal_prob[gene_index] = ( new_gene_reg_basal_prob_baseline * factor ) # For the forseeable future, these will not be needed in the new # gene implementation. For now, encode the assumption that these # will be empty numpy arrays. assert ( ( transcription.attenuation_basal_prob_adjustments[ transcription.attenuated_rna_indices == gene_index ] ).size == 0 ), ( "Attenuation basal probability adjustment for new genes is" " not currently implemented in the model." ) assert ( (transcription_regulation.delta_prob["deltaV"][recruitment_mask]).size == 0 ), ( "Transcriptional regulation of new genes is not currently" " implemented in the model." ) # Renormalize parameters for synth_prob in transcription.rna_synth_prob.values(): synth_prob /= synth_prob.sum() for exp in transcription.rna_expression.values(): exp /= exp.sum() transcription.exp_free /= transcription.exp_free.sum() transcription.exp_ppgpp /= transcription.exp_ppgpp.sum()