"""
======================
Polypeptide Initiation
======================
This process models the complementation of 30S and 50S ribosomal subunits
into 70S ribosomes on mRNA transcripts. This process is in many ways
analogous to the TranscriptInitiation process - the number of initiation
events per transcript is determined in a probabilistic manner and dependent
on the number of free ribosomal subunits, each mRNA transcript’s translation
efficiency, and the counts of each type of transcript.
"""
import numpy as np
from vivarium.core.composition import simulate_process
from ecoli.library.schema import (
create_unique_indexes,
numpy_schema,
attrs,
counts,
bulk_name_to_idx,
listener_schema,
)
from wholecell.utils import units
from wholecell.utils.fitting import normalize
from ecoli.processes.registries import topology_registry
from ecoli.processes.partition import PartitionedProcess
# Register default topology for this process, associating it with process name
NAME = "ecoli-polypeptide-initiation"
TOPOLOGY = {
"environment": ("environment",),
"listeners": ("listeners",),
"active_ribosome": ("unique", "active_ribosome"),
"RNA": ("unique", "RNA"),
"bulk": ("bulk",),
"timestep": ("timestep",),
}
topology_registry.register(NAME, TOPOLOGY)
[docs]
class PolypeptideInitiation(PartitionedProcess):
"""Polypeptide Initiation PartitionedProcess"""
name = NAME
topology = TOPOLOGY
defaults = {
"protein_lengths": [],
"translation_efficiencies": [],
"active_ribosome_fraction": {},
"elongation_rates": {},
"variable_elongation": False,
"make_elongation_rates": lambda x: [],
"rna_id_to_cistron_indexes": {},
"cistron_start_end_pos_in_tu": {},
"tu_ids": [],
"active_ribosome_footprint_size": 24 * units.nt,
"cistron_to_monomer_mapping": {},
"cistron_tu_mapping_matrix": {},
"monomer_index_to_cistron_index": {},
"monomer_index_to_tu_indexes": {},
"ribosome30S": "ribosome30S",
"ribosome50S": "ribosome50S",
"seed": 0,
"monomer_ids": [],
"emit_unique": False,
"time_step": 1,
}
def __init__(self, parameters=None):
super().__init__(parameters)
# Load parameters
self.protein_lengths = self.parameters["protein_lengths"]
self.translation_efficiencies = self.parameters["translation_efficiencies"]
self.active_ribosome_fraction = self.parameters["active_ribosome_fraction"]
self.ribosome_elongation_rates_dict = self.parameters["elongation_rates"]
self.variable_elongation = self.parameters["variable_elongation"]
self.make_elongation_rates = self.parameters["make_elongation_rates"]
self.rna_id_to_cistron_indexes = self.parameters["rna_id_to_cistron_indexes"]
self.cistron_start_end_pos_in_tu = self.parameters[
"cistron_start_end_pos_in_tu"
]
self.tu_ids = self.parameters["tu_ids"]
self.n_TUs = len(self.tu_ids)
self.active_ribosome_footprint_size = self.parameters[
"active_ribosome_footprint_size"
]
# Get mapping from cistrons to protein monomers and TUs
self.cistron_to_monomer_mapping = self.parameters["cistron_to_monomer_mapping"]
self.cistron_tu_mapping_matrix = self.parameters["cistron_tu_mapping_matrix"]
self.monomer_index_to_cistron_index = self.parameters[
"monomer_index_to_cistron_index"
]
self.monomer_index_to_tu_indexes = self.parameters[
"monomer_index_to_tu_indexes"
]
self.ribosome30S = self.parameters["ribosome30S"]
self.ribosome50S = self.parameters["ribosome50S"]
self.seed = self.parameters["seed"]
self.random_state = np.random.RandomState(seed=self.seed)
# Use separate random state instance to create unique indices
# so results are directly comparable with wcEcoli
self.unique_idx_random_state = np.random.RandomState(seed=self.seed)
self.empty_update = {
"listeners": {
"ribosome_data": {
"ribosomes_initialized": 0,
"prob_translation_per_transcript": 0.0,
}
}
}
self.monomer_ids = self.parameters["monomer_ids"]
# Helper indices for Numpy indexing
self.ribosome30S_idx = None
[docs]
def ports_schema(self):
return {
"environment": {"media_id": {"_default": "", "_updater": "set"}},
"listeners": {
"ribosome_data": listener_schema(
{
"did_initialize": 0,
"target_prob_translation_per_transcript": (
[0.0] * len(self.monomer_ids),
self.monomer_ids,
),
"actual_prob_translation_per_transcript": (
[0.0] * len(self.monomer_ids),
self.monomer_ids,
),
"mRNA_is_overcrowded": (
[False] * len(self.monomer_ids),
self.monomer_ids,
),
"ribosome_init_event_per_monomer": (
[0] * len(self.monomer_ids),
self.monomer_ids,
),
"effective_elongation_rate": 0.0,
"max_p": 0.0,
"max_p_per_protein": (
np.zeros(len(self.monomer_ids), np.float64),
self.monomer_ids,
),
}
),
},
"active_ribosome": numpy_schema(
"active_ribosome", emit=self.parameters["emit_unique"]
),
"RNA": numpy_schema("RNAs", emit=self.parameters["emit_unique"]),
"bulk": numpy_schema("bulk"),
"timestep": {"_default": self.parameters["time_step"]},
}
[docs]
def calculate_request(self, timestep, states):
if self.ribosome30S_idx is None:
bulk_ids = states["bulk"]["id"]
self.ribosome30S_idx = bulk_name_to_idx(self.ribosome30S, bulk_ids)
self.ribosome50S_idx = bulk_name_to_idx(self.ribosome50S, bulk_ids)
current_media_id = states["environment"]["media_id"]
# requests = {'subunits': states['subunits']}
requests = {
"bulk": [
(self.ribosome30S_idx, counts(states["bulk"], self.ribosome30S_idx)),
(self.ribosome50S_idx, counts(states["bulk"], self.ribosome50S_idx)),
]
}
self.fracActiveRibosome = self.active_ribosome_fraction[current_media_id]
# Read ribosome elongation rate from last timestep
self.ribosomeElongationRate = states["listeners"]["ribosome_data"][
"effective_elongation_rate"
]
# If the ribosome elongation rate is zero (which is always the case for
# the first timestep), set ribosome elongation rate to the one in
# dictionary
if self.ribosomeElongationRate == 0:
self.ribosomeElongationRate = self.ribosome_elongation_rates_dict[
current_media_id
].asNumber(units.aa / units.s)
self.elongation_rates = self.make_elongation_rates(
self.random_state,
self.ribosomeElongationRate,
1, # want elongation rate, not lengths adjusted for time step
self.variable_elongation,
)
# Ensure rates are never zero
self.elongation_rates = np.fmax(self.elongation_rates, 1)
return requests
[docs]
def evolve_state(self, timestep, states):
# Calculate number of ribosomes that could potentially be initialized
# based on counts of free 30S and 50S subunits
inactive_ribosome_count = np.min(
[
counts(states["bulk"], self.ribosome30S_idx),
counts(states["bulk"], self.ribosome50S_idx),
]
)
# Calculate actual number of ribosomes that should be activated based
# on probabilities
(
TU_index_RNAs,
transcript_lengths,
can_translate,
is_full_transcript,
unique_index_RNAs,
) = attrs(
states["RNA"],
[
"TU_index",
"transcript_length",
"can_translate",
"is_full_transcript",
"unique_index",
],
)
TU_index_mRNAs = TU_index_RNAs[can_translate]
length_mRNAs = transcript_lengths[can_translate]
unique_index_mRNAs = unique_index_RNAs[can_translate]
is_full_transcript_mRNAs = is_full_transcript[can_translate]
is_incomplete_transcript_mRNAs = np.logical_not(is_full_transcript_mRNAs)
# Calculate counts of each mRNA cistron from fully transcribed
# transcription units
TU_index_full_mRNAs = TU_index_mRNAs[is_full_transcript_mRNAs]
TU_counts_full_mRNAs = np.bincount(TU_index_full_mRNAs, minlength=self.n_TUs)
cistron_counts = self.cistron_tu_mapping_matrix.dot(TU_counts_full_mRNAs)
# Calculate counts of each mRNA cistron from partially transcribed
# transcription units
TU_index_incomplete_mRNAs = TU_index_mRNAs[is_incomplete_transcript_mRNAs]
length_incomplete_mRNAs = length_mRNAs[is_incomplete_transcript_mRNAs]
for TU_index, length in zip(TU_index_incomplete_mRNAs, length_incomplete_mRNAs):
cistron_indexes = self.rna_id_to_cistron_indexes(self.tu_ids[TU_index])
cistron_start_positions = np.array(
[
self.cistron_start_end_pos_in_tu[(cistron_index, TU_index)][0]
for cistron_index in cistron_indexes
]
)
cistron_counts[cistron_indexes] += length > cistron_start_positions
# Calculate initiation probabilities for ribosomes based on mRNA counts
# and associated mRNA translational efficiencies
protein_init_prob = normalize(
cistron_counts[self.cistron_to_monomer_mapping]
* self.translation_efficiencies
)
target_protein_init_prob = protein_init_prob.copy()
# Calculate actual number of ribosomes that should be activated based
# on probabilities
activation_prob = self.calculate_activation_prob(
self.fracActiveRibosome,
self.protein_lengths,
self.elongation_rates,
target_protein_init_prob,
states["timestep"],
)
n_ribosomes_to_activate = np.int64(activation_prob * inactive_ribosome_count)
if n_ribosomes_to_activate == 0:
update = dict(self.empty_update)
update["active_ribosome"] = {}
return self.empty_update
# Cap the initiation probabilities at the maximum level physically
# allowed from the known ribosome footprint sizes based on the
# number of mRNAs
max_p = (
self.ribosomeElongationRate
/ self.active_ribosome_footprint_size
* (units.s)
* states["timestep"]
/ n_ribosomes_to_activate
).asNumber()
max_p_per_protein = max_p * cistron_counts[self.cistron_to_monomer_mapping]
is_overcrowded = protein_init_prob > max_p_per_protein
# Initialize flag to record if the number of ribosomes activated at this
# time step needed to be reduced to prevent overcrowding
is_n_ribosomes_to_activate_reduced = False
# If needed, resolve overcrowding
while np.any(protein_init_prob > max_p_per_protein):
if protein_init_prob[~is_overcrowded].sum() != 0:
# Resolve overcrowding through rescaling (preferred)
protein_init_prob[is_overcrowded] = max_p_per_protein[is_overcrowded]
scale_the_rest_by = (
1.0 - protein_init_prob[is_overcrowded].sum()
) / protein_init_prob[~is_overcrowded].sum()
protein_init_prob[~is_overcrowded] *= scale_the_rest_by
is_overcrowded |= protein_init_prob > max_p_per_protein
else:
# If we cannot resolve the overcrowding through rescaling,
# we need to activate fewer ribosomes. Set the number of
# ribosomes to activate so that there will be no overcrowding.
is_n_ribosomes_to_activate_reduced = True
max_index = np.argmax(
protein_init_prob[is_overcrowded]
/ max_p_per_protein[is_overcrowded]
)
max_init_prob = protein_init_prob[is_overcrowded][max_index]
associated_cistron_counts = cistron_counts[
self.cistron_to_monomer_mapping
][is_overcrowded][max_index]
n_ribosomes_to_activate = np.int64(
(
self.ribosomeElongationRate
/ self.active_ribosome_footprint_size
* (units.s)
* states["timestep"]
/ max_init_prob
* associated_cistron_counts
).asNumber()
)
# Update maximum probabilities based on new number of activated
# ribosomes.
max_p = (
self.ribosomeElongationRate
/ self.active_ribosome_footprint_size
* (units.s)
* states["timestep"]
/ n_ribosomes_to_activate
).asNumber()
max_p_per_protein = (
max_p * cistron_counts[self.cistron_to_monomer_mapping]
)
is_overcrowded = protein_init_prob > max_p_per_protein
assert is_overcrowded.sum() == 0 # We expect no overcrowding
# Compute actual transcription probabilities of each transcript
actual_protein_init_prob = protein_init_prob.copy()
# Sample multinomial distribution to determine which mRNAs have full
# 70S ribosomes initialized on them
n_new_proteins = self.random_state.multinomial(
n_ribosomes_to_activate, protein_init_prob
)
# Build attributes for active ribosomes.
# Each ribosome is assigned a protein index for the protein that
# corresponds to the polypeptide it will polymerize. This is done in
# blocks of protein ids for efficiency.
protein_indexes = np.empty(n_ribosomes_to_activate, np.int64)
mRNA_indexes = np.empty(n_ribosomes_to_activate, np.int64)
positions_on_mRNA = np.empty(n_ribosomes_to_activate, np.int64)
nonzero_count = n_new_proteins > 0
start_index = 0
for protein_index, protein_counts in zip(
np.arange(n_new_proteins.size)[nonzero_count], n_new_proteins[nonzero_count]
):
# Set protein index
protein_indexes[start_index : start_index + protein_counts] = protein_index
cistron_index = self.monomer_index_to_cistron_index[protein_index]
attribute_indexes = []
cistron_start_positions = []
for TU_index in self.monomer_index_to_tu_indexes[protein_index]:
attribute_indexes_this_TU = np.where(TU_index_mRNAs == TU_index)[0]
cistron_start_position = self.cistron_start_end_pos_in_tu[
(cistron_index, TU_index)
][0]
is_transcript_long_enough = (
length_mRNAs[attribute_indexes_this_TU] >= cistron_start_position
)
attribute_indexes.extend(
attribute_indexes_this_TU[is_transcript_long_enough]
)
cistron_start_positions.extend(
[cistron_start_position]
* len(attribute_indexes_this_TU[is_transcript_long_enough])
)
n_mRNAs = len(attribute_indexes)
# Distribute ribosomes among these mRNAs
n_ribosomes_per_RNA = self.random_state.multinomial(
protein_counts, np.full(n_mRNAs, 1.0 / n_mRNAs)
)
# Get unique indexes of each mRNA
mRNA_indexes[start_index : start_index + protein_counts] = np.repeat(
unique_index_mRNAs[attribute_indexes], n_ribosomes_per_RNA
)
positions_on_mRNA[start_index : start_index + protein_counts] = np.repeat(
cistron_start_positions, n_ribosomes_per_RNA
)
start_index += protein_counts
# Create active 70S ribosomes and assign their attributes
ribosome_indices = create_unique_indexes(
n_ribosomes_to_activate, self.unique_idx_random_state
)
update = {
"bulk": [
(self.ribosome30S_idx, -n_new_proteins.sum()),
(self.ribosome50S_idx, -n_new_proteins.sum()),
],
"active_ribosome": {
"add": {
"unique_index": ribosome_indices,
"protein_index": protein_indexes,
"peptide_length": np.zeros(n_ribosomes_to_activate, dtype=np.int64),
"mRNA_index": mRNA_indexes,
"pos_on_mRNA": positions_on_mRNA,
},
},
"listeners": {
"ribosome_data": {
"did_initialize": n_new_proteins.sum(),
"ribosome_init_event_per_monomer": n_new_proteins,
"target_prob_translation_per_transcript": target_protein_init_prob,
"actual_prob_translation_per_transcript": actual_protein_init_prob,
"mRNA_is_overcrowded": is_overcrowded,
"max_p": max_p,
"max_p_per_protein": max_p_per_protein,
"is_n_ribosomes_to_activate_reduced": is_n_ribosomes_to_activate_reduced,
}
},
}
return update
[docs]
def calculate_activation_prob(
self,
fracActiveRibosome,
proteinLengths,
ribosomeElongationRates,
proteinInitProb,
timeStepSec,
):
"""
Calculates the expected ribosome termination rate based on the ribosome
elongation rate
Args:
allTranslationTimes: Vector of times required to translate each
protein
allTranslationTimestepCounts: Vector of numbers of timesteps
required to translate each protein
averageTranslationTimeStepCounts: Average number of timesteps
required to translate a protein, weighted by initiation
probabilities
expectedTerminationRate: Average number of terminations in one
timestep for one protein
"""
allTranslationTimes = 1.0 / ribosomeElongationRates * proteinLengths
allTranslationTimestepCounts = np.ceil(allTranslationTimes / timeStepSec)
averageTranslationTimestepCounts = np.dot(
allTranslationTimestepCounts, proteinInitProb
)
expectedTerminationRate = 1.0 / averageTranslationTimestepCounts
# Modify given fraction of active ribosomes to take into account early
# terminations in between timesteps
# allFractionTimeInactive: Vector of probabilities an "active" ribosome
# will in effect be "inactive" because it has terminated during a
# timestep
# averageFractionTimeInactive: Average probability of an "active"
# ribosome being in effect "inactive", weighted by initiation
# probabilities
# effectiveFracActiveRnap: New higher "goal" for fraction of active
# ribosomes, considering that the "effective" fraction is lower than
# what the listener sees
allFractionTimeInactive = (
1 - allTranslationTimes / timeStepSec / allTranslationTimestepCounts
)
averageFractionTimeInactive = np.dot(allFractionTimeInactive, proteinInitProb)
effectiveFracActiveRibosome = (
fracActiveRibosome * 1 / (1 - averageFractionTimeInactive)
)
# Return activation probability that will balance out the expected
# termination rate
activationProb = (
effectiveFracActiveRibosome
* expectedTerminationRate
/ (1 - effectiveFracActiveRibosome)
)
# The upper bound for the activation probability is temporarily set to
# 1.0 to prevent negative molecule counts. This will lower the fraction
# of active ribosomes for timesteps longer than roughly 1.8s.
if activationProb >= 1.0:
activationProb = 1
return activationProb
def test_polypeptide_initiation():
def make_elongation_rates(self, random, base, time_step, variable_elongation=False):
return base
all_mRNA_ids = ["wRNA", "xRNA", "yRNA", "zRNA"]
protein_lengths = np.array([25, 9, 12, 29])
test_config = {
"protein_lengths": protein_lengths,
"translation_efficiencies": normalize(np.array([0.1, 0.2, 0.3, 0.4])),
"active_ribosome_fraction": {"minimal": 0.05},
"variable_elongation": False,
"make_elongation_rates": make_elongation_rates,
"rna_id_to_cistron_indexes": lambda rna_id: all_mRNA_ids.index(rna_id),
"cistron_start_end_pos_in_tu": {
(idx, idx): (0, protein_length * 3 + 3)
for idx, protein_length in enumerate(protein_lengths)
},
"tu_ids": all_mRNA_ids,
"cistron_to_monomer_mapping": np.arange(4),
"cistron_tu_mapping_matrix": np.identity(4),
"monomer_index_to_cistron_index": {i: i for i in range(4)},
"monomer_index_to_tu_indexes": {i: (i,) for i in range(4)},
"protein_index_to_TU_index": np.arange(4),
"all_TU_ids": ["wRNA", "xRNA", "yRNA", "zRNA"],
"all_mRNA_ids": ["wRNA", "xRNA", "yRNA", "zRNA"],
"ribosome30S": "30S",
"ribosome50S": "50S",
"seed": 0,
}
polypeptide_initiation = PolypeptideInitiation(test_config)
state = {
"environment": {"media_id": "minimal"},
"listeners": {"ribosome_data": {"effective_elongation_rate": 25}},
"bulk": np.array(
[
("30S", 2000),
("50S", 3000),
],
dtype=[("id", "U40"), ("count", int)],
),
"RNA": np.array(
[
(1, 0, True, 0, 103, True),
(1, 0, True, 1, 103, True),
(1, 1, True, 2, 39, True),
(1, 2, True, 3, 51, True),
(1, 2, True, 4, 51, True),
(1, 3, True, 4, 119, True),
],
dtype=[
("_entryState", np.bool_),
("TU_index", int),
("can_translate", np.bool_),
("unique_index", int),
("transcript_length", int),
("is_full_transcript", np.bool_),
],
),
}
settings = {"total_time": 10, "initial_state": state}
data = simulate_process(polypeptide_initiation, settings)
print(data)
if __name__ == "__main__":
test_polypeptide_initiation()