"""
The parca, aka parameter calculator.
TODO: establish a controlled language for function behaviors (i.e. create* set* fit*)
TODO: functionalize so that values are not both set and returned from some methods
"""
import binascii
import functools
import itertools
import os
import pickle
import time
import traceback
from typing import Callable
from stochastic_arrow import StochasticSystem
from cvxpy import Variable, Problem, Minimize, norm
import numpy as np
import scipy.optimize
import scipy.sparse
from ecoli.library.initial_conditions import create_bulk_container
from ecoli.library.schema import bulk_name_to_idx, counts
from reconstruction.ecoli.simulation_data import SimulationDataEcoli
from wholecell.utils import filepath, parallelization, units
from wholecell.utils.fitting import normalize, masses_and_counts_for_homeostatic_target
# Fitting parameters
# NOTE: This threshold is arbitrary and was relaxed from 1e-9
# to 1e-8 to fix failure to converge after scipy/scipy#20168
FITNESS_THRESHOLD = 1e-8
MAX_FITTING_ITERATIONS = 150
N_SEEDS = 10
# Parameters used in fitPromoterBoundProbability()
PROMOTER_PDIFF_THRESHOLD = 0.06 # Minimum difference between binding probabilities of a TF in conditions where TF is active and inactive
PROMOTER_REG_COEFF = 1e-3 # Optimization weight on how much probability should stay close to original values
PROMOTER_SCALING = 10 # Multiplied to all matrices for numerical stability
PROMOTER_NORM_TYPE = 1 # Matrix 1-norm
PROMOTER_MAX_ITERATIONS = 100
PROMOTER_CONVERGENCE_THRESHOLD = 1e-9
ECOS_0_TOLERANCE = 1e-10 # Tolerance to adjust solver output to 0
BASAL_EXPRESSION_CONDITION = "M9 Glucose minus AAs"
VERBOSE = 1
COUNTS_UNITS = units.dmol
VOLUME_UNITS = units.L
MASS_UNITS = units.g
TIME_UNITS = units.s
functions_run = []
[docs]
def fitSimData_1(raw_data, **kwargs):
"""
Fits parameters necessary for the simulation based on the knowledge base
Inputs:
raw_data (KnowledgeBaseEcoli) - knowledge base consisting of the
necessary raw data
cpus (int) - number of processes to use (if > 1, use multiprocessing)
debug (bool) - if True, fit only one arbitrarily-chosen transcription
factor in order to speed up a debug cycle (should not be used for
an actual simulation)
save_intermediates (bool) - if True, save the state (sim_data and cell_specs)
to disk in intermediates_directory after each Parca step
intermediates_directory (str) - path to the directory to save intermediate
sim_data and cell_specs files to
load_intermediate (str) - the function name of the Parca step to load
sim_data and cell_specs from; functions prior to and including this
will be skipped but all following functions will run
variable_elongation_transcription (bool) - enable variable elongation
for transcription
variable_elongation_translation (bool) - enable variable elongation for
translation
disable_ribosome_capacity_fitting (bool) - if True, ribosome expression
is not fit to protein synthesis demands
disable_rnapoly_capacity_fitting (bool) - if True, RNA polymerase
expression is not fit to protein synthesis demands
"""
sim_data = SimulationDataEcoli()
cell_specs = {}
# Functions to modify sim_data and/or cell_specs
# Functions defined below should be wrapped by @save_state to allow saving
# and loading sim_data and cell_specs to skip certain functions while doing
# development for faster testing and iteration of later functions that
# might not need earlier functions to be rerun each time.
sim_data, cell_specs = initialize(sim_data, cell_specs, raw_data=raw_data, **kwargs)
sim_data, cell_specs = input_adjustments(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = basal_specs(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = tf_condition_specs(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = fit_condition(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = promoter_binding(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = adjust_promoters(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = set_conditions(sim_data, cell_specs, **kwargs)
sim_data, cell_specs = final_adjustments(sim_data, cell_specs, **kwargs)
if sim_data is None:
raise ValueError(
'sim_data is not specified. Check that the'
f' load_intermediate function ({kwargs.get("load_intermediate")})'
' is correct and matches a function to be run.'
)
return sim_data
[docs]
def save_state(func):
"""
Wrapper for functions called in fitSimData_1() to allow saving and loading
of sim_data and cell_specs at different points in the parameter calculation
pipeline. This is useful for development in order to skip time intensive
steps that are not required to recalculate in order to work with the desired
stage of parameter calculation.
This wrapper expects arguments in the kwargs passed into a wrapped function:
save_intermediates (bool): if True, the state (sim_data and cell_specs)
will be saved to disk in intermediates_directory
intermediates_directory (str): path to the directory to save intermediate
sim_data and cell_specs files to
load_intermediate (str): the name of the function to load sim_data and
cell_specs from, functions prior to and including this will be
skipped but all following functions will run
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
func_name = func.__name__
load_intermediate = kwargs.get("load_intermediate")
intermediates_dir = kwargs.get("intermediates_directory", "")
# Files to save to or load from
sim_data_file = os.path.join(intermediates_dir, f"sim_data_{func_name}.cPickle")
cell_specs_file = os.path.join(
intermediates_dir, f"cell_specs_{func_name}.cPickle"
)
# Run the wrapped function if the function to load is not specified or was already loaded
if load_intermediate is None or load_intermediate in functions_run:
start = time.time()
sim_data, cell_specs = func(*args, **kwargs)
end = time.time()
print(f"Ran {func_name} in {end - start:.0f} s")
# Load the saved results from the wrapped function if it is set to be loaded
elif load_intermediate == func_name:
if not os.path.exists(sim_data_file) or not os.path.exists(cell_specs_file):
raise IOError(
f"Could not find intermediate files ({sim_data_file}"
f" or {cell_specs_file}) to load. Make sure to save intermediates"
" before trying to load them."
)
with open(sim_data_file, "rb") as f:
sim_data = pickle.load(f)
with open(cell_specs_file, "rb") as f:
cell_specs = pickle.load(f)
print(f"Loaded sim_data and cell_specs for {func_name}")
# Skip running or loading if a later function will be loaded
else:
print(f"Skipped {func_name}")
sim_data = None
cell_specs = {}
# Save the current state of the parameter calculator after the function to disk
if (
kwargs.get("save_intermediates", False)
and intermediates_dir != ""
and sim_data is not None
):
os.makedirs(intermediates_dir, exist_ok=True)
with open(sim_data_file, "wb") as f:
pickle.dump(sim_data, f, protocol=pickle.HIGHEST_PROTOCOL)
with open(cell_specs_file, "wb") as f:
pickle.dump(cell_specs, f, protocol=pickle.HIGHEST_PROTOCOL)
print(f"Saved data for {func_name}")
# Record which functions have been run to know if the loaded function has run
functions_run.append(func_name)
return sim_data, cell_specs
return wrapper
[docs]
@save_state
def initialize(sim_data, cell_specs, raw_data=None, **kwargs):
sim_data.initialize(
raw_data=raw_data,
basal_expression_condition=BASAL_EXPRESSION_CONDITION,
)
return sim_data, cell_specs
[docs]
@save_state
def basal_specs(
sim_data,
cell_specs,
disable_ribosome_capacity_fitting=False,
disable_rnapoly_capacity_fitting=False,
variable_elongation_transcription=True,
variable_elongation_translation=False,
**kwargs,
):
cell_specs = buildBasalCellSpecifications(
sim_data,
variable_elongation_transcription,
variable_elongation_translation,
disable_ribosome_capacity_fitting,
disable_rnapoly_capacity_fitting,
)
# Set expression based on ppGpp regulation from basal expression
sim_data.process.transcription.set_ppgpp_expression(sim_data)
# TODO (Travis): use ppGpp expression in condition fitting
# Modify other properties
# Compute Km's
Km = setKmCooperativeEndoRNonLinearRNAdecay(
sim_data, cell_specs["basal"]["bulkContainer"]
)
n_transcribed_rnas = len(sim_data.process.transcription.rna_data)
sim_data.process.transcription.rna_data["Km_endoRNase"] = Km[:n_transcribed_rnas]
sim_data.process.transcription.mature_rna_data["Km_endoRNase"] = Km[
n_transcribed_rnas:
]
## Calculate and set maintenance values
# ----- Growth associated maintenance -----
fitMaintenanceCosts(sim_data, cell_specs["basal"]["bulkContainer"])
return sim_data, cell_specs
[docs]
@save_state
def tf_condition_specs(
sim_data,
cell_specs,
cpus=1,
disable_ribosome_capacity_fitting=False,
disable_rnapoly_capacity_fitting=False,
variable_elongation_transcription=True,
variable_elongation_translation=False,
**kwargs,
):
# Limit the number of CPUs before printing it to stdout.
cpus = parallelization.cpus(cpus)
# Apply updates to cell_specs from buildTfConditionCellSpecifications for each TF condition
conditions = list(sorted(sim_data.tf_to_active_inactive_conditions))
args = [
(
sim_data,
tf,
variable_elongation_transcription,
variable_elongation_translation,
disable_ribosome_capacity_fitting,
disable_rnapoly_capacity_fitting,
)
for tf in conditions
]
apply_updates(
buildTfConditionCellSpecifications, args, conditions, cell_specs, cpus
)
for conditionKey in cell_specs:
if conditionKey == "basal":
continue
sim_data.process.transcription.rna_expression[conditionKey] = cell_specs[
conditionKey
]["expression"]
sim_data.process.transcription.rna_synth_prob[conditionKey] = cell_specs[
conditionKey
]["synthProb"]
sim_data.process.transcription.cistron_expression[conditionKey] = cell_specs[
conditionKey
]["cistron_expression"]
sim_data.process.transcription.fit_cistron_expression[conditionKey] = (
cell_specs[conditionKey]["fit_cistron_expression"]
)
buildCombinedConditionCellSpecifications(
sim_data,
cell_specs,
variable_elongation_transcription,
variable_elongation_translation,
disable_ribosome_capacity_fitting,
disable_rnapoly_capacity_fitting,
)
return sim_data, cell_specs
[docs]
@save_state
def fit_condition(sim_data, cell_specs, cpus=1, **kwargs):
# Apply updates from fitCondition to cell_specs for each fit condition
conditions = list(sorted(cell_specs))
args = [(sim_data, cell_specs[condition], condition) for condition in conditions]
apply_updates(fitCondition, args, conditions, cell_specs, cpus)
for condition_label in sorted(cell_specs):
nutrients = sim_data.conditions[condition_label]["nutrients"]
if nutrients not in sim_data.translation_supply_rate:
sim_data.translation_supply_rate[nutrients] = cell_specs[condition_label][
"translation_aa_supply"
]
return sim_data, cell_specs
[docs]
@save_state
def set_conditions(sim_data, cell_specs, **kwargs):
sim_data.process.transcription.rnaSynthProbFraction = {}
sim_data.process.transcription.rnapFractionActiveDict = {}
sim_data.process.transcription.rnaSynthProbRProtein = {}
sim_data.process.transcription.rnaSynthProbRnaPolymerase = {}
sim_data.process.transcription.rnaPolymeraseElongationRateDict = {}
sim_data.expectedDryMassIncreaseDict = {}
sim_data.process.translation.ribosomeElongationRateDict = {}
sim_data.process.translation.ribosomeFractionActiveDict = {}
for condition_label in sorted(cell_specs):
condition = sim_data.conditions[condition_label]
nutrients = condition["nutrients"]
if VERBOSE > 0:
print("Updating mass in condition {}".format(condition_label))
spec = cell_specs[condition_label]
concDict = sim_data.process.metabolism.concentration_updates.concentrations_based_on_nutrients(
media_id=nutrients
)
concDict.update(
sim_data.mass.getBiomassAsConcentrations(
sim_data.condition_to_doubling_time[condition_label]
)
)
avgCellDryMassInit, fitAvgSolublePoolMass = rescaleMassForSolubleMetabolites(
sim_data,
spec["bulkContainer"],
concDict,
sim_data.condition_to_doubling_time[condition_label],
)
if VERBOSE > 0:
print("{} to {}".format(spec["avgCellDryMassInit"], avgCellDryMassInit))
spec["avgCellDryMassInit"] = avgCellDryMassInit
spec["fitAvgSolublePoolMass"] = fitAvgSolublePoolMass
mRnaSynthProb = sim_data.process.transcription.rna_synth_prob[condition_label][
sim_data.process.transcription.rna_data["is_mRNA"]
].sum()
tRnaSynthProb = sim_data.process.transcription.rna_synth_prob[condition_label][
sim_data.process.transcription.rna_data["is_tRNA"]
].sum()
rRnaSynthProb = sim_data.process.transcription.rna_synth_prob[condition_label][
sim_data.process.transcription.rna_data["is_rRNA"]
].sum()
if len(condition["perturbations"]) == 0:
if nutrients not in sim_data.process.transcription.rnaSynthProbFraction:
sim_data.process.transcription.rnaSynthProbFraction[nutrients] = {
"mRna": mRnaSynthProb,
"tRna": tRnaSynthProb,
"rRna": rRnaSynthProb,
}
if nutrients not in sim_data.process.transcription.rnaSynthProbRProtein:
prob = sim_data.process.transcription.rna_synth_prob[condition_label][
sim_data.process.transcription.rna_data[
"includes_ribosomal_protein"
]
]
sim_data.process.transcription.rnaSynthProbRProtein[nutrients] = prob
if (
nutrients
not in sim_data.process.transcription.rnaSynthProbRnaPolymerase
):
prob = sim_data.process.transcription.rna_synth_prob[condition_label][
sim_data.process.transcription.rna_data["includes_RNAP"]
]
sim_data.process.transcription.rnaSynthProbRnaPolymerase[nutrients] = (
prob
)
if nutrients not in sim_data.process.transcription.rnapFractionActiveDict:
frac = sim_data.growth_rate_parameters.get_fraction_active_rnap(
spec["doubling_time"]
)
sim_data.process.transcription.rnapFractionActiveDict[nutrients] = frac
if (
nutrients
not in sim_data.process.transcription.rnaPolymeraseElongationRateDict
):
rate = sim_data.growth_rate_parameters.get_rnap_elongation_rate(
spec["doubling_time"]
)
sim_data.process.transcription.rnaPolymeraseElongationRateDict[
nutrients
] = rate
if nutrients not in sim_data.expectedDryMassIncreaseDict:
sim_data.expectedDryMassIncreaseDict[nutrients] = spec[
"avgCellDryMassInit"
]
if nutrients not in sim_data.process.translation.ribosomeElongationRateDict:
rate = sim_data.growth_rate_parameters.get_ribosome_elongation_rate(
spec["doubling_time"]
)
sim_data.process.translation.ribosomeElongationRateDict[nutrients] = (
rate
)
if nutrients not in sim_data.process.translation.ribosomeFractionActiveDict:
frac = sim_data.growth_rate_parameters.get_fraction_active_ribosome(
spec["doubling_time"]
)
sim_data.process.translation.ribosomeFractionActiveDict[nutrients] = (
frac
)
return sim_data, cell_specs
[docs]
@save_state
def final_adjustments(sim_data, cell_specs, **kwargs):
# Adjust expression for RNA attenuation
sim_data.process.transcription.calculate_attenuation(sim_data, cell_specs)
# Adjust ppGpp regulated expression after conditions have been fit for physiological constraints
sim_data.process.transcription.adjust_polymerizing_ppgpp_expression(sim_data)
sim_data.process.transcription.adjust_ppgpp_expression_for_tfs(sim_data)
# Set supply constants for amino acids based on condition supply requirements
average_basal_container = create_bulk_container(sim_data, n_seeds=5)
average_with_aa_container = create_bulk_container(
sim_data, condition="with_aa", n_seeds=5
)
sim_data.process.metabolism.set_phenomological_supply_constants(sim_data)
sim_data.process.metabolism.set_mechanistic_supply_constants(
sim_data, cell_specs, average_basal_container, average_with_aa_container
)
sim_data.process.metabolism.set_mechanistic_export_constants(
sim_data, cell_specs, average_basal_container
)
sim_data.process.metabolism.set_mechanistic_uptake_constants(
sim_data, cell_specs, average_with_aa_container
)
# Set ppGpp reaction parameters
sim_data.process.transcription.set_ppgpp_kinetics_parameters(
average_basal_container, sim_data.constants
)
return sim_data, cell_specs
[docs]
def apply_updates(
func: Callable[..., dict],
args: list[tuple],
labels: list[str],
dest: dict,
cpus: int,
):
"""
Use multiprocessing (if cpus > 1) to apply args to a function to get
dictionary updates for a destination dictionary.
Args:
func: function to call with args
args: list of args to apply to func
labels: label for each set of args for exception information
dest: destination dictionary that will be updated with results
from each function call
cpus: number of cpus to use
"""
if cpus > 1:
print("Starting {} Parca processes".format(cpus))
# Apply args to func
pool = parallelization.pool(cpus)
results = {label: pool.apply_async(func, a) for label, a in zip(labels, args)}
pool.close()
pool.join()
# Check results from function calls and update dest
failed = []
for label, result in results.items():
if result.successful():
dest.update(result.get())
else:
# noinspection PyBroadException
try:
result.get()
except Exception:
traceback.print_exc()
failed.append(label)
# Cleanup
if failed:
raise RuntimeError(
"Error(s) raised for {} while using multiple processes".format(
", ".join(failed)
)
)
pool = None
print("End parallel processing")
else:
for a in args:
dest.update(func(*a))
[docs]
def buildBasalCellSpecifications(
sim_data,
variable_elongation_transcription=True,
variable_elongation_translation=False,
disable_ribosome_capacity_fitting=False,
disable_rnapoly_capacity_fitting=False,
):
"""
Creates cell specifications for the basal condition by fitting expression.
Relies on expressionConverge() to set the expression and update masses.
Inputs
------
- disable_ribosome_capacity_fitting (bool) - if True, ribosome expression
is not fit
- disable_rnapoly_capacity_fitting (bool) - if True, RNA polymerase
expression is not fit
Requires
--------
- Metabolite concentrations based on 'minimal' nutrients
- 'basal' RNA expression
- 'basal' doubling time
Modifies
--------
- Average mass values of the cell
- cistron expression
- RNA expression and synthesis probabilities
Returns
--------
- dict {'basal': dict} with the following keys in the dict from key 'basal':
'concDict' {metabolite_name (str): concentration (float with units)} -
dictionary of concentrations for each metabolite with a concentration
'fit_cistron_expression' (array of floats) - hypothetical expression for
each RNA cistron post-fit, total normalized to 1, if all
transcription units were monocistronic
'expression' (array of floats) - expression for each RNA, total normalized to 1
'doubling_time' (float with units) - cell doubling time
'synthProb' (array of floats) - synthesis probability for each RNA,
total normalized to 1
'avgCellDryMassInit' (float with units) - average initial cell dry mass
'fitAvgSolubleTargetMolMass' (float with units) - the adjusted dry mass
of the soluble fraction of a cell
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for expected counts based on expression of all bulk molecules
Notes
-----
- TODO - sets sim_data attributes and returns values - change to only return values
"""
# Create dictionary for basal condition
cell_specs = {}
cell_specs["basal"] = {
"concDict": sim_data.process.metabolism.concentration_updates.concentrations_based_on_nutrients(
media_id="minimal"
),
"expression": sim_data.process.transcription.rna_expression["basal"].copy(),
"doubling_time": sim_data.condition_to_doubling_time["basal"],
}
# Determine expression and synthesis probabilities
(
expression,
synthProb,
fit_cistron_expression,
avgCellDryMassInit,
fitAvgSolubleTargetMolMass,
bulkContainer,
_,
) = expressionConverge(
sim_data,
cell_specs["basal"]["expression"],
cell_specs["basal"]["concDict"],
cell_specs["basal"]["doubling_time"],
conditionKey="basal",
variable_elongation_transcription=variable_elongation_transcription,
variable_elongation_translation=variable_elongation_translation,
disable_ribosome_capacity_fitting=disable_ribosome_capacity_fitting,
disable_rnapoly_capacity_fitting=disable_rnapoly_capacity_fitting,
)
# Store calculated values
cell_specs["basal"]["expression"] = expression
cell_specs["basal"]["synthProb"] = synthProb
cell_specs["basal"]["fit_cistron_expression"] = fit_cistron_expression
cell_specs["basal"]["avgCellDryMassInit"] = avgCellDryMassInit
cell_specs["basal"]["fitAvgSolubleTargetMolMass"] = fitAvgSolubleTargetMolMass
cell_specs["basal"]["bulkContainer"] = bulkContainer
# Modify sim_data mass
sim_data.mass.avg_cell_dry_mass_init = avgCellDryMassInit
sim_data.mass.avg_cell_dry_mass = (
sim_data.mass.avg_cell_dry_mass_init
* sim_data.mass.avg_cell_to_initial_cell_conversion_factor
)
sim_data.mass.avg_cell_water_mass_init = (
sim_data.mass.avg_cell_dry_mass_init
/ sim_data.mass.cell_dry_mass_fraction
* sim_data.mass.cell_water_mass_fraction
)
sim_data.mass.fitAvgSolubleTargetMolMass = fitAvgSolubleTargetMolMass
# Modify sim_data expression
sim_data.process.transcription.rna_expression["basal"][:] = cell_specs["basal"][
"expression"
]
sim_data.process.transcription.rna_synth_prob["basal"][:] = cell_specs["basal"][
"synthProb"
]
sim_data.process.transcription.fit_cistron_expression["basal"] = cell_specs[
"basal"
]["fit_cistron_expression"]
return cell_specs
[docs]
def buildTfConditionCellSpecifications(
sim_data,
tf,
variable_elongation_transcription=True,
variable_elongation_translation=False,
disable_ribosome_capacity_fitting=False,
disable_rnapoly_capacity_fitting=False,
):
"""
Creates cell specifications for a given transcription factor by
fitting expression. Will set for the active and inactive TF condition.
Relies on expressionConverge() to set the expression and masses.
Uses fold change data relative to the 'basal' condition to determine
expression for a given TF.
Inputs
------
- tf (str) - label for the transcription factor to fit (eg. 'CPLX-125')
- disable_ribosome_capacity_fitting (bool) - if True, ribosome expression
is not fit
- disable_rnapoly_capacity_fitting (bool) - if True, RNA polymerase
expression is not fit
Requires
--------
- Metabolite concentrations based on nutrients for the TF
- Adjusted 'basal' cistron expression
- Doubling time for the TF
- Fold changes in expression for each gene given the TF
Returns
--------
- dict {tf + '__active'/'__inactive': dict} with the following keys in each dict:
'concDict' {metabolite_name (str): concentration (float with units)} -
dictionary of concentrations for each metabolite with a concentration
'expression' (array of floats) - expression for each RNA, total normalized to 1
'doubling_time' (float with units) - cell doubling time
'synthProb' (array of floats) - synthesis probability for each RNA,
total normalized to 1
'cistron_expression' (array of floats) - hypothetical expression for
each RNA cistron, calculated from basal cistron expression levels
and fold change data
'fit_cistron_expression' (array of floats) - hypothetical expression for
each RNA cistron post-fit, total normalized to 1, if all
transcription units were monocistronic
'avgCellDryMassInit' (float with units) - average initial cell dry mass
'fitAvgSolubleTargetMolMass' (float with units) - the adjusted dry mass
of the soluble fraction of a cell
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for expected counts based on expression of all bulk molecules
"""
cell_specs = {}
for choice in ["__active", "__inactive"]:
conditionKey = tf + choice
conditionValue = sim_data.conditions[conditionKey]
# Get expression for the condition based on fold changes over 'basal'
# condition if the condition is not the same as 'basal'
fcData = {}
if choice == "__active" and conditionValue != sim_data.conditions["basal"]:
fcData = sim_data.tf_to_fold_change[tf]
if choice == "__inactive" and conditionValue != sim_data.conditions["basal"]:
fcDataTmp = sim_data.tf_to_fold_change[tf].copy()
for key, value in fcDataTmp.items():
fcData[key] = 1.0 / value
expression, cistron_expression = expressionFromConditionAndFoldChange(
sim_data.process.transcription,
conditionValue["perturbations"],
fcData,
)
# Get metabolite concentrations for the condition
concDict = sim_data.process.metabolism.concentration_updates.concentrations_based_on_nutrients(
media_id=conditionValue["nutrients"]
)
concDict.update(
sim_data.mass.getBiomassAsConcentrations(
sim_data.condition_to_doubling_time[conditionKey]
)
)
# Create dictionary for the condition
cell_specs[conditionKey] = {
"concDict": concDict,
"expression": expression,
"doubling_time": sim_data.condition_to_doubling_time.get(
conditionKey, sim_data.condition_to_doubling_time["basal"]
),
}
# Determine expression and synthesis probabilities
(
expression,
synthProb,
fit_cistron_expression,
avgCellDryMassInit,
fitAvgSolubleTargetMolMass,
bulkContainer,
concDict,
) = expressionConverge(
sim_data,
cell_specs[conditionKey]["expression"],
cell_specs[conditionKey]["concDict"],
cell_specs[conditionKey]["doubling_time"],
sim_data.process.transcription.rna_data["Km_endoRNase"],
conditionKey=conditionKey,
variable_elongation_transcription=variable_elongation_transcription,
variable_elongation_translation=variable_elongation_translation,
disable_ribosome_capacity_fitting=disable_ribosome_capacity_fitting,
disable_rnapoly_capacity_fitting=disable_rnapoly_capacity_fitting,
)
# Store calculated values
cell_specs[conditionKey]["expression"] = expression
cell_specs[conditionKey]["synthProb"] = synthProb
cell_specs[conditionKey]["cistron_expression"] = cistron_expression
cell_specs[conditionKey]["fit_cistron_expression"] = fit_cistron_expression
cell_specs[conditionKey]["avgCellDryMassInit"] = avgCellDryMassInit
cell_specs[conditionKey]["fitAvgSolubleTargetMolMass"] = (
fitAvgSolubleTargetMolMass
)
cell_specs[conditionKey]["bulkContainer"] = bulkContainer
return cell_specs
[docs]
def buildCombinedConditionCellSpecifications(
sim_data,
cell_specs,
variable_elongation_transcription=True,
variable_elongation_translation=False,
disable_ribosome_capacity_fitting=False,
disable_rnapoly_capacity_fitting=False,
):
"""
Creates cell specifications for sets of transcription factors being active.
These sets include conditions like 'with_aa' or 'no_oxygen' where multiple
transcription factors will be active at the same time.
Inputs
------
- cell_specs {condition (str): dict} - information about each individual
transcription factor condition
- disable_ribosome_capacity_fitting (bool) - if True, ribosome expression
is not fit
- disable_rnapoly_capacity_fitting (bool) - if True, RNA polymerase
expression is not fit
Requires
--------
- Metabolite concentrations based on nutrients for the condition
- Adjusted 'basal' RNA expression
- Doubling time for the combined condition
- Fold changes in expression for each gene given the TF
Modifies
--------
- cell_specs dictionary for each combined condition
- RNA expression and synthesis probabilities for each combined condition
Notes
-----
- TODO - determine how to handle fold changes when multiple TFs change the
same gene because multiplying both fold changes together might not be
appropriate
"""
for conditionKey in sim_data.condition_active_tfs:
# Skip adjustments if 'basal' condition
if conditionKey == "basal":
continue
# Get expression from fold changes for each TF in the given condition
fcData = {}
conditionValue = sim_data.conditions[conditionKey]
for tf in sim_data.condition_active_tfs[conditionKey]:
for gene, fc in sim_data.tf_to_fold_change[tf].items():
fcData[gene] = fcData.get(gene, 1) * fc
for tf in sim_data.condition_inactive_tfs[conditionKey]:
for gene, fc in sim_data.tf_to_fold_change[tf].items():
fcData[gene] = fcData.get(gene, 1) / fc
expression, cistron_expression = expressionFromConditionAndFoldChange(
sim_data.process.transcription,
conditionValue["perturbations"],
fcData,
)
# Get metabolite concentrations for the condition
concDict = sim_data.process.metabolism.concentration_updates.concentrations_based_on_nutrients(
media_id=conditionValue["nutrients"]
)
concDict.update(
sim_data.mass.getBiomassAsConcentrations(
sim_data.condition_to_doubling_time[conditionKey]
)
)
# Create dictionary for the condition
cell_specs[conditionKey] = {
"concDict": concDict,
"expression": expression,
"doubling_time": sim_data.condition_to_doubling_time.get(
conditionKey, sim_data.condition_to_doubling_time["basal"]
),
}
# Determine expression and synthesis probabilities
(
expression,
synthProb,
fit_cistron_expression,
avgCellDryMassInit,
fitAvgSolubleTargetMolMass,
bulkContainer,
concDict,
) = expressionConverge(
sim_data,
cell_specs[conditionKey]["expression"],
cell_specs[conditionKey]["concDict"],
cell_specs[conditionKey]["doubling_time"],
sim_data.process.transcription.rna_data["Km_endoRNase"],
conditionKey=conditionKey,
variable_elongation_transcription=variable_elongation_transcription,
variable_elongation_translation=variable_elongation_translation,
disable_ribosome_capacity_fitting=disable_ribosome_capacity_fitting,
disable_rnapoly_capacity_fitting=disable_rnapoly_capacity_fitting,
)
# Modify cell_specs for calculated values
cell_specs[conditionKey]["expression"] = expression
cell_specs[conditionKey]["synthProb"] = synthProb
cell_specs[conditionKey]["cistron_expression"] = cistron_expression
cell_specs[conditionKey]["fit_cistron_expression"] = fit_cistron_expression
cell_specs[conditionKey]["avgCellDryMassInit"] = avgCellDryMassInit
cell_specs[conditionKey]["fitAvgSolubleTargetMolMass"] = (
fitAvgSolubleTargetMolMass
)
cell_specs[conditionKey]["bulkContainer"] = bulkContainer
# Modify sim_data expression
sim_data.process.transcription.rna_expression[conditionKey] = cell_specs[
conditionKey
]["expression"]
sim_data.process.transcription.rna_synth_prob[conditionKey] = cell_specs[
conditionKey
]["synthProb"]
sim_data.process.transcription.cistron_expression[conditionKey] = cell_specs[
conditionKey
]["cistron_expression"]
sim_data.process.transcription.fit_cistron_expression[conditionKey] = (
cell_specs[conditionKey]["fit_cistron_expression"]
)
[docs]
def expressionConverge(
sim_data,
expression,
concDict,
doubling_time,
Km=None,
conditionKey=None,
variable_elongation_transcription=True,
variable_elongation_translation=False,
disable_ribosome_capacity_fitting=False,
disable_rnapoly_capacity_fitting=False,
):
"""
Iteratively fits synthesis probabilities for RNA. Calculates initial
expression based on gene expression data and makes adjustments to match
physiological constraints for ribosome and RNAP counts. Relies on
fitExpression() to converge
Inputs
------
- expression (array of floats) - expression for each RNA, normalized to 1
- concDict {metabolite (str): concentration (float with units of mol/volume)} -
dictionary for concentrations of each metabolite with location tag
- doubling_time (float with units of time) - doubling time
- Km (array of floats with units of mol/volume) - Km for each RNA associated
with RNases
- disable_ribosome_capacity_fitting (bool) - if True, ribosome expression
is not fit
- disable_rnapoly_capacity_fitting (bool) - if True, RNA polymerase
expression is not fit
Requires
--------
- MAX_FITTING_ITERATIONS (int) - number of iterations to adjust expression
before an exception is raised
- FITNESS_THRESHOLD (float) - acceptable change from one iteration to break
the fitting loop
Returns
--------
- expression (array of floats) - adjusted expression for each RNA,
normalized to 1
- synthProb (array of floats) - synthesis probability for each RNA which
accounts for expression and degradation rate, normalized to 1
- avgCellDryMassInit (float with units of mass) - expected initial dry cell mass
- fitAvgSolubleTargetMolMass (float with units of mass) - the adjusted dry mass
of the soluble fraction of a cell
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for expected counts based on expression of all bulk molecules
"""
if VERBOSE > 0:
print(
f"Fitting RNA synthesis probabilities for condition {conditionKey} ...",
end="",
)
for iteration in range(MAX_FITTING_ITERATIONS):
if VERBOSE > 1:
print("Iteration: {}".format(iteration))
initialExpression = expression.copy()
expression = setInitialRnaExpression(sim_data, expression, doubling_time)
bulkContainer = createBulkContainer(sim_data, expression, doubling_time)
avgCellDryMassInit, fitAvgSolubleTargetMolMass = (
rescaleMassForSolubleMetabolites(
sim_data, bulkContainer, concDict, doubling_time
)
)
if not disable_rnapoly_capacity_fitting:
setRNAPCountsConstrainedByPhysiology(
sim_data,
bulkContainer,
doubling_time,
avgCellDryMassInit,
variable_elongation_transcription,
Km,
)
if not disable_ribosome_capacity_fitting:
setRibosomeCountsConstrainedByPhysiology(
sim_data, bulkContainer, doubling_time, variable_elongation_translation
)
# Normalize expression and write out changes
expression, synthProb, fit_cistron_expression, cistron_expression_res = (
fitExpression(
sim_data, bulkContainer, doubling_time, avgCellDryMassInit, Km
)
)
degreeOfFit = np.sqrt(np.mean(np.square(initialExpression - expression)))
if VERBOSE > 1:
print("degree of fit: {}".format(degreeOfFit))
print(
f"Average cistron expression residuals: {np.linalg.norm(cistron_expression_res)}"
)
if degreeOfFit < FITNESS_THRESHOLD:
print("! Fitting converged after {} iterations".format(iteration + 1))
break
else:
raise Exception("Fitting did not converge")
return (
expression,
synthProb,
fit_cistron_expression,
avgCellDryMassInit,
fitAvgSolubleTargetMolMass,
bulkContainer,
concDict,
)
[docs]
def fitCondition(sim_data, spec, condition):
"""
Takes a given condition and returns the predicted bulk average, bulk deviation,
protein monomer average, protein monomer deviation, and amino acid supply to
translation. This relies on calculateBulkDistributions and calculateTranslationSupply.
Inputs
------
- condition (str) - condition to fit (eg 'CPLX0-7705__active')
- spec {property (str): property values} - cell specifications for the given condition.
This function uses the specs "expression", "concDict", "avgCellDryMassInit",
and "doubling_time"
Returns
--------
- A dictionary {condition (str): spec (dict)} with the updated spec dictionary
with the following values updated:
- bulkAverageContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the mean of the counts of all bulk molecules
- bulkDeviationContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the standard deviation of the counts of all bulk molecules
- proteinMonomerAverageContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the mean of the counts of all protein monomers
- proteinMonomerDeviationContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the standard deviation of the counts of all protein monomers
- translation_aa_supply (array with units of mol/(mass.time)) - the supply rates
for each amino acid to translation
"""
if VERBOSE > 0:
print("Fitting condition {}".format(condition))
# Find bulk and protein distributions
(
bulkAverageContainer,
bulkDeviationContainer,
proteinMonomerAverageContainer,
proteinMonomerDeviationContainer,
) = calculateBulkDistributions(
sim_data,
spec["expression"],
spec["concDict"],
spec["avgCellDryMassInit"],
spec["doubling_time"],
)
spec["bulkAverageContainer"] = bulkAverageContainer
spec["bulkDeviationContainer"] = bulkDeviationContainer
spec["proteinMonomerAverageContainer"] = proteinMonomerAverageContainer
spec["proteinMonomerDeviationContainer"] = proteinMonomerDeviationContainer
# Find the supply rates of amino acids to translation given doubling time
spec["translation_aa_supply"] = calculateTranslationSupply(
sim_data,
spec["doubling_time"],
spec["proteinMonomerAverageContainer"],
spec["avgCellDryMassInit"],
)
return {condition: spec}
[docs]
def calculateTranslationSupply(
sim_data, doubling_time, bulkContainer, avgCellDryMassInit
):
"""
Returns the supply rates of all amino acids to translation given the desired
doubling time. This creates a limit on the polypeptide elongation process,
and thus on growth. The amino acid supply rate is found by calculating the
concentration of amino acids per gram dry cell weight and multiplying by the
loss to dilution given doubling time.
Inputs
------
- doubling_time (float with units of time) - measured doubling times given the condition
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for count of all bulk molecules
- avgCellDryMassInit (float with units of mass) - the average initial cell dry mass
Notes
-----
- The supply of amino acids should not be based on a desired doubling time,
but should come from a more mechanistic basis. This would allow simulations
of environmental shifts in which the doubling time is unknown.
"""
aaCounts = sim_data.process.translation.monomer_data[
"aa_counts"
] # the counts of each amino acid required for each protein
protein_idx = bulk_name_to_idx(
sim_data.process.translation.monomer_data["id"], bulkContainer["id"]
)
proteinCounts = counts(bulkContainer, protein_idx) # the counts of all proteins
nAvogadro = sim_data.constants.n_avogadro
molAAPerGDCW = units.sum(
aaCounts * np.tile(proteinCounts.reshape(-1, 1), (1, 21)), axis=0
) * ((1 / (units.aa * nAvogadro)) * (1 / avgCellDryMassInit))
# Calculate required amino acid supply to translation to counter dilution
translation_aa_supply = molAAPerGDCW * np.log(2) / doubling_time
return translation_aa_supply
# Sub-fitting functions
[docs]
def setTranslationEfficiencies(sim_data):
"""
This function's goal is to set translation efficiencies for a subset of metabolic proteins.
It first gathers the index of the proteins it wants to modify, then changes the monomer
translation efficiencies based on the adjustment that is specified.
These adjustments were made so that the simulation could run.
Requires
--------
- For each protein that needs to be modified, it takes in an adjustment factor.
Modifies
--------
- This function modifies, for a subset of proteins, their translational efficiencies in sim_data.
It takes their current efficiency and multiplies them by the factor specified in adjustments.
"""
for protein in sim_data.adjustments.translation_efficiencies_adjustments:
idx = np.where(sim_data.process.translation.monomer_data["id"] == protein)[0]
sim_data.process.translation.translation_efficiencies_by_monomer[idx] *= (
sim_data.adjustments.translation_efficiencies_adjustments[protein]
)
[docs]
def set_balanced_translation_efficiencies(sim_data):
"""
Sets the translation efficiencies of a group of proteins to be equal to the
mean value of all proteins within the group.
Requires
--------
- List of proteins that should have balanced translation efficiencies.
Modifies
--------
- Translation efficiencies of proteins within each specified group.
"""
monomer_id_to_index = {
monomer["id"][:-3]: i
for (i, monomer) in enumerate(sim_data.process.translation.monomer_data)
}
for proteins in sim_data.adjustments.balanced_translation_efficiencies:
protein_indexes = np.array([monomer_id_to_index[m] for m in proteins])
mean_trl_eff = sim_data.process.translation.translation_efficiencies_by_monomer[
protein_indexes
].mean()
sim_data.process.translation.translation_efficiencies_by_monomer[
protein_indexes
] = mean_trl_eff
[docs]
def setRNAExpression(sim_data):
"""
This function's goal is to set expression levels for a subset of RNAs.
It first gathers the index of the RNA's it wants to modify, then changes
the expression levels of those RNAs, within sim_data, based on the
specified adjustment factor. If the specified ID is an RNA cistron, the
expression levels of all RNA molecules containing the cistron are adjusted.
Requires
--------
- For each RNA that needs to be modified, it takes in an adjustment factor.
Modifies
--------
- This function modifies the basal RNA expression levels set in sim_data,
for the chosen RNAs. It takes their current basal expression and multiplies
them by the factor specified in adjustments.
- After updating the basal expression levels for the given RNAs, the
function normalizes all the basal expression levels.
"""
cistron_ids = set(sim_data.process.transcription.cistron_data["id"])
rna_id_to_index = {
rna_id[:-3]: i
for (i, rna_id) in enumerate(sim_data.process.transcription.rna_data["id"])
}
rna_index_to_adjustment = {}
for mol_id, adj_factor in sim_data.adjustments.rna_expression_adjustments.items():
if mol_id in cistron_ids:
# Find indexes of all RNAs containing the cistron
rna_indexes = sim_data.process.transcription.cistron_id_to_rna_indexes(
mol_id
)
elif mol_id in rna_id_to_index:
rna_indexes = rna_id_to_index[mol_id]
else:
raise ValueError(
f"Molecule ID {mol_id} not found in list of cistrons or transcription units."
)
# If multiple adjustments are made for the same RNA, take the maximum
# adjustment factor
for rna_index in rna_indexes:
rna_index_to_adjustment[rna_index] = max(
rna_index_to_adjustment.get(rna_index, 0), adj_factor
)
# Multiply all degradation rates with the specified adjustment factor
for rna_index, adj_factor in rna_index_to_adjustment.items():
sim_data.process.transcription.rna_expression["basal"][rna_index] *= adj_factor
sim_data.process.transcription.rna_expression["basal"] /= (
sim_data.process.transcription.rna_expression["basal"].sum()
)
[docs]
def setRNADegRates(sim_data):
"""
This function's goal is to adjust the degradation rates for a subset of
metabolic RNA's. It first gathers the index of the RNA's it wants to modify,
then changes the degradation rates of those RNAs. If the specified ID is
that of an RNA cistron, the degradation rates of all RNA molecules
containing the cistron are adjusted. (Note: since RNA concentrations are
assumed to be in equilibrium, increasing the degradation rate increases the
synthesis rates of these RNAs)
Requires
--------
- For each RNA that needs to be modified, it takes in an adjustment factor
Modifies
--------
- This function modifies the RNA degradation rates for the chosen RNAs in
sim_data. It takes their current degradation rate and multiplies them by the
factor specified in adjustments.
"""
cistron_id_to_index = {
cistron_id: i
for (i, cistron_id) in enumerate(
sim_data.process.transcription.cistron_data["id"]
)
}
rna_id_to_index = {
rna_id[:-3]: i
for (i, rna_id) in enumerate(sim_data.process.transcription.rna_data["id"])
}
rna_index_to_adjustment = {}
for mol_id, adj_factor in sim_data.adjustments.rna_deg_rates_adjustments.items():
if mol_id in cistron_id_to_index:
# Multiply the cistron degradation rate with the specified
# adjustment factor (Note: these rates are not actually used by the
# simulation but are still adjusted for bookkeeping purposes)
cistron_index = cistron_id_to_index[mol_id]
sim_data.process.transcription.cistron_data.struct_array["deg_rate"][
cistron_index
] *= sim_data.adjustments.rna_deg_rates_adjustments[mol_id]
# Find indexes of all RNAs containing the cistron
rna_indexes = sim_data.process.transcription.cistron_id_to_rna_indexes(
mol_id
)
elif mol_id in rna_id_to_index:
rna_indexes = rna_id_to_index[mol_id]
else:
raise ValueError(
f"Molecule ID {mol_id} not found in list of cistrons or transcription units."
)
# If multiple adjustments are made for the same RNA, take the maximum
# adjustment factor
for rna_index in rna_indexes:
rna_index_to_adjustment[rna_index] = max(
rna_index_to_adjustment.get(rna_index, 0), adj_factor
)
# Multiply all degradation rates with the specified adjustment factor
for rna_index, adj_factor in rna_index_to_adjustment.items():
sim_data.process.transcription.rna_data.struct_array["deg_rate"][rna_index] *= (
adj_factor
)
[docs]
def setProteinDegRates(sim_data):
"""
This function's goal is to set the degradation rates for a subset of proteins.
It first gathers the index of the proteins it wants to modify, then changes the degradation
rates of those proteins. These adjustments were made so that the simulation could run.
Requires
--------
- For each protein that needs to be modified it take in an adjustment factor.
Modifies
--------
- This function modifies the protein degradation rates for the chosen proteins in sim_data.
It takes their current degradation rate and multiplies them by the factor specified in adjustments.
"""
for protein in sim_data.adjustments.protein_deg_rates_adjustments:
idx = np.where(sim_data.process.translation.monomer_data["id"] == protein)[0]
sim_data.process.translation.monomer_data.struct_array["deg_rate"][idx] *= (
sim_data.adjustments.protein_deg_rates_adjustments[protein]
)
[docs]
def setInitialRnaExpression(sim_data, expression, doubling_time):
"""
Creates a container that with the initial count and ID of each RNA,
calculated based on the mass fraction, molecular weight, and expression
distribution of each RNA. For rRNA the counts are set based on mass, while
for tRNA and mRNA the counts are set based on mass and relative abundance.
Relies on the math function totalCountFromMassesAndRatios.
Requires
--------
- Needs information from the knowledge base about the mass fraction,
molecular weight, and distribution of each RNA species.
Inputs
------
- expression (array of floats) - expression for each RNA, normalized to 1
- doubling_time (float with units of time) - doubling time for condition
Returns
--------
- expression (array of floats) - contains the adjusted RNA expression,
normalized to 1
Notes
-----
- Now rnaData["synthProb"] does not match "expression"
"""
# Load from sim_data
n_avogadro = sim_data.constants.n_avogadro
transcription = sim_data.process.transcription
cistron_data = transcription.cistron_data
rna_data = transcription.rna_data
get_average_copy_number = sim_data.process.replication.get_average_copy_number
rna_mw = rna_data["mw"]
rna_rRNA_mw = rna_data["rRNA_mw"]
rna_tRNA_mw = rna_data["tRNA_mw"]
rna_coord = rna_data["replication_coordinate"]
# Mask arrays for each RNA type
is_rRNA = rna_data["is_rRNA"]
is_tRNA = rna_data["is_tRNA"]
is_mRNA = rna_data["is_mRNA"]
# Get list of RNA IDs for each type and tRNA cistron IDs
all_RNA_ids = rna_data["id"]
ids_rRNA = all_RNA_ids[is_rRNA]
ids_mRNA = all_RNA_ids[is_mRNA]
ids_tRNA = all_RNA_ids[is_tRNA]
ids_tRNA_cistrons = cistron_data["id"][cistron_data["is_tRNA"]]
# Get mass fractions of each RNA type for this condition
initial_rna_mass = (
sim_data.mass.get_component_masses(doubling_time)["rnaMass"]
/ sim_data.mass.avg_cell_to_initial_cell_conversion_factor
)
ppgpp = sim_data.growth_rate_parameters.get_ppGpp_conc(doubling_time)
rna_fractions = transcription.get_rna_fractions(ppgpp)
total_mass_rRNA = initial_rna_mass * rna_fractions["rRNA"]
total_mass_tRNA = initial_rna_mass * rna_fractions["tRNA"]
total_mass_mRNA = initial_rna_mass * rna_fractions["mRNA"]
# Get molecular weights of each RNA. For rRNAs/tRNAs, we only account for
# the masses of the mature rRNAs/tRNAs within each RNA, since these RNAs are
# almost instantly processed to yield the mature RNAs.
individual_masses_rRNA = rna_rRNA_mw[is_rRNA] / n_avogadro
individual_masses_tRNA = rna_tRNA_mw[is_tRNA] / n_avogadro
individual_masses_mRNA = rna_mw[is_mRNA] / n_avogadro
# Set rRNA TU expression assuming equal per-copy transcription
# probabilities. The combined expression levels of each rRNA TU are assumed
# to be proportional to their expected average copy numbers, which are
# dependent on the doubling time and the chromosomal position.
tau = doubling_time.asNumber(units.min)
coord_rRNA = rna_coord[is_rRNA]
n_avg_copy_rRNA = get_average_copy_number(tau, coord_rRNA)
distribution_rRNA = normalize(n_avg_copy_rRNA)
total_count_rRNA = totalCountFromMassesAndRatios(
total_mass_rRNA, individual_masses_rRNA, distribution_rRNA
)
counts_rRNA = total_count_rRNA * distribution_rRNA
# Get the total mass of tRNAs that are expressed from rRNA TUs and subtract
# this mass from the total tRNA mass
tRNA_masses_in_each_rRNA = rna_tRNA_mw[is_rRNA] / n_avogadro
total_mass_tRNA_in_rRNAs = units.dot(counts_rRNA, tRNA_masses_in_each_rRNA)
total_mass_tRNA -= total_mass_tRNA_in_rRNAs
# Get tRNA cistron distribution (see Dong 1996), while setting values for
# cistrons that are expressed from rRNAs to zero
tRNA_distribution = sim_data.mass.get_trna_distribution(doubling_time)
tRNA_id_to_dist = {
trna_id: dist
for (trna_id, dist) in zip(
tRNA_distribution["id"], tRNA_distribution["molar_ratio_to_16SrRNA"]
)
}
distribution_tRNA_cistrons = np.zeros(len(ids_tRNA_cistrons))
for i, tRNA_id in enumerate(ids_tRNA_cistrons):
distribution_tRNA_cistrons[i] = tRNA_id_to_dist[tRNA_id]
tRNA_expressed_from_rRNA_mask = transcription.cistron_tu_mapping_matrix.dot(
is_rRNA
)[cistron_data["is_tRNA"]].astype(bool)
distribution_tRNA_cistrons[tRNA_expressed_from_rRNA_mask] = 0
distribution_tRNA_cistrons = normalize(distribution_tRNA_cistrons)
# Approximate distribution of tRNA-including transcripts from tRNA cistron
# distribution by using NNLS
distribution_tRNA_including_transcripts, _ = transcription.fit_trna_expression(
distribution_tRNA_cistrons
)
# Get distribution of tRNA-including transcripts that are not rRNAs
is_hybrid = rna_data["is_rRNA"][rna_data["includes_tRNA"]]
distribution_tRNA = distribution_tRNA_including_transcripts[~is_hybrid]
distribution_tRNA = normalize(distribution_tRNA)
# Assign tRNA counts based on this distribution
total_count_tRNA = totalCountFromMassesAndRatios(
total_mass_tRNA, individual_masses_tRNA, distribution_tRNA
)
counts_tRNA = total_count_tRNA * distribution_tRNA
# Assign mRNA counts based on mass and relative abundances (microarrays)
distribution_mRNA = normalize(expression[is_mRNA])
total_count_mRNA = totalCountFromMassesAndRatios(
total_mass_mRNA, individual_masses_mRNA, distribution_mRNA
)
counts_mRNA = total_count_mRNA * distribution_mRNA
# Set expression counts in container
rRNA_idx = bulk_name_to_idx(ids_rRNA, all_RNA_ids)
tRNA_idx = bulk_name_to_idx(ids_tRNA, all_RNA_ids)
mRNA_idx = bulk_name_to_idx(ids_mRNA, all_RNA_ids)
rna_expression_container = np.zeros(len(all_RNA_ids), dtype=np.float64)
rna_expression_container[rRNA_idx] = counts_rRNA
rna_expression_container[tRNA_idx] = counts_tRNA
rna_expression_container[mRNA_idx] = counts_mRNA
expression = normalize(rna_expression_container)
return expression
[docs]
def totalCountIdDistributionProtein(sim_data, expression, doubling_time):
"""
Calculates the total counts of proteins from the relative expression of RNA,
individual protein mass, and total protein mass. Relies on the math functions
netLossRateFromDilutionAndDegradationProtein, proteinDistributionFrommRNA,
totalCountFromMassesAndRatios.
Inputs
------
- expression (array of floats) - relative frequency distribution of RNA expression
- doubling_time (float with units of time) - measured doubling time given the condition
Returns
--------
- total_count_protein (float) - total number of proteins
- ids_protein (array of str) - name of each protein with location tag
- distribution_protein (array of floats) - distribution for each protein,
normalized to 1
"""
ids_protein = sim_data.process.translation.monomer_data["id"]
total_mass_protein = (
sim_data.mass.get_component_masses(doubling_time)["proteinMass"]
/ sim_data.mass.avg_cell_to_initial_cell_conversion_factor
)
individual_masses_protein = (
sim_data.process.translation.monomer_data["mw"] / sim_data.constants.n_avogadro
)
mRNA_cistron_expression = (
sim_data.process.transcription.cistron_tu_mapping_matrix.dot(expression)[
sim_data.process.transcription.cistron_data["is_mRNA"]
]
)
distribution_transcripts_by_protein = normalize(
sim_data.relation.monomer_to_mRNA_cistron_mapping().dot(mRNA_cistron_expression)
)
translation_efficiencies_by_protein = normalize(
sim_data.process.translation.translation_efficiencies_by_monomer
)
degradationRates = sim_data.process.translation.monomer_data["deg_rate"]
# Find the net protein loss
netLossRate_protein = netLossRateFromDilutionAndDegradationProtein(
doubling_time, degradationRates
)
# Find the protein distribution
distribution_protein = proteinDistributionFrommRNA(
distribution_transcripts_by_protein,
translation_efficiencies_by_protein,
netLossRate_protein,
)
# Find total protein counts
total_count_protein = totalCountFromMassesAndRatios(
total_mass_protein, individual_masses_protein, distribution_protein
)
return total_count_protein, ids_protein, distribution_protein
[docs]
def totalCountIdDistributionRNA(sim_data, expression, doubling_time):
"""
Calculates the total counts of RNA from their relative expression,
individual mass, and total RNA mass. Relies on the math function
totalCountFromMassesAndRatios.
Inputs
------
- expression (array of floats) - relative frequency distribution of RNA
expression
- doubling_time (float with units of time) - measured doubling time given
the condition
Returns
--------
- total_count_RNA (float) - total number of RNAs
- ids_rnas (array of str) - name of each RNA with location tag
- distribution_RNA (array of floats) - distribution for each RNA,
normalized to 1
"""
transcription = sim_data.process.transcription
ids_rnas = transcription.rna_data["id"]
total_mass_RNA = (
sim_data.mass.get_component_masses(doubling_time)["rnaMass"]
/ sim_data.mass.avg_cell_to_initial_cell_conversion_factor
)
mws = transcription.rna_data["mw"]
# Use only the rRNA/tRNA mass for rRNA/tRNA transcription units
is_rRNA = transcription.rna_data["is_rRNA"]
is_tRNA = transcription.rna_data["is_tRNA"]
mws[is_rRNA] = transcription.rna_data["rRNA_mw"][is_rRNA]
mws[is_tRNA] = transcription.rna_data["tRNA_mw"][is_tRNA]
individual_masses_RNA = mws / sim_data.constants.n_avogadro
distribution_RNA = normalize(expression)
total_count_RNA = totalCountFromMassesAndRatios(
total_mass_RNA, individual_masses_RNA, distribution_RNA
)
return total_count_RNA, ids_rnas, distribution_RNA
[docs]
def createBulkContainer(sim_data, expression, doubling_time):
"""
Creates a container that tracks the counts of all bulk molecules. Relies on
totalCountIdDistributionRNA and totalCountIdDistributionProtein to set the
counts and IDs of all RNAs and proteins.
Inputs
------
- expression (array of floats) - relative frequency distribution of RNA expression
- doubling_time (float with units of time) - measured doubling time given the condition
Returns
-------
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for count of all bulk molecules
"""
total_count_RNA, ids_rnas, distribution_RNA = totalCountIdDistributionRNA(
sim_data, expression, doubling_time
)
total_count_protein, ids_protein, distribution_protein = (
totalCountIdDistributionProtein(sim_data, expression, doubling_time)
)
ids_molecules = sim_data.internal_state.bulk_molecules.bulk_data["id"]
# Construct bulk container
bulkContainer = np.array(
[mol_data for mol_data in zip(ids_molecules, np.zeros(len(ids_molecules)))],
dtype=[("id", ids_molecules.dtype), ("count", np.float64)],
)
# Assign RNA counts based on mass and expression distribution
counts_RNA = total_count_RNA * distribution_RNA
rna_idx = bulk_name_to_idx(ids_rnas, bulkContainer["id"])
bulkContainer["count"][rna_idx] = counts_RNA
# Assign protein counts based on mass and mRNA counts
counts_protein = total_count_protein * distribution_protein
protein_idx = bulk_name_to_idx(ids_protein, bulkContainer["id"])
bulkContainer["count"][protein_idx] = counts_protein
return bulkContainer
[docs]
def setRibosomeCountsConstrainedByPhysiology(
sim_data, bulkContainer, doubling_time, variable_elongation_translation
):
"""
Set counts of ribosomal protein subunits based on three constraints:
(1) Expected protein distribution doubles in one cell cycle
(2) Measured rRNA mass fractions
(3) Expected ribosomal protein subunit counts based on RNA expression data
Inputs
------
bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for count of all bulk molecules
doubling_time (float with units of time) - doubling time given the condition
variable_elongation_translation (bool) - whether there is variable elongation for translation
Modifies
--------
- counts of ribosomal protein subunits in bulkContainer
"""
active_fraction = sim_data.growth_rate_parameters.get_fraction_active_ribosome(
doubling_time
)
# Get IDs and stoichiometry of ribosome subunits
ribosome_30S_subunits = sim_data.process.complexation.get_monomers(
sim_data.molecule_ids.s30_full_complex
)["subunitIds"]
ribosome_50S_subunits = sim_data.process.complexation.get_monomers(
sim_data.molecule_ids.s50_full_complex
)["subunitIds"]
ribosome_30S_stoich = sim_data.process.complexation.get_monomers(
sim_data.molecule_ids.s30_full_complex
)["subunitStoich"]
ribosome_50S_stoich = sim_data.process.complexation.get_monomers(
sim_data.molecule_ids.s50_full_complex
)["subunitStoich"]
# Remove rRNA subunits from each array
monomer_ids = set(sim_data.process.translation.monomer_data["id"])
def remove_rRNA(subunit_ids, subunit_stoich):
is_protein = np.array(
[(subunit_id in monomer_ids) for subunit_id in subunit_ids]
)
return (subunit_ids[is_protein], subunit_stoich[is_protein])
ribosome_30S_subunits, ribosome_30S_stoich = remove_rRNA(
ribosome_30S_subunits, ribosome_30S_stoich
)
ribosome_50S_subunits, ribosome_50S_stoich = remove_rRNA(
ribosome_50S_subunits, ribosome_50S_stoich
)
# -- CONSTRAINT 1: Expected protein distribution doubling -- #
## Calculate minimium number of 30S and 50S subunits required in order to double our expected
## protein distribution in one cell cycle
proteinLengths = units.sum(
sim_data.process.translation.monomer_data["aa_counts"], axis=1
)
proteinDegradationRates = sim_data.process.translation.monomer_data["deg_rate"]
protein_idx = bulk_name_to_idx(
sim_data.process.translation.monomer_data["id"], bulkContainer["id"]
)
proteinCounts = counts(bulkContainer, protein_idx)
netLossRate_protein = netLossRateFromDilutionAndDegradationProtein(
doubling_time,
proteinDegradationRates,
)
elongation_rates = sim_data.process.translation.make_elongation_rates(
None,
sim_data.growth_rate_parameters.get_ribosome_elongation_rate(
doubling_time
).asNumber(units.aa / units.s),
1,
variable_elongation_translation,
)
nRibosomesNeeded = np.ceil(
calculateMinPolymerizingEnzymeByProductDistribution(
proteinLengths, elongation_rates, netLossRate_protein, proteinCounts
).asNumber(units.aa / units.s)
/ active_fraction
)
# Minimum number of ribosomes needed
constraint1_ribosome30SCounts = nRibosomesNeeded * ribosome_30S_stoich
constraint1_ribosome50SCounts = nRibosomesNeeded * ribosome_50S_stoich
# -- CONSTRAINT 2: Measured rRNA mass fraction -- #
# Get rRNA counts
rna_data = sim_data.process.transcription.rna_data
rrna_idx = bulk_name_to_idx(
rna_data["id"][rna_data["is_rRNA"]], bulkContainer["id"]
)
rRNA_tu_counts = counts(bulkContainer, rrna_idx)
rRNA_cistron_counts = (
sim_data.process.transcription.rRNA_cistron_tu_mapping_matrix.dot(
rRNA_tu_counts
)
)
rRNA_cistron_indexes = np.where(
sim_data.process.transcription.cistron_data["is_rRNA"]
)[0]
rRNA_23S_counts = rRNA_cistron_counts[
sim_data.process.transcription.cistron_data["is_23S_rRNA"][rRNA_cistron_indexes]
]
rRNA_16S_counts = rRNA_cistron_counts[
sim_data.process.transcription.cistron_data["is_16S_rRNA"][rRNA_cistron_indexes]
]
rRNA_5S_counts = rRNA_cistron_counts[
sim_data.process.transcription.cistron_data["is_5S_rRNA"][rRNA_cistron_indexes]
]
## 16S rRNA is in the 30S subunit
massFracPredicted_30SCount = rRNA_16S_counts.sum()
## 23S and 5S rRNA are in the 50S subunit
massFracPredicted_50SCount = min(rRNA_23S_counts.sum(), rRNA_5S_counts.sum())
constraint2_ribosome30SCounts = massFracPredicted_30SCount * ribosome_30S_stoich
constraint2_ribosome50SCounts = massFracPredicted_50SCount * ribosome_50S_stoich
# -- CONSTRAINT 3: Expected ribosomal subunit counts based expression
## Calculate fundamental ribosomal subunit count distribution based on RNA expression data
## Already calculated and stored in bulkContainer
ribosome_30S_idx = bulk_name_to_idx(ribosome_30S_subunits, bulkContainer["id"])
ribosome30SCounts = counts(bulkContainer, ribosome_30S_idx)
ribosome_50S_idx = bulk_name_to_idx(ribosome_50S_subunits, bulkContainer["id"])
ribosome50SCounts = counts(bulkContainer, ribosome_50S_idx)
# -- SET RIBOSOME FUNDAMENTAL SUBUNIT COUNTS TO MAXIMUM CONSTRAINT -- #
constraint_names = np.array(
[
"Insufficient to double protein counts",
"Too small for mass fraction",
"Current level OK",
]
)
rib30lims = np.array(
[
nRibosomesNeeded,
massFracPredicted_30SCount,
(ribosome30SCounts / ribosome_30S_stoich).min(),
]
)
rib50lims = np.array(
[
nRibosomesNeeded,
massFracPredicted_50SCount,
(ribosome50SCounts / ribosome_50S_stoich).min(),
]
)
if VERBOSE > 1:
print(
"30S limit: {}".format(
constraint_names[np.where(rib30lims.max() == rib30lims)[0]][-1]
)
)
print(
"30S actual count: {}".format(
(ribosome30SCounts / ribosome_30S_stoich).min()
)
)
print(
"30S count set to: {}".format(
rib30lims[np.where(rib30lims.max() == rib30lims)[0]][-1]
)
)
print(
"50S limit: {}".format(
constraint_names[np.where(rib50lims.max() == rib50lims)[0]][-1]
)
)
print(
"50S actual count: {}".format(
(ribosome50SCounts / ribosome_50S_stoich).min()
)
)
print(
"50S count set to: {}".format(
rib50lims[np.where(rib50lims.max() == rib50lims)[0]][-1]
)
)
bulkContainer["count"][ribosome_30S_idx] = np.fmax(
np.fmax(ribosome30SCounts, constraint1_ribosome30SCounts),
constraint2_ribosome30SCounts,
)
bulkContainer["count"][ribosome_50S_idx] = np.fmax(
np.fmax(ribosome50SCounts, constraint1_ribosome50SCounts),
constraint2_ribosome50SCounts,
)
[docs]
def setRNAPCountsConstrainedByPhysiology(
sim_data,
bulkContainer,
doubling_time,
avgCellDryMassInit,
variable_elongation_transcription,
Km=None,
):
"""
Set counts of RNA polymerase based on two constraints:
(1) Number of RNAP subunits required to maintain steady state of mRNAs
(2) Expected RNAP subunit counts based on (mRNA) distribution recorded in
bulkContainer
Inputs
------
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for count of all bulk molecules
- doubling_time (float with units of time) - doubling time given the condition
- avgCellDryMassInit (float with units of mass) - expected initial dry cell mass
- Km (array of floats with units of mol/volume) - Km for each RNA associated
with RNases
Modifies
--------
- bulkContainer (np.ndarray object) - the counts of RNA polymerase
subunits are set according to Constraint 1
Notes
-----
- Constraint 2 is not being used -- see final line of this function.
"""
# -- CONSTRAINT 1: Expected RNA distribution doubling -- #
rnaLengths = units.sum(
sim_data.process.transcription.rna_data["counts_ACGU"], axis=1
)
rnaLossRate = None
rna_idx = bulk_name_to_idx(
sim_data.process.transcription.rna_data["id"], bulkContainer["id"]
)
if Km is None:
# RNA loss rate is in units of counts/time, and computed by summing the
# contributions of degradation and dilution.
rnaLossRate = netLossRateFromDilutionAndDegradationRNALinear(
doubling_time,
sim_data.process.transcription.rna_data["deg_rate"],
counts(bulkContainer, rna_idx),
)
else:
# Get constants to compute countsToMolar factor
cellDensity = sim_data.constants.cell_density
cellVolume = (
avgCellDryMassInit / cellDensity / sim_data.mass.cell_dry_mass_fraction
)
countsToMolar = 1 / (sim_data.constants.n_avogadro * cellVolume)
# Gompute input arguments for netLossRateFromDilutionAndDegradationRNA()
rnaConc = countsToMolar * counts(bulkContainer, rna_idx)
endoRNase_idx = bulk_name_to_idx(
sim_data.process.rna_decay.endoRNase_ids, bulkContainer["id"]
)
endoRNaseConc = countsToMolar * counts(bulkContainer, endoRNase_idx)
kcatEndoRNase = sim_data.process.rna_decay.kcats
totalEndoRnaseCapacity = units.sum(endoRNaseConc * kcatEndoRNase)
# RNA loss rate is in units of counts/time, and computed by accounting
# for the competitive inhibition of RNase by other RNA targets.
rnaLossRate = netLossRateFromDilutionAndDegradationRNA(
doubling_time,
(1 / countsToMolar) * totalEndoRnaseCapacity,
Km,
rnaConc,
countsToMolar,
)
# Compute number of RNA polymerases required to maintain steady state of mRNA
elongation_rates = sim_data.process.transcription.make_elongation_rates(
None,
sim_data.growth_rate_parameters.get_rnap_elongation_rate(
doubling_time
).asNumber(units.nt / units.s),
1,
variable_elongation_transcription,
)
nActiveRnapNeeded = calculateMinPolymerizingEnzymeByProductDistributionRNA(
rnaLengths, elongation_rates, rnaLossRate
).asNumber(units.nt / units.s)
nRnapsNeeded = np.ceil(
nActiveRnapNeeded
/ sim_data.growth_rate_parameters.get_fraction_active_rnap(doubling_time)
)
# Convert nRnapsNeeded to the number of RNA polymerase subunits required
rnapIds = sim_data.process.complexation.get_monomers(
sim_data.molecule_ids.full_RNAP
)["subunitIds"]
rnapStoich = sim_data.process.complexation.get_monomers(
sim_data.molecule_ids.full_RNAP
)["subunitStoich"]
minRnapSubunitCounts = nRnapsNeeded * rnapStoich
# -- CONSTRAINT 2: Expected RNAP subunit counts based on distribution -- #
rnap_idx = bulk_name_to_idx(rnapIds, bulkContainer["id"])
rnapCounts = counts(bulkContainer, rnap_idx)
## -- SET RNAP COUNTS TO MAXIMUM CONSTRAINTS -- #
constraint_names = np.array(
["Current level OK", "Insufficient to double RNA distribution"]
)
rnapLims = np.array(
[(rnapCounts / rnapStoich).min(), (minRnapSubunitCounts / rnapStoich).min()]
)
if VERBOSE > 1:
print(
"rnap limit: {}".format(
constraint_names[np.where(rnapLims.max() == rnapLims)[0]][0]
)
)
print("rnap actual count: {}".format((rnapCounts / rnapStoich).min()))
print(
"rnap counts set to: {}".format(
rnapLims[np.where(rnapLims.max() == rnapLims)[0]][0]
)
)
if np.any(minRnapSubunitCounts < 0):
raise ValueError("RNAP protein counts must be positive.")
bulkContainer["count"][rnap_idx] = minRnapSubunitCounts
[docs]
def fitExpression(sim_data, bulkContainer, doubling_time, avgCellDryMassInit, Km=None):
"""
Determines expression and synthesis probabilities for RNA molecules to fit
protein levels and RNA degradation rates. Assumes a steady state analysis
where the RNA synthesis probability will be the same as the degradation rate.
If no Km is given, then RNA degradation is assumed to be linear otherwise
degradation is calculated based on saturation with RNases.
Inputs
------
- bulkContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for expected count based on expression of all bulk molecules
- doubling_time (float with units of time) - doubling time
- avgCellDryMassInit (float with units of mass) - expected initial dry cell mass
- Km (array of floats with units of mol/volume) - Km for each RNA associated
with RNases
Modifies
--------
- bulkContainer counts of RNA and proteins
Returns
--------
- expression (array of floats) - adjusted expression for each RNA,
normalized to 1
- synth_prob (array of floats) - synthesis probability for each RNA which
accounts for expression and degradation rate, normalized to 1
- fit_cistron_expression (array of floats) - target expression levels of
each cistron (gene) used to calculate RNA expression levels
- cistron_expression_res (array of floats) - the residuals of the NNLS
problem solved to calculate RNA expression levels
Notes
-----
- TODO - sets bulkContainer counts and returns values - change to only return values
"""
# Load required parameters
transcription = sim_data.process.transcription
translation = sim_data.process.translation
translation_efficiencies_by_protein = normalize(
translation.translation_efficiencies_by_monomer
)
degradation_rates_protein = translation.monomer_data["deg_rate"]
net_loss_rate_protein = netLossRateFromDilutionAndDegradationProtein(
doubling_time, degradation_rates_protein
)
avg_cell_fraction_mass = sim_data.mass.get_component_masses(doubling_time)
total_mass_RNA = (
avg_cell_fraction_mass["rnaMass"]
/ sim_data.mass.avg_cell_to_initial_cell_conversion_factor
)
cistron_tu_mapping_matrix = transcription.cistron_tu_mapping_matrix
# Calculate current expression fraction of mRNA transcription units
rna_idx = bulk_name_to_idx(transcription.rna_data["id"], bulkContainer["id"])
RNA_counts = counts(bulkContainer, rna_idx)
rna_expression_container = normalize(RNA_counts)
mRNA_tu_expression_frac = np.sum(
rna_expression_container[transcription.rna_data["is_mRNA"]]
)
# Calculate current expression levels of each cistron given the RNA
# expression levels
fit_cistron_expression = normalize(cistron_tu_mapping_matrix.dot(RNA_counts))
mRNA_cistron_expression_frac = fit_cistron_expression[
transcription.cistron_data["is_mRNA"]
].sum()
# Calculate required mRNA expression from monomer counts
protein_idx = bulk_name_to_idx(translation.monomer_data["id"], bulkContainer["id"])
counts_protein = counts(bulkContainer, protein_idx)
mRNA_cistron_distribution_per_protein = mRNADistributionFromProtein(
normalize(counts_protein),
translation_efficiencies_by_protein,
net_loss_rate_protein,
)
mRNA_cistron_distribution = normalize(
sim_data.relation.monomer_to_mRNA_cistron_mapping().T.dot(
mRNA_cistron_distribution_per_protein
)
)
# Replace mRNA cistron expression with values calculated from monomer counts
fit_cistron_expression[transcription.cistron_data["is_mRNA"]] = (
mRNA_cistron_expression_frac * mRNA_cistron_distribution
)
# Use least squares to calculate expression of transcription units required
# to generate the given cistron expression levels and the residuals for
# the expression of each cistron
fit_tu_expression, cistron_expression_res = transcription.fit_rna_expression(
fit_cistron_expression
)
fit_mRNA_tu_expression = fit_tu_expression[transcription.rna_data["is_mRNA"]]
rna_expression_container[transcription.rna_data["is_mRNA"]] = (
mRNA_tu_expression_frac * normalize(fit_mRNA_tu_expression)
)
expression = normalize(rna_expression_container)
# Set number of RNAs based on expression we just set
mws = transcription.rna_data["mw"]
# Use only the rRNA/tRNA mass for rRNA/tRNA transcription units
is_rRNA = transcription.rna_data["is_rRNA"]
is_tRNA = transcription.rna_data["is_tRNA"]
mws[is_rRNA] = transcription.rna_data["rRNA_mw"][is_rRNA]
mws[is_tRNA] = transcription.rna_data["tRNA_mw"][is_tRNA]
n_rnas = totalCountFromMassesAndRatios(
total_mass_RNA, mws / sim_data.constants.n_avogadro, expression
)
bulkContainer["count"][rna_idx] = n_rnas * expression
if Km is None:
rnaLossRate = netLossRateFromDilutionAndDegradationRNALinear(
doubling_time,
transcription.rna_data["deg_rate"],
counts(bulkContainer, rna_idx),
)
else:
# Get constants to compute countsToMolar factor
cellDensity = sim_data.constants.cell_density
dryMassFraction = sim_data.mass.cell_dry_mass_fraction
cellVolume = avgCellDryMassInit / cellDensity / dryMassFraction
countsToMolar = 1 / (sim_data.constants.n_avogadro * cellVolume)
endoRNase_idx = bulk_name_to_idx(
sim_data.process.rna_decay.endoRNase_ids, bulkContainer["id"]
)
endoRNaseConc = countsToMolar * counts(bulkContainer, endoRNase_idx)
kcatEndoRNase = sim_data.process.rna_decay.kcats
totalEndoRnaseCapacity = units.sum(endoRNaseConc * kcatEndoRNase)
rnaLossRate = netLossRateFromDilutionAndDegradationRNA(
doubling_time,
(1 / countsToMolar) * totalEndoRnaseCapacity,
Km,
countsToMolar * counts(bulkContainer, rna_idx),
countsToMolar,
)
synth_prob = normalize(rnaLossRate.asNumber(1 / units.min))
return expression, synth_prob, fit_cistron_expression, cistron_expression_res
[docs]
def fitMaintenanceCosts(sim_data, bulkContainer):
"""
Fits the growth-associated maintenance (GAM) cost associated with metabolism.
The energetic costs associated with growth have been estimated utilizing flux-balance analysis
and are used with FBA to obtain accurate growth predictions. In the whole-cell model, some of
these costs are explicitly associated with the energetic costs of translation, a biomass
assembly process. Consequently we must estimate the amount of energy utilized by translation
per unit of biomass (i.e. dry mass) produced, and subtract that quantity from reported GAM to
acquire the modified GAM that we use in the metabolic submodel.
Requires
--------
- amino acid counts associated with protein monomers
- average initial dry mass
- energetic (GTP) cost of translation (per amino acid polymerized)
- observed growth-associated maintenance (GAM)
In dimensions of ATP or ATP equivalents consumed per biomass
Modifies
--------
- the "dark" ATP, i.e. the modified GAM
Notes
-----
As more non-metabolic submodels account for energetic costs, this function should be extended
to subtract those costs off the observed GAM.
There also exists, in contrast, non-growth-associated-maintenance (NGAM), which is relative to
total biomass rather than the biomass accumulation rate. As the name would imply, this
accounts for the energetic costs of maintaining the existing biomass. It is also accounted for
in the metabolic submodel.
TODO (John): Rewrite as a true function.
"""
aaCounts = sim_data.process.translation.monomer_data["aa_counts"]
protein_idx = bulk_name_to_idx(
sim_data.process.translation.monomer_data["id"], bulkContainer["id"]
)
proteinCounts = counts(bulkContainer, protein_idx)
nAvogadro = sim_data.constants.n_avogadro
avgCellDryMassInit = sim_data.mass.avg_cell_dry_mass_init
gtpPerTranslation = sim_data.constants.gtp_per_translation
atp_per_charge = (
2 # ATP -> AMP is explicitly used in charging reactions so can remove from GAM
)
# GTPs used for translation (recycled, not incorporated into biomass)
aaMmolPerGDCW = units.sum(
aaCounts * np.tile(proteinCounts.reshape(-1, 1), (1, 21)), axis=0
) * ((1 / (units.aa * nAvogadro)) * (1 / avgCellDryMassInit))
aasUsedOverCellCycle = units.sum(aaMmolPerGDCW)
explicit_mmol_maintenance_per_gdcw = (
atp_per_charge + gtpPerTranslation
) * aasUsedOverCellCycle
darkATP = ( # This has everything we can't account for
sim_data.constants.growth_associated_maintenance
- explicit_mmol_maintenance_per_gdcw
)
# We do not want to create energy with growth by having a negative darkATP
# value. GAM measurements have some error so it's possible explicit
# accounting could be more accurate or the GAM value used is too low which
# would lead to a negative value. Easy fix is setting darkATP = 0 if this
# error is raised.
if darkATP.asNumber() < 0:
raise ValueError(
"GAM has been adjusted too low. Explicit energy accounting should not exceed GAM."
" Consider setting darkATP to 0 if energy corrections are accurate."
)
sim_data.constants.darkATP = darkATP
[docs]
def calculateBulkDistributions(
sim_data, expression, concDict, avgCellDryMassInit, doubling_time
):
"""
Finds a distribution of copy numbers for macromolecules. While RNA and protein
expression can be approximated using well-described statistical distributions,
complexes require absolute copy numbers. To get these distributions, this
function instantiates many cells with a reduced set of molecules, forms complexes,
and iterates through equilibrium and two-component system processes until
metabolite counts reach a steady-state. It then computes the resulting
statistical distributions.
Requires
--------
- N_SEEDS (int) - the number of instantiated cells
Inputs
------
- expression (array of floats) - expression for each RNA, normalized to 1
- concDict {metabolite (str): concentration (float with units of mol/volume)} -
dictionary for concentrations of each metabolite with location tag
- avgCellDryMassInit (float with units of mass) - initial dry cell mass
- doubling_time (float with units of time) - doubling time for condition
Returns
--------
- bulkAverageContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the mean of the counts of all bulk molecules
- bulkDeviationContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the standard deviation of the counts of all bulk molecules
- proteinMonomerAverageContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the mean of the counts of all protein monomers
- proteinMonomerDeviationContainer (np.ndarray object) - Two columns: 'id' for name and 'count'
for the standard deviation of the counts of all protein monomers
"""
# Ids
totalCount_RNA, ids_rnas, distribution_RNA = totalCountIdDistributionRNA(
sim_data, expression, doubling_time
)
totalCount_protein, ids_protein, distribution_protein = (
totalCountIdDistributionProtein(sim_data, expression, doubling_time)
)
ids_complex = sim_data.process.complexation.molecule_names
ids_equilibrium = sim_data.process.equilibrium.molecule_names
ids_twoComponentSystem = sim_data.process.two_component_system.molecule_names
ids_metabolites = sorted(concDict)
conc_metabolites = (units.mol / units.L) * np.array(
[concDict[key].asNumber(units.mol / units.L) for key in ids_metabolites]
)
allMoleculesIDs = sorted(
set(ids_rnas)
| set(ids_protein)
| set(ids_complex)
| set(ids_equilibrium)
| set(ids_twoComponentSystem)
| set(ids_metabolites)
)
# Data for complexation
complexationStoichMatrix = sim_data.process.complexation.stoich_matrix().astype(
np.int64, order="F"
)
# Data for equilibrium binding
# equilibriumDerivatives = sim_data.process.equilibrium.derivatives
# equilibriumDerivativesJacobian = sim_data.process.equilibrium.derivativesJacobian
# Data for metabolites
cellDensity = sim_data.constants.cell_density
cellVolume = avgCellDryMassInit / cellDensity / sim_data.mass.cell_dry_mass_fraction
# Construct bulk container
# We want to know something about the distribution of the copy numbers of
# macromolecules in the cell. While RNA and protein expression can be
# approximated using well-described statistical distributions, we need
# absolute copy numbers to form complexes. To get a distribution, we must
# instantiate many cells, form complexes, and finally compute the
# statistics we will use in the fitting operations.
bulk_ids = sim_data.internal_state.bulk_molecules.bulk_data.struct_array["id"]
bulkContainer = np.array(
[mol_data for mol_data in zip(bulk_ids, np.zeros(len(bulk_ids)))],
dtype=[("id", bulk_ids.dtype), ("count", int)],
)
rna_idx = bulk_name_to_idx(ids_rnas, bulkContainer["id"])
protein_idx = bulk_name_to_idx(ids_protein, bulkContainer["id"])
complexation_molecules_idx = bulk_name_to_idx(ids_complex, bulkContainer["id"])
equilibrium_molecules_idx = bulk_name_to_idx(ids_equilibrium, bulkContainer["id"])
two_component_system_molecules_idx = bulk_name_to_idx(
ids_twoComponentSystem, bulkContainer["id"]
)
metabolites_idx = bulk_name_to_idx(ids_metabolites, bulkContainer["id"])
all_molecules_idx = bulk_name_to_idx(allMoleculesIDs, bulkContainer["id"])
allMoleculeCounts = np.empty((N_SEEDS, len(allMoleculesIDs)), np.int64)
proteinMonomerCounts = np.empty((N_SEEDS, len(ids_protein)), np.int64)
if VERBOSE > 1:
print("Bulk distribution seed:")
# Instantiate cells to find average copy numbers of macromolecules
for seed in range(N_SEEDS):
if VERBOSE > 1:
print("seed = {}".format(seed))
bulkContainer["count"][all_molecules_idx] = 0
bulkContainer["count"][rna_idx] = totalCount_RNA * distribution_RNA
bulkContainer["count"][protein_idx] = totalCount_protein * distribution_protein
proteinMonomerCounts[seed, :] = counts(bulkContainer, protein_idx)
complexationMoleculeCounts = counts(bulkContainer, complexation_molecules_idx)
# Form complexes
time_step = 2**31 # don't stop until all complexes are formed.
complexation_rates = sim_data.process.complexation.rates
system = StochasticSystem(complexationStoichMatrix.T, random_seed=seed)
complexation_result = system.evolve(
time_step, complexationMoleculeCounts, complexation_rates
)
updatedCompMoleculeCounts = complexation_result["outcome"]
bulkContainer["count"][complexation_molecules_idx] = updatedCompMoleculeCounts
metDiffs = np.inf * np.ones_like(counts(bulkContainer, metabolites_idx))
nIters = 0
# Iterate processes until metabolites converge to a steady-state
while np.linalg.norm(metDiffs, np.inf) > 1:
random_state = np.random.RandomState(seed)
metCounts = conc_metabolites * cellVolume * sim_data.constants.n_avogadro
metCounts.normalize()
metCounts.checkNoUnit()
bulkContainer["count"][metabolites_idx] = metCounts.asNumber().round()
# Find reaction fluxes from equilibrium process
# Do not use jit to avoid compiling time (especially when running
# in parallel since sim_data needs to be pickled and reconstructed
# each time)
rxnFluxes, _ = sim_data.process.equilibrium.fluxes_and_molecules_to_SS(
bulkContainer["count"][equilibrium_molecules_idx],
cellVolume.asNumber(units.L),
sim_data.constants.n_avogadro.asNumber(1 / units.mol),
random_state,
jit=False,
)
bulkContainer["count"][equilibrium_molecules_idx] += np.dot(
sim_data.process.equilibrium.stoich_matrix().astype(np.int64),
rxnFluxes.astype(np.int64),
)
assert np.all(bulkContainer["count"][equilibrium_molecules_idx] >= 0)
# Find changes from two component system
_, moleculeCountChanges = (
sim_data.process.two_component_system.molecules_to_ss(
bulkContainer["count"][two_component_system_molecules_idx],
cellVolume.asNumber(units.L),
sim_data.constants.n_avogadro.asNumber(1 / units.mmol),
)
)
bulkContainer["count"][two_component_system_molecules_idx] += (
moleculeCountChanges.astype(np.int64)
)
metDiffs = (
bulkContainer["count"][metabolites_idx] - metCounts.asNumber().round()
)
nIters += 1
if nIters > 100:
raise Exception("Equilibrium reactions are not converging!")
allMoleculeCounts[seed, :] = counts(bulkContainer, all_molecules_idx)
# Update counts in bulk objects container
bulkAverageContainer = np.array(
[mol_data for mol_data in zip(bulk_ids, np.zeros(len(bulk_ids)))],
dtype=[("id", bulk_ids.dtype), ("count", np.float64)],
)
bulkDeviationContainer = np.array(
[mol_data for mol_data in zip(bulk_ids, np.zeros(len(bulk_ids)))],
dtype=[("id", bulk_ids.dtype), ("count", np.float64)],
)
monomer_ids = sim_data.process.translation.monomer_data["id"]
proteinMonomerAverageContainer = np.array(
[mol_data for mol_data in zip(monomer_ids, np.zeros(len(monomer_ids)))],
dtype=[("id", monomer_ids.dtype), ("count", np.float64)],
)
proteinMonomerDeviationContainer = np.array(
[mol_data for mol_data in zip(monomer_ids, np.zeros(len(monomer_ids)))],
dtype=[("id", monomer_ids.dtype), ("count", np.float64)],
)
bulkAverageContainer["count"][all_molecules_idx] = allMoleculeCounts.mean(0)
bulkDeviationContainer["count"][all_molecules_idx] = allMoleculeCounts.std(0)
proteinMonomerAverageContainer["count"] = proteinMonomerCounts.mean(0)
proteinMonomerDeviationContainer["count"] = proteinMonomerCounts.std(0)
return (
bulkAverageContainer,
bulkDeviationContainer,
proteinMonomerAverageContainer,
proteinMonomerDeviationContainer,
)
# Math functions
[docs]
def totalCountFromMassesAndRatios(totalMass, individualMasses, distribution):
"""
Function to determine the expected total counts for a group of molecules
in order to achieve a total mass with a given distribution of individual
molecules.
Math:
Total mass = dot(mass, count)
Fraction of i:
f = count / Total counts
Substituting:
Total mass = dot(mass, f * Total counts)
Total mass = Total counts * dot(mass, f)
Total counts = Total mass / dot(mass, f)
Requires
--------
- totalMass (float with mass units): total mass of the group of molecules
- individualMasses (array of floats with mass units): mass for individual
molecules in the group
- distribution (array of floats): distribution of individual molecules,
normalized to 1
Returns
--------
- counts (float): total counts (does not need to be a whole number)
"""
assert np.allclose(np.sum(distribution), 1)
counts = 1 / units.dot(individualMasses, distribution) * totalMass
return units.strip_empty_units(counts)
[docs]
def proteinDistributionFrommRNA(
distribution_mRNA, translation_efficiencies, netLossRate
):
"""
dP_i / dt = k * M_i * e_i - P_i * Loss_i
At steady state:
P_i = k * M_i * e_i / Loss_i
Fraction of mRNA for ith gene is defined as:
f_i = M_i / M_total
Substituting in:
P_i = k * f_i * e_i * M_total / Loss_i
Normalizing P_i by summing over all i cancels out k and M_total
assuming constant translation rate.
Inputs
------
- distribution_mRNA (array of floats) - distribution for each mRNA,
normalized to 1
- translation_efficiencies (array of floats) - translational efficiency for each mRNA,
normalized to 1
- netLossRate (array of floats with units of 1/time) - rate of loss for each protein
Returns
-------
- array of floats for the distribution of each protein, normalized to 1
"""
assert np.allclose(np.sum(distribution_mRNA), 1)
assert np.allclose(np.sum(translation_efficiencies), 1)
distributionUnnormed = (
1 / netLossRate * distribution_mRNA * translation_efficiencies
)
distributionNormed = distributionUnnormed / units.sum(distributionUnnormed)
distributionNormed.normalize()
distributionNormed.checkNoUnit()
return distributionNormed.asNumber()
[docs]
def mRNADistributionFromProtein(
distribution_protein, translation_efficiencies, netLossRate
):
"""
dP_i / dt = k * M_i * e_i - P_i * Loss_i
At steady state:
M_i = Loss_i * P_i / (k * e_i)
Fraction of protein for ith gene is defined as:
f_i = P_i / P_total
Substituting in:
M_i = Loss_i * f_i * P_total / (k * e_i)
Normalizing M_i by summing over all i cancels out k and P_total
assuming a constant translation rate.
Inputs
------
- distribution_protein (array of floats) - distribution for each protein,
normalized to 1
- translation_efficiencies (array of floats) - translational efficiency for each mRNA,
normalized to 1
- netLossRate (array of floats with units of 1/time) - rate of loss for each protein
Returns
-------
- array of floats for the distribution of each mRNA, normalized to 1
"""
assert np.allclose(np.sum(distribution_protein), 1)
distributionUnnormed = netLossRate * distribution_protein / translation_efficiencies
distributionNormed = distributionUnnormed / units.sum(distributionUnnormed)
distributionNormed.normalize()
distributionNormed.checkNoUnit()
return distributionNormed.asNumber()
[docs]
def calculateMinPolymerizingEnzymeByProductDistribution(
productLengths, elongationRates, netLossRate, productCounts
):
"""
Compute the number of ribosomes required to maintain steady state.
dP/dt = production rate - loss rate
dP/dt = e_r * (1/L) * R - (k_loss * P)
At steady state: dP/dt = 0
R = sum over i ((L_i / e_r) * k_loss_i * P_i)
Multiplying both sides by volume gives an equation in terms of counts.
P = protein concentration
e_r = polypeptide elongation rate per ribosome
L = protein length
R = ribosome concentration
k_loss = net protein loss rate
i = ith protein
Inputs
------
- productLengths (array of ints with units of amino_acids) - L, protein lengths
- elongationRates (array of ints with units of amino_acid/time) e_r, polypeptide elongation rate
- netLossRate (array of floats with units of 1/time) - k_loss, protein loss rate
- productCounts (array of floats) - P, protein counts
Returns
--------
- float with dimensionless units for the number of ribosomes required to
maintain steady state
"""
nPolymerizingEnzymeNeeded = units.sum(
productLengths / elongationRates * netLossRate * productCounts
)
return nPolymerizingEnzymeNeeded
[docs]
def calculateMinPolymerizingEnzymeByProductDistributionRNA(
productLengths, elongationRates, netLossRate
):
"""
Compute the number of RNA polymerases required to maintain steady state of mRNA.
dR/dt = production rate - loss rate
dR/dt = e_r * (1/L) * RNAp - k_loss
At steady state: dR/dt = 0
RNAp = sum over i ((L_i / e_r) * k_loss_i)
Multiplying both sides by volume gives an equation in terms of counts.
R = mRNA transcript concentration
e_r = transcript elongation rate per RNAp
L = transcript length
RNAp = RNAp concentration
k_loss = net transcript loss rate (unit: concentration / time)
i = ith transcript
Inputs
------
- productLengths (array of ints with units of nucleotides) - L, transcript lengths
- elongationRates (array of ints with units of nucleotide/time) - e_r, transcript elongation rate
- netLossRate (array of floats with units of 1/time) - k_loss, transcript loss rate
Returns
-------
- float with dimensionless units for the number of RNA polymerases required to
maintain steady state
"""
nPolymerizingEnzymeNeeded = units.sum(
productLengths / elongationRates * netLossRate
)
return nPolymerizingEnzymeNeeded
[docs]
def netLossRateFromDilutionAndDegradationProtein(doublingTime, degradationRates):
"""
Compute total loss rate (summed contributions of degradation and dilution).
Inputs
------
- doublingTime (float with units of time) - doubling time of the cell
- degradationRates (array of floats with units of 1/time) - protein degradation rate
Returns
--------
- array of floats with units of 1/time for the total loss rate for each protein
"""
return np.log(2) / doublingTime + degradationRates
[docs]
def netLossRateFromDilutionAndDegradationRNA(
doublingTime, totalEndoRnaseCountsCapacity, Km, rnaConc, countsToMolar
):
"""
Compute total loss rate (summed impact of degradation and dilution).
Returns the loss rate in units of (counts/time) in preparation for use in
the steady state analysis in fitExpression() and
setRNAPCountsConstrainedByPhysiology()
(see calculateMinPolymerizingEnzymeByProductDistributionRNA()).
Derived from steady state analysis of Michaelis-Menten enzyme kinetics with
competitive inhibition: for a given RNA, all other RNAs compete for RNase.
V_i = k_cat * [ES_i]
v_i = k_cat * [E]0 * ([S_i]/Km_i) / (1 + sum over j genes([S_j] / Km_j))
Inputs
------
- doublingTime (float with units of time) - doubling time of the cell
- totalEndoRnaseCountsCapacity (float with units of 1/time) total kinetic
capacity of all RNases in the cell
- Km (array of floats with units of mol/volume) - Michaelis-Menten constant
for each RNA
- rnaConc (array of floats with units of mol/volume) - concentration for each RNA
- countsToMolar (float with units of mol/volume) - conversion between counts and molar
Returns
--------
- array of floats with units of 1/time for the total loss rate for each RNA
"""
fracSaturated = rnaConc / Km / (1 + units.sum(rnaConc / Km))
rnaCounts = (1 / countsToMolar) * rnaConc
return (np.log(2) / doublingTime) * rnaCounts + (
totalEndoRnaseCountsCapacity * fracSaturated
)
[docs]
def netLossRateFromDilutionAndDegradationRNALinear(
doublingTime, degradationRates, rnaCounts
):
"""
Compute total loss rate (summed contributions of degradation and dilution).
Returns the loss rate in units of (counts/time) in preparation for use in
the steady state analysis in fitExpression() and
setRNAPCountsConstrainedByPhysiology()
(see calculateMinPolymerizingEnzymeByProductDistributionRNA()).
Requires
--------
- doublingTime (float with units of time) - doubling time of the cell
- degradationRates (array of floats with units of 1/time) - degradation rate
for each RNA
- rnaCounts (array of floats) - counts for each RNA
Returns
--------
- array of floats with units of 1/time for the total loss rate for each RNA
"""
return (np.log(2) / doublingTime + degradationRates) * rnaCounts
[docs]
def expressionFromConditionAndFoldChange(transcription, condPerturbations, tfFCs):
"""
Adjusts expression of RNA based on fold changes from basal for a given
condition. Since fold changes are reported for individual RNA cistrons, the
changes are applied to the basal expression levels of each cistron and the
resulting vector is mapped back to RNA expression through nonnegative least
squares. For genotype perturbations, the expression of all RNAs that include
the given cistron are set to the given value.
Inputs
------
- transcription: Instance of the Transcription class from
reconstruction.ecoli.dataclasses.process.transcription
- condPerturbations {cistron ID (str): fold change (float)} -
dictionary of fold changes for cistrons based on the given condition
- tfFCs {cistron ID (str): fold change (float)} -
dictionary of fold changes for cistrons based on transcription factors
in the given condition
Returns
--------
- expression (array of floats) - adjusted expression for each RNA,
normalized to 1
Notes
-----
- TODO (Travis) - Might not properly handle if an RNA is adjusted from both a
perturbation and a transcription factor, currently RNA self regulation is not
included in tfFCs
"""
cistron_ids = transcription.cistron_data["id"]
cistron_expression = transcription.fit_cistron_expression["basal"].copy()
# Gather indices and fold changes for each cistron that will be adjusted
cistron_id_to_index = {cistron_id: i for (i, cistron_id) in enumerate(cistron_ids)}
cistron_indexes = []
cistron_fcs = []
# Compile indexes and fold changes of each cistron
for cistron_id, fc_value in tfFCs.items():
if cistron_id in condPerturbations:
continue
cistron_indexes.append(cistron_id_to_index[cistron_id])
cistron_fcs.append(fc_value)
def apply_fcs_to_expression(expression, indexes, fcs):
"""
Applys the fold-change values to an expression vector while keeping the
sum of expression values at one.
Args:
expression (np.ndarray of floats): Original expression vector of
cistrons or RNAs
indexes (List of floats): Indexes of cistrons/RNAs that the
fold-changes should be applied to
fcs (List of floats): Fold-changes of cistron/RNA expression
"""
fcs = [fc for (idx, fc) in sorted(zip(indexes, fcs), key=lambda pair: pair[0])]
indexes = [
idx for (idx, fc) in sorted(zip(indexes, fcs), key=lambda pair: pair[0])
]
# Adjust expression based on fold change and normalize
indexes_bool = np.zeros(len(expression), dtype=bool)
indexes_bool[indexes] = 1
fcs = np.array(fcs)
scaleTheRestBy = (1.0 - (expression[indexes] * fcs).sum()) / (
1.0 - (expression[indexes]).sum()
)
expression[indexes_bool] *= fcs
expression[~indexes_bool] *= scaleTheRestBy
return expression
cistron_expression = apply_fcs_to_expression(
cistron_expression, cistron_indexes, cistron_fcs
)
# Use NNLS to map new cistron expression to RNA expression
expression, _ = transcription.fit_rna_expression(cistron_expression)
expression = normalize(expression)
# Apply genotype perturbations to all RNAs that contain each cistron
rna_indexes = []
rna_fcs = []
cistron_perturbation_indexes = []
cistron_perturbation_values = []
for cistron_id, perturbation_value in condPerturbations.items():
rna_indexes_with_cistron = transcription.cistron_id_to_rna_indexes(cistron_id)
rna_indexes.extend(rna_indexes_with_cistron)
rna_fcs.extend([perturbation_value] * len(rna_indexes_with_cistron))
cistron_perturbation_indexes.append(cistron_id_to_index[cistron_id])
cistron_perturbation_values.append(perturbation_value)
expression = apply_fcs_to_expression(expression, rna_indexes, rna_fcs)
# Also apply perturbations to cistrons for bookkeeping purposes
cistron_expression = apply_fcs_to_expression(
cistron_expression, cistron_perturbation_indexes, cistron_perturbation_values
)
return expression, cistron_expression
[docs]
def fitLigandConcentrations(sim_data, cell_specs):
"""
Using the fit values of pPromoterBound, updates the set concentrations of
ligand metabolites and the kd's of the ligand-TF binding reactions.
Requires
--------
- Fitted pPromoterBound: probabilities that a TF will bind to its promoter,
fit by function fitPromoterBoundProbability().
Inputs
------
- cell_specs {condition (str): dict} - information about each condition
Modifies
--------
- Set concentrations of metabolites that are ligands in 1CS
- kd's of equilibrium reactions in 1CS
"""
cellDensity = sim_data.constants.cell_density
pPromoterBound = sim_data.pPromoterBound
for tf in sorted(sim_data.tf_to_active_inactive_conditions):
# Skip TFs that are not 1CS or are linked to genotypic perturbations
if sim_data.process.transcription_regulation.tf_to_tf_type[tf] != "1CS":
continue
if (
len(
sim_data.tf_to_active_inactive_conditions[tf][
"active genotype perturbations"
]
)
> 0
or len(
sim_data.tf_to_active_inactive_conditions[tf][
"inactive genotype perturbations"
]
)
> 0
):
continue
activeKey = tf + "__active"
inactiveKey = tf + "__inactive"
# Determine if metabolite-bound form of the TF is the active form
boundId = sim_data.process.transcription_regulation.active_to_bound[tf]
negativeSignal = tf != boundId # True if unbound form is the active TF
# Calculate kd of bound TF
fwdRate = sim_data.process.equilibrium.get_fwd_rate(boundId + "[c]")
revRate = sim_data.process.equilibrium.get_rev_rate(boundId + "[c]")
kd = revRate / fwdRate
# Get the metabolite that binds to the TF and its stoich coefficient
metabolite = sim_data.process.equilibrium.get_metabolite(boundId + "[c]")
metaboliteCoeff = sim_data.process.equilibrium.get_metabolite_coeff(
boundId + "[c]"
)
# Calculate the concentrations of the metabolite under conditions where
# TF is active and inactive
metabolite_idx = bulk_name_to_idx(
metabolite, cell_specs[activeKey]["bulkAverageContainer"]["id"]
)
activeCellVolume = (
cell_specs[activeKey]["avgCellDryMassInit"]
/ cellDensity
/ sim_data.mass.cell_dry_mass_fraction
)
activeCountsToMolar = 1 / (sim_data.constants.n_avogadro * activeCellVolume)
activeSignalConc = (
activeCountsToMolar
* counts(cell_specs[activeKey]["bulkAverageContainer"], metabolite_idx)
).asNumber(units.mol / units.L)
inactiveCellVolume = (
cell_specs[inactiveKey]["avgCellDryMassInit"]
/ cellDensity
/ sim_data.mass.cell_dry_mass_fraction
)
inactiveCountsToMolar = 1 / (sim_data.constants.n_avogadro * inactiveCellVolume)
inactiveSignalConc = (
inactiveCountsToMolar
* counts(cell_specs[inactiveKey]["bulkAverageContainer"], metabolite_idx)
).asNumber(units.mol / units.L)
# Update kd with fitted values of P and the bulk average concentrations
# of the metabolite, and use this fitted kd to recalculate the set
# amounts of the metabolite in metabolism
p_active = pPromoterBound[activeKey][tf]
p_inactive = pPromoterBound[inactiveKey][tf]
if negativeSignal:
if p_inactive == 0:
raise ValueError(
"Inf ligand concentration from p_inactive = 0."
" Check results from fitPromoterBoundProbability and Kd values."
)
if 1 - p_active < 1e-9:
kdNew = kd # Concentration of metabolite-bound TF is negligible
else:
kdNew = (
(activeSignalConc**metaboliteCoeff) * p_active / (1 - p_active)
) ** (1 / metaboliteCoeff)
# Reset metabolite concentration with fitted P and kd
sim_data.process.metabolism.concentration_updates.molecule_set_amounts[
metabolite
] = (kdNew**metaboliteCoeff * (1 - p_inactive) / p_inactive) ** (
1.0 / metaboliteCoeff
) * (units.mol / units.L)
else:
if p_active == 1:
raise ValueError(
"Inf ligand concentration from p_active = 1."
" Check results from fitPromoterBoundProbability and Kd values."
)
if p_inactive < 1e-9:
kdNew = kd # Concentration of metabolite-bound TF is negligible
else:
kdNew = (
(inactiveSignalConc**metaboliteCoeff)
* (1 - p_inactive)
/ p_inactive
) ** (1 / metaboliteCoeff)
# Reset metabolite concentration with fitted P and kd
sim_data.process.metabolism.concentration_updates.molecule_set_amounts[
metabolite
] = (kdNew**metaboliteCoeff * p_active / (1 - p_active)) ** (
1.0 / metaboliteCoeff
) * (units.mol / units.L)
# Fit reverse rate in line with fitted kd
sim_data.process.equilibrium.set_rev_rate(boundId + "[c]", kdNew * fwdRate)
[docs]
def calculateRnapRecruitment(sim_data, cell_specs):
"""
Constructs the basal_prob vector and delta_prob matrix from values of r.
The basal_prob vector holds the basal transcription probabilities of each
transcription unit. The delta_prob matrix holds the differences in
transcription probabilities when transcription factors bind to the
promoters of each transcription unit. Both values are stored in sim_data.
Requires
--------
- cell_specs['basal']:
- ['r_vector']: Fit parameters on how the recruitment of a TF affects the expression
of a gene. High (positive) values of r indicate that the TF binding
increases the probability that the gene is expressed.
- ['r_columns']: mapping of column name to index in r
Modifies
--------
- Rescales values in basal_prob such that all values are positive
- Adds basal_prob and delta_prob arrays to sim_data
"""
r = cell_specs["basal"]["r_vector"]
col_names_to_index = cell_specs["basal"]["r_columns"]
# Get list of transcription units and TF IDs
transcription = sim_data.process.transcription
transcription_regulation = sim_data.process.transcription_regulation
all_TUs = transcription.rna_data["id"]
all_tfs = transcription_regulation.tf_ids
# Initialize basal_prob vector and delta_prob sparse matrix
basal_prob = np.zeros(len(all_TUs))
deltaI, deltaJ, deltaV = [], [], []
for rna_idx, rnaId in enumerate(all_TUs):
rnaIdNoLoc = rnaId[:-3] # Remove compartment ID from RNA ID
# Take only those TFs with active/inactive conditions data
for tf in sim_data.relation.rna_id_to_regulating_tfs.get(rnaId, []):
if tf not in sorted(sim_data.tf_to_active_inactive_conditions):
continue
colName = rnaIdNoLoc + "__" + tf
# Set element in delta to value in r that corresponds to the
# transcription unit of the row, and the TF of the column
deltaI.append(rna_idx)
deltaJ.append(all_tfs.index(tf))
deltaV.append(r[col_names_to_index[colName]])
# Add alpha column for each RNA
colName = rnaIdNoLoc + "__alpha"
# Set element in basal_prob to the transcription unit's value for alpha
basal_prob[rna_idx] = r[col_names_to_index[colName]]
# Convert to arrays
deltaI, deltaJ, deltaV = np.array(deltaI), np.array(deltaJ), np.array(deltaV)
delta_shape = (len(all_TUs), len(all_tfs))
# Adjust any negative basal probabilities to 0
basal_prob[basal_prob < 0] = 0
# Add basal_prob vector and delta_prob matrix to sim_data
transcription_regulation.basal_prob = basal_prob
transcription_regulation.delta_prob = {
"deltaI": deltaI,
"deltaJ": deltaJ,
"deltaV": deltaV,
"shape": delta_shape,
}
[docs]
def crc32(*arrays: np.ndarray, initial: int = 0) -> int:
"""Return a CRC32 checksum of the given ndarrays."""
def crc_next(initial: int, array: np.ndarray) -> int:
shape = str(array.shape).encode()
values = array.tobytes()
return binascii.crc32(values, binascii.crc32(shape, initial))
return functools.reduce(crc_next, arrays, initial)
[docs]
def setKmCooperativeEndoRNonLinearRNAdecay(sim_data, bulkContainer):
"""
Fits the affinities (Michaelis-Menten constants) for RNAs binding to
endoRNAses.
EndoRNAses perform the first step of RNA decay by cleaving a whole RNA
somewhere inside its extent. This results in RNA fragments, which are then
digested into monomers by exoRNAses. To model endoRNAse activity, we need to
determine an affinity (Michaelis-Menten constant) for each RNA that is
consistent with experimentally observed half-lives. The Michaelis-Menten
constants must be determined simultaneously, as the RNAs must compete for
the active site of the endoRNAse. (See the RnaDegradation Process class for
more information about the dynamical model.) The parameters are estimated
using a root solver (scipy.optimize.fsolve). (See the
sim_data.process.rna_decay.kmLossFunction method for more information about
the optimization problem.)
Requires
--------
- cell density, dry mass fraction, and average initial dry mass
Used to calculate the cell volume, which in turn is used to calculate
concentrations.
- observed RNA degradation rates (half-lives)
- endoRNAse counts
- endoRNAse catalytic rate constants
- RNA counts
- boolean options that enable sensitivity analyses (see Notes below)
Modifies
--------
- Michaelis-Menten constants for first-order decay (initially set to zeros)
- Several optimization-related values
Sensitivity analyses (optional, see Notes below)
Terminal values for optimization-related functions
Returns
-------
- enoRNAse Km values, in units of M
Notes
-----
If certain options are set, a sensitivity analysis will be performed using a
range of metaparameters. Outputs will be cached and utilized instead of
running the optimization if possible.
The function that generates the optimization functions is defined under
sim_data but has no dependency on sim_data, and therefore could be moved
here or elsewhere. (TODO)
TODO (John): Refactor as a pure function.
TODO (John): Why is this function called 'cooperative'? It seems to instead
assume and model competitive binding.
TODO (John): Determine what part (if any) of the 'linear' parameter fitting
should be retained.
"""
def arrays_differ(a: np.ndarray, b: np.ndarray) -> bool:
return a.shape != b.shape or not np.allclose(a, b, equal_nan=True)
cellDensity = sim_data.constants.cell_density
cellVolume = (
sim_data.mass.avg_cell_dry_mass_init
/ cellDensity
/ sim_data.mass.cell_dry_mass_fraction
)
countsToMolar = 1 / (sim_data.constants.n_avogadro * cellVolume)
degradable_rna_ids = np.concatenate(
(
sim_data.process.transcription.rna_data["id"],
sim_data.process.transcription.mature_rna_data["id"],
)
)
degradation_rates = (1 / units.s) * np.concatenate(
(
sim_data.process.transcription.rna_data["deg_rate"].asNumber(1 / units.s),
sim_data.process.transcription.mature_rna_data["deg_rate"].asNumber(
1 / units.s
),
)
)
endoRNase_idx = bulk_name_to_idx(
sim_data.process.rna_decay.endoRNase_ids, bulkContainer["id"]
)
endoRNaseConc = countsToMolar * counts(bulkContainer, endoRNase_idx)
kcatEndoRNase = sim_data.process.rna_decay.kcats
totalEndoRnaseCapacity = units.sum(endoRNaseConc * kcatEndoRNase)
endoRnaseRnaIds = sim_data.molecule_groups.endoRNase_rnas
isEndoRnase = np.array([(x in endoRnaseRnaIds) for x in degradable_rna_ids])
degradable_rna_idx = bulk_name_to_idx(degradable_rna_ids, bulkContainer["id"])
rna_counts = counts(bulkContainer, degradable_rna_idx)
rna_conc = countsToMolar * rna_counts
Km_counts = ((1 / degradation_rates * totalEndoRnaseCapacity) - rna_conc).asNumber()
sim_data.process.rna_decay.Km_first_order_decay = Km_counts
# Residuals can be written as follows: Res = f(Km) = 0, then Km = g(Km)
# Compute derivative g(Km) in counts:
KmQuadratic = 1 / np.power((1 / countsToMolar * Km_counts).asNumber(), 2)
denominator = np.power(
np.sum(rna_counts / (1 / countsToMolar * Km_counts).asNumber()), 2
)
numerator = (1 / countsToMolar * totalEndoRnaseCapacity).asNumber() * (
denominator - (rna_counts / (1 / countsToMolar * Km_counts).asNumber())
)
gDerivative = np.abs(KmQuadratic * (1 - (numerator / denominator)))
if VERBOSE:
print("Max derivative (counts) = %f" % max(gDerivative))
# Compute derivative g(Km) in concentrations:
KmQuadratic = 1 / np.power(Km_counts, 2)
denominator = np.power(np.sum(rna_conc.asNumber() / Km_counts), 2)
numerator = totalEndoRnaseCapacity.asNumber() * (
denominator - (rna_conc.asNumber() / Km_counts)
)
gDerivative = np.abs(KmQuadratic * (1 - (numerator / denominator)))
if VERBOSE:
print("Max derivative (concentration) = %f" % max(gDerivative))
# Sensitivity analysis: alpha (regularization term)
Alphas = []
if sim_data.constants.sensitivity_analysis_alpha:
Alphas = [0.0001, 0.001, 0.01, 0.1, 1, 10]
total_endo_rnase_capacity_mol_l_s = totalEndoRnaseCapacity.asNumber(
units.mol / units.L / units.s
)
rna_conc_mol_l = rna_conc.asNumber(units.mol / units.L)
degradation_rates_s = degradation_rates.asNumber(1 / units.s)
for alpha in Alphas:
if VERBOSE:
print("Alpha = %f" % alpha)
LossFunction, Rneg, R, LossFunctionP, R_aux, L_aux, Lp_aux, Jacob, Jacob_aux = (
sim_data.process.rna_decay.km_loss_function(
total_endo_rnase_capacity_mol_l_s,
rna_conc_mol_l,
degradation_rates_s,
isEndoRnase,
alpha,
)
)
Km_cooperative_model = scipy.optimize.fsolve(
LossFunction, Km_counts, fprime=LossFunctionP
)
sim_data.process.rna_decay.sensitivity_analysis_alpha_residual[alpha] = np.sum(
np.abs(R_aux(Km_cooperative_model))
)
sim_data.process.rna_decay.sensitivity_analysis_alpha_regulari_neg[alpha] = (
np.sum(np.abs(Rneg(Km_cooperative_model)))
)
alpha = 0.5
# Sensitivity analysis: kcatEndoRNase
kcatEndo = []
if sim_data.constants.sensitivity_analysis_kcat_endo:
kcatEndo = [0.0001, 0.001, 0.01, 0.1, 1, 10]
for kcat in kcatEndo:
if VERBOSE:
print("Kcat = %f" % kcat)
totalEndoRNcap = units.sum(endoRNaseConc * kcat)
LossFunction, Rneg, R, LossFunctionP, R_aux, L_aux, Lp_aux, Jacob, Jacob_aux = (
sim_data.process.rna_decay.km_loss_function(
totalEndoRNcap.asNumber(units.mol / units.L),
rna_conc_mol_l,
degradation_rates_s,
isEndoRnase,
alpha,
)
)
KmcountsIni = (
(totalEndoRNcap / degradation_rates.asNumber()) - rna_conc
).asNumber()
Km_cooperative_model = scipy.optimize.fsolve(
LossFunction, KmcountsIni, fprime=LossFunctionP
)
sim_data.process.rna_decay.sensitivity_analysis_kcat[kcat] = (
Km_cooperative_model
)
sim_data.process.rna_decay.sensitivity_analysis_kcat_res_ini[kcat] = np.sum(
np.abs(R_aux(Km_counts))
)
sim_data.process.rna_decay.sensitivity_analysis_kcat_res_opt[kcat] = np.sum(
np.abs(R_aux(Km_cooperative_model))
)
# Loss function, and derivative
LossFunction, Rneg, R, LossFunctionP, R_aux, L_aux, Lp_aux, Jacob, Jacob_aux = (
sim_data.process.rna_decay.km_loss_function(
total_endo_rnase_capacity_mol_l_s,
rna_conc_mol_l,
degradation_rates_s,
isEndoRnase,
alpha,
)
)
# The checksum in the filename picks independent caches for distinct cases
# such as different Parca options or Parca code in different git branches.
# `make clean` will delete the cache files.
needToUpdate = ""
cache_dir = filepath.makedirs(filepath.ROOT_PATH, "cache")
checksum = crc32(Km_counts, isEndoRnase, np.array(alpha))
km_filepath = os.path.join(cache_dir, f"parca-km-{checksum}.cPickle")
if os.path.exists(km_filepath):
with open(km_filepath, "rb") as f:
Km_cache = pickle.load(f)
# KmCooperativeModel fits a set of Km values to give the expected degradation rates.
# It takes 1.5 - 3 minutes to recompute.
# R_aux calculates the difference of the degradation rate based on these
# Km values and the expected rate so this sum seems like a good test of
# whether the cache fits current input data, but cross-check additional
# inputs to avoid Issue #996.
Km_cooperative_model = Km_cache["Km_cooperative_model"]
if (
Km_counts.shape != Km_cooperative_model.shape
or np.sum(np.abs(R_aux(Km_cooperative_model))) > 1e-15
or arrays_differ(
Km_cache["total_endo_rnase_capacity_mol_l_s"],
total_endo_rnase_capacity_mol_l_s,
)
or arrays_differ(Km_cache["rna_conc_mol_l"], rna_conc_mol_l)
or arrays_differ(Km_cache["degradation_rates_s"], degradation_rates_s)
):
needToUpdate = "recompute"
else:
needToUpdate = "compute"
if needToUpdate:
if VERBOSE:
print(f"Running non-linear optimization to {needToUpdate} {km_filepath}")
Km_cooperative_model = scipy.optimize.fsolve(
LossFunction, Km_counts, fprime=LossFunctionP
)
Km_cache = dict(
Km_cooperative_model=Km_cooperative_model,
total_endo_rnase_capacity_mol_l_s=total_endo_rnase_capacity_mol_l_s,
rna_conc_mol_l=rna_conc_mol_l,
degradation_rates_s=degradation_rates_s,
)
with open(km_filepath, "wb") as f:
pickle.dump(Km_cache, f, protocol=pickle.HIGHEST_PROTOCOL)
else:
if VERBOSE:
print(
"Not running non-linear optimization--using cached result {}".format(
km_filepath
)
)
if VERBOSE > 1:
print(
"Loss function (Km inital) = %f" % np.sum(np.abs(LossFunction(Km_counts)))
)
print(
"Loss function (optimized Km) = %f"
% np.sum(np.abs(LossFunction(Km_cooperative_model)))
)
print("Negative km ratio = %f" % np.sum(np.abs(Rneg(Km_cooperative_model))))
print("Residuals (Km initial) = %f" % np.sum(np.abs(R(Km_counts))))
print("Residuals optimized = %f" % np.sum(np.abs(R(Km_cooperative_model))))
print(
"EndoR residuals (Km initial) = %f"
% np.sum(np.abs(isEndoRnase * R(Km_counts)))
)
print(
"EndoR residuals optimized = %f"
% np.sum(np.abs(isEndoRnase * R(Km_cooperative_model)))
)
print(
"Residuals (scaled by Kdeg * RNAcounts) Km initial = %f"
% np.sum(np.abs(R_aux(Km_counts)))
)
print(
"Residuals (scaled by Kdeg * RNAcounts) optimized = %f"
% np.sum(np.abs(R_aux(Km_cooperative_model)))
)
# Evaluate Jacobian around solutions (Kmcounts and KmCooperativeModel)
JacobDiag = np.diag(Jacob(Km_cooperative_model))
Jacob_auxDiag = np.diag(Jacob_aux(Km_cooperative_model))
# Compute convergence of non-linear optimization: g'(Km)
Gkm = np.abs(1.0 - JacobDiag)
Gkm_aux = np.abs(1.0 - Jacob_auxDiag)
sim_data.process.rna_decay.Km_convergence = Gkm_aux
# Convergence is guaranteed if g'(Km) <= K < 1
if VERBOSE:
print(
"Convergence (Jacobian) = %.0f%% (<K> = %.5f)"
% (len(Gkm[Gkm < 1.0]) / float(len(Gkm)) * 100.0, np.mean(Gkm))
)
print(
"Convergence (Jacobian_aux) = %.0f%% (<K> = %.5f)"
% (
len(Gkm_aux[Gkm_aux < 1.0]) / float(len(Gkm_aux)) * 100.0,
np.mean(Gkm_aux[Gkm_aux < 1.0]),
)
)
# Save statistics KM optimization
sim_data.process.rna_decay.stats_fit["LossKm"] = np.sum(
np.abs(LossFunction(Km_counts))
)
sim_data.process.rna_decay.stats_fit["LossKmOpt"] = np.sum(
np.abs(LossFunction(Km_cooperative_model))
)
sim_data.process.rna_decay.stats_fit["RnegKmOpt"] = np.sum(
np.abs(Rneg(Km_cooperative_model))
)
sim_data.process.rna_decay.stats_fit["ResKm"] = np.sum(np.abs(R(Km_counts)))
sim_data.process.rna_decay.stats_fit["ResKmOpt"] = np.sum(
np.abs(R(Km_cooperative_model))
)
sim_data.process.rna_decay.stats_fit["ResEndoRNKm"] = np.sum(
np.abs(isEndoRnase * R(Km_counts))
)
sim_data.process.rna_decay.stats_fit["ResEndoRNKmOpt"] = np.sum(
np.abs(isEndoRnase * R(Km_cooperative_model))
)
sim_data.process.rna_decay.stats_fit["ResScaledKm"] = np.sum(
np.abs(R_aux(Km_counts))
)
sim_data.process.rna_decay.stats_fit["ResScaledKmOpt"] = np.sum(
np.abs(R_aux(Km_cooperative_model))
)
return units.mol / units.L * Km_cooperative_model