Source code for ecoli.processes.tf_binding

"""
============================
Transcription Factor Binding
============================

This process models how transcription factors bind to promoters on the DNA sequence.
"""

import numpy as np
import warnings

from vivarium.core.process import Step

from ecoli.library.schema import (
    listener_schema,
    numpy_schema,
    attrs,
    bulk_name_to_idx,
    counts,
)

from wholecell.utils.random import stochasticRound
from wholecell.utils import units

from ecoli.processes.registries import topology_registry


# Register default topology for this process, associating it with process name
NAME = "ecoli-tf-binding"
TOPOLOGY = {
    "promoters": ("unique", "promoter"),
    "bulk": ("bulk",),
    "bulk_total": ("bulk",),
    "listeners": ("listeners",),
    "timestep": ("timestep",),
    "next_update_time": ("next_update_time", "tf_binding"),
    "global_time": ("global_time",),
}
topology_registry.register(NAME, TOPOLOGY)


[docs] class TfBinding(Step): """Transcription Factor Binding Step""" name = NAME topology = TOPOLOGY defaults = { "tf_ids": [], "rna_ids": [], "delta_prob": {"deltaI": [], "deltaJ": [], "deltaV": []}, "n_avogadro": 6.02214076e23 / units.mol, "cell_density": 1100 * units.g / units.L, # Calculate promoter binding probability when not 0CS TF "p_promoter_bound_tf": lambda active, inactive: float(active) / (float(active) + float(inactive)), "tf_to_tf_type": {}, "active_to_bound": {}, "get_unbound": lambda tf: "", "active_to_inactive_tf": {}, "bulk_molecule_ids": [], "bulk_mass_data": np.array([[]]) * units.g / units.mol, "seed": 0, "submass_to_idx": { "rRNA": 0, "tRNA": 1, "mRNA": 2, "miscRNA": 3, "nonspecific_RNA": 4, "protein": 5, "metabolite": 6, "water": 7, "DNA": 8, }, "emit_unique": False, } # Constructor def __init__(self, parameters=None): super().__init__(parameters) # Get IDs of transcription factors self.tf_ids = self.parameters["tf_ids"] self.n_TF = len(self.tf_ids) self.rna_ids = self.parameters["rna_ids"] # Build dict that maps TFs to transcription units they regulate self.delta_prob = self.parameters["delta_prob"] self.TF_to_TU_idx = {} for i, tf in enumerate(self.tf_ids): self.TF_to_TU_idx[tf] = self.delta_prob["deltaI"][ self.delta_prob["deltaJ"] == i ] # Get total counts of transcription units self.n_TU = self.delta_prob["shape"][0] # Get constants self.n_avogadro = self.parameters["n_avogadro"] self.cell_density = self.parameters["cell_density"] # Create dictionaries and method self.p_promoter_bound_tf = self.parameters["p_promoter_bound_tf"] self.tf_to_tf_type = self.parameters["tf_to_tf_type"] self.active_to_bound = self.parameters["active_to_bound"] self.get_unbound = self.parameters["get_unbound"] self.active_to_inactive_tf = self.parameters["active_to_inactive_tf"] self.active_tfs = {} self.inactive_tfs = {} for tf in self.tf_ids: self.active_tfs[tf] = tf + "[c]" if self.tf_to_tf_type[tf] == "1CS": if tf == self.active_to_bound[tf]: self.inactive_tfs[tf] = self.get_unbound(tf + "[c]") else: self.inactive_tfs[tf] = self.active_to_bound[tf] + "[c]" elif self.tf_to_tf_type[tf] == "2CS": self.inactive_tfs[tf] = self.active_to_inactive_tf[tf + "[c]"] self.bulk_mass_data = self.parameters["bulk_mass_data"] # Build array of active TF masses self.bulk_molecule_ids = self.parameters["bulk_molecule_ids"] tf_indexes = [ np.where(self.bulk_molecule_ids == tf_id + "[c]")[0][0] for tf_id in self.tf_ids ] self.active_tf_masses = ( self.bulk_mass_data[tf_indexes] / self.n_avogadro ).asNumber(units.fg) self.seed = self.parameters["seed"] self.random_state = np.random.RandomState(seed=self.seed) # Helper indices for Numpy indexing self.active_tf_idx = None if "PD00365" in self.tf_ids: self.marR_name = "CPLX0-7710[c]" self.marR_tet = "marR-tet[c]" self.submass_indices = self.parameters["submass_indices"]
[docs] def ports_schema(self): return { "promoters": numpy_schema("promoters", emit=self.parameters["emit_unique"]), "bulk": numpy_schema("bulk"), "bulk_total": numpy_schema("bulk"), "listeners": { "rna_synth_prob": listener_schema( { "p_promoter_bound": ([0.0] * self.n_TF, self.tf_ids), "n_promoter_bound": ([0] * self.n_TF, self.tf_ids), "n_actual_bound": ([0] * self.n_TF, self.tf_ids), "n_available_promoters": ([0] * self.n_TF, self.tf_ids), "n_bound_TF_per_TU": ( [[0] * self.n_TF] * self.n_TU, self.rna_ids, ), } ) }, "next_update_time": { "_default": self.parameters["time_step"], "_updater": "set", "_divider": "set", }, "global_time": {"_default": 0.0}, "timestep": {"_default": self.parameters["time_step"]}, }
[docs] def update_condition(self, timestep, states): """ See :py:meth:`~.Requester.update_condition`. """ if states["next_update_time"] <= states["global_time"]: if states["next_update_time"] < states["global_time"]: warnings.warn( f"{self.name} updated at t=" f"{states['global_time']} instead of t=" f"{states['next_update_time']}. Decrease the " "timestep for the global clock process for more " "accurate timekeeping." ) return True return False
[docs] def next_update(self, timestep, states): # At t=0, convert all strings to indices if self.active_tf_idx is None: bulk_ids = states["bulk"]["id"] self.active_tf_idx = { tf_id: bulk_name_to_idx(tf_name, bulk_ids) for tf_id, tf_name in self.active_tfs.items() } self.inactive_tf_idx = { tf_id: bulk_name_to_idx(tf_name, bulk_ids) for tf_id, tf_name in self.inactive_tfs.items() } if "PD00365" in self.tf_ids: self.marR_idx = bulk_name_to_idx(self.marR_name, bulk_ids) self.marR_tet_idx = bulk_name_to_idx(self.marR_tet, bulk_ids) # If there are no promoters, return immediately if states["promoters"]["_entryState"].sum() == 0: return {"promoters": {}} # Get attributes of all promoters TU_index, bound_TF = attrs(states["promoters"], ["TU_index", "bound_TF"]) # Calculate number of bound TFs for each TF prior to changes n_bound_TF = bound_TF.sum(axis=0) # Initialize new bound_TF array bound_TF_new = np.zeros_like(bound_TF) # Create vectors for storing values pPromotersBound = np.zeros(self.n_TF, dtype=np.float64) nPromotersBound = np.zeros(self.n_TF, dtype=int) nActualBound = np.zeros(self.n_TF, dtype=int) n_promoters = np.zeros(self.n_TF, dtype=int) n_bound_TF_per_TU = np.zeros((self.n_TU, self.n_TF), dtype=np.int16) update = {"bulk": []} for tf_idx, tf_id in enumerate(self.tf_ids): # Free all DNA-bound transcription factors into free active # transcription factors curr_tf_idx = self.active_tf_idx[tf_id] tf_count = counts(states["bulk"], curr_tf_idx) bound_tf_counts = n_bound_TF[tf_idx] update["bulk"].append((curr_tf_idx, bound_tf_counts)) # Get counts of transcription factors active_tf_counts = ( counts(states["bulk_total"], curr_tf_idx) + bound_tf_counts ) n_available_active_tfs = tf_count + bound_tf_counts # NEW to vivarium-ecoli # Uncomplexed marR reduces active marA if tf_id == "PD00365": marR_count = counts(states["bulk_total"], self.marR_idx) marR_tet_count = counts(states["bulk_total"], self.marR_tet_idx) # marA activity ramps up as more marR is complexed off # TODO: Figure out how to modify ParCa so MarA/R are included # as active TFs so no need to compromise basal or tetracycline # behavior when total MarR count is zero ratio = marR_tet_count / max(marR_count + marR_tet_count, 1) # 34 = # of promoters for genes that marA regulates n_available_active_tfs = int(34 * ratio) # Determine the number of available promoter sites available_promoters = np.isin(TU_index, self.TF_to_TU_idx[tf_id]) n_available_promoters = np.count_nonzero(available_promoters) n_promoters[tf_idx] = n_available_promoters # If there are no active transcription factors to work with, # continue to the next transcription factor if n_available_active_tfs == 0: continue # Compute probability of binding the promoter if self.tf_to_tf_type[tf_id] == "0CS": pPromoterBound = 1.0 else: inactive_tf_counts = counts( states["bulk_total"], self.inactive_tf_idx[tf_id] ) pPromoterBound = self.p_promoter_bound_tf( active_tf_counts, inactive_tf_counts ) # Calculate the number of promoters that should be bound n_to_bind = int( min( stochasticRound( self.random_state, np.full(n_available_promoters, pPromoterBound), ).sum(), n_available_active_tfs, ) ) bound_locs = np.zeros(n_available_promoters, dtype=bool) if n_to_bind > 0: # Determine randomly which DNA targets to bind based on which of # the following is more limiting: # number of promoter sites to bind, or number of active # transcription factors bound_locs[ self.random_state.choice( n_available_promoters, size=n_to_bind, replace=False ) ] = True # Update count of free transcription factors update["bulk"].append((curr_tf_idx, -bound_locs.sum())) # Update bound_TF array bound_TF_new[available_promoters, tf_idx] = bound_locs n_bound_TF_per_TU[:, tf_idx] = np.bincount( TU_index[bound_TF_new[:, tf_idx]], minlength=self.n_TU ) # Record values pPromotersBound[tf_idx] = pPromoterBound nPromotersBound[tf_idx] = n_to_bind nActualBound[tf_idx] = bound_locs.sum() delta_TF = bound_TF_new.astype(np.int8) - bound_TF.astype(np.int8) mass_diffs = delta_TF.dot(self.active_tf_masses) submass_update = { submass: attrs(states["promoters"], [submass])[0] + mass_diffs[:, i] for submass, i in self.submass_indices.items() } update["promoters"] = {"set": {"bound_TF": bound_TF_new, **submass_update}} update["listeners"] = { "rna_synth_prob": { "p_promoter_bound": pPromotersBound, "n_promoter_bound": nPromotersBound, "n_actual_bound": nActualBound, "n_available_promoters": n_promoters, # 900 KB, very large, comment out to halve emit size "n_bound_TF_per_TU": n_bound_TF_per_TU, }, } update["next_update_time"] = states["global_time"] + states["timestep"] return update
def test_tf_binding_listener(): from ecoli.experiments.ecoli_master_sim import EcoliSim sim = EcoliSim.from_file() sim.total_time = 2 sim.raw_output = False sim.build_ecoli() sim.run() data = sim.query() assert data is not None if __name__ == "__main__": test_tf_binding_listener()