Source code for ecoli.analysis.multigeneration.ribosome_usage

"""
Record several things:
1. cell volume over time
2. total / active ribosome count and concentration
3. active ribosome molar / mass fraction
4. Ribosome activation / deactivation count
5. # of AA. be translated
6. the effective ribosome elongation rate
"""

import altair as alt
import os
from typing import Any
import pickle

import polars as pl
from duckdb import DuckDBPyConnection
import pandas as pd
import numpy as np

from ecoli.library.parquet_emitter import open_arbitrary_sim_data, named_idx
from ecoli.library.schema import bulk_name_to_idx

# ----------------------------------------- #


[docs] def plot( params: dict[str, Any], conn: DuckDBPyConnection, history_sql: str, config_sql: str, success_sql: str, sim_data_dict: dict[str, dict[int, str]], validation_data_paths: list[str], outdir: str, variant_metadata: dict[str, dict[int, Any]], variant_names: dict[str, str], ): """Visualize ribosome usage statistics for E. coli simulation.""" # Load sim_data with open_arbitrary_sim_data(sim_data_dict) as f: sim_data = pickle.load(f) # Get molecular IDs for ribosome subunits complex_ids_30s = [sim_data.molecule_ids.s30_full_complex] complex_ids_50s = [sim_data.molecule_ids.s50_full_complex] bulk_ids = sim_data.internal_state.bulk_molecules.bulk_data["id"].tolist() # precompute indices as Python ints (following ribosome_production.py pattern) idx_30s = [ int(i) for i in np.atleast_1d(bulk_name_to_idx(complex_ids_30s, bulk_ids)) ] idx_50s = [ int(i) for i in np.atleast_1d(bulk_name_to_idx(complex_ids_50s, bulk_ids)) ] # Get molecular weights n_avogadro = sim_data.constants.n_avogadro mw_30s = sim_data.getter.get_masses(complex_ids_30s) mw_50s = sim_data.getter.get_masses(complex_ids_50s) mw_70s = mw_30s + mw_50s required_columns = [ "time", "variant", "generation", "agent_id", "experiment_id", "lineage_seed", "listeners__mass__instantaneous_growth_rate", "listeners__mass__cell_mass", "listeners__mass__volume", "listeners__ribosome_data__did_initialize", "listeners__ribosome_data__actual_elongations", "listeners__ribosome_data__did_terminate", "listeners__ribosome_data__effective_elongation_rate", "listeners__unique_molecule_counts__active_ribosome", ] # Create the bulk index expressions expr_30s = named_idx("bulk", [f"bulk_30s_{i}" for i in idx_30s], [idx_30s]) expr_50s = named_idx("bulk", [f"bulk_50s_{i}" for i in idx_50s], [idx_50s]) # load data sql = f""" SELECT {", ".join(required_columns)}, {expr_30s}, {expr_50s} FROM ({history_sql}) WHERE agent_id = 0 ORDER BY generation, time """ df = conn.sql(sql).pl() # Convert time if "time" in df.columns: df = df.with_columns((pl.col("time") / 60).alias("time_min")) df = df.with_columns([(pl.col("time") + 1).alias("time_step_sec")]) # Calculate ribosome subunit counts cols_30s = [c for c in df.columns if c.startswith("bulk_30s_")] cols_50s = [c for c in df.columns if c.startswith("bulk_50s_")] df = df.with_columns( [ # compute bulk ribosome subunit counts pl.sum_horizontal(cols_30s).alias("counts_30s"), pl.sum_horizontal(cols_50s).alias("counts_50s"), # compute unique ribosomes pl.col("listeners__unique_molecule_counts__active_ribosome") .fill_null(0) .alias("active_ribosome_counts"), ] ) # Calculate total ribosome counts and fractions df = df.with_columns( [ ( pl.col("active_ribosome_counts") + pl.min_horizontal(pl.col("counts_30s"), pl.col("counts_50s")) ).alias("total_ribosome_counts"), ( pl.col("active_ribosome_counts").cast(pl.Float64) / ( pl.col("active_ribosome_counts") + pl.min_horizontal(pl.col("counts_30s"), pl.col("counts_50s")) ) ).alias("molar_fraction_active"), ] ) if "listeners__mass__cell_mass" in df.columns: cell_density = sim_data.constants.cell_density.asNumber() df = df.with_columns( (1e-15 * pl.col("listeners__mass__cell_mass") / cell_density).alias( "cell_volume" ) ) # Calculate concentrations df = df.with_columns( [ ( pl.col("total_ribosome_counts") / n_avogadro.asNumber() / pl.col("cell_volume") ).alias("total_ribosome_concentration_mM"), ( pl.col("active_ribosome_counts") / n_avogadro.asNumber() / pl.col("cell_volume") ).alias("active_ribosome_concentration_mM"), ] ) # Calculate masses mw_30s_value = mw_30s.asNumber() if hasattr(mw_30s, "asNumber") else float(mw_30s) mw_50s_value = mw_50s.asNumber() if hasattr(mw_50s, "asNumber") else float(mw_50s) mw_70s_value = mw_70s.asNumber() if hasattr(mw_70s, "asNumber") else float(mw_70s) df = df.with_columns( [ (pl.col("counts_30s") / n_avogadro.asNumber() * mw_30s_value).alias( "mass_30s" ), (pl.col("counts_50s") / n_avogadro.asNumber() * mw_50s_value).alias( "mass_50s" ), ( pl.col("active_ribosome_counts") / n_avogadro.asNumber() * mw_70s_value ).alias("active_ribosome_mass"), ] ) df = df.with_columns( [ ( pl.col("active_ribosome_mass") + pl.col("mass_30s") + pl.col("mass_50s") ).alias("total_ribosome_mass"), ( pl.col("active_ribosome_mass") / ( pl.col("active_ribosome_mass") + pl.col("mass_30s") + pl.col("mass_50s") ) ).alias("mass_fraction_active"), ] ) # Calculate rates per time and volume if "time_step_sec" in df.columns and "cell_volume" in df.columns: df = df.with_columns( [ ( pl.col("listeners__ribosome_data__did_initialize") / (pl.col("cell_volume") / 1e-15) ).alias("activations_per_volume"), ( pl.col("listeners__ribosome_data__did_terminate") / (pl.col("cell_volume") / 1e-15) ).alias("deactivations_per_volume"), ] ) # Select columns for plotting plot_columns = ["time_min", "variant", "generation"] # Add other columns that exist for col in [ "time_step_sec", "cell_volume", "total_ribosome_counts", "total_ribosome_concentration_mM", "active_ribosome_counts", "active_ribosome_concentration_mM", "molar_fraction_active", "mass_fraction_active", "listeners__ribosome_data__did_initialize", "listeners__ribosome_data__did_terminate", "activations_per_volume", "deactivations_per_volume", "listeners__ribosome_data__actual_elongations", "listeners__ribosome_data__effective_elongation_rate", ]: if col in df.columns: plot_columns.append(col) plot_df = df.select(plot_columns) # ----------------------------------------- # def create_line_chart(y_field, title, y_title, skip_first_point=False): """Create line chart with optional skipping of first data point.""" data = plot_df.to_pandas() if skip_first_point: # Group by variant and generation, skip first point of each group filtered_data = [] for (variant, generation), group in data.groupby(["variant", "generation"]): if len(group) > 1: filtered_data.append(group.iloc[1:]) else: filtered_data.append(group) data = ( pd.concat(filtered_data, ignore_index=True) if filtered_data else data ) chart = ( alt.Chart(data) .mark_line() .encode( x=alt.X("time_min:Q", title="Time (min)"), y=alt.Y(f"{y_field}:Q", title=y_title), color=alt.Color("generation:N", legend=alt.Legend(title="Generation")), ) .properties(title=title, width=600, height=120) ) return chart # ----------------------------------------- # plots = [] # Create all 14 plots following the original order if "time_step_sec" in plot_df.columns: plots.append( create_line_chart( "time_step_sec", "Length of Time Step", "Length of time step (s)" ) ) if "cell_volume" in plot_df.columns: plots.append(create_line_chart("cell_volume", "Cell Volume", "Cell volume (L)")) if "total_ribosome_counts" in plot_df.columns: plots.append( create_line_chart( "total_ribosome_counts", "Total Ribosome Count", "Total ribosome count" ) ) if "total_ribosome_concentration_mM" in plot_df.columns: plots.append( create_line_chart( "total_ribosome_concentration_mM", "Total Ribosome Concentration", "[Total ribosome] (mM)", ) ) if "active_ribosome_counts" in plot_df.columns: plots.append( create_line_chart( "active_ribosome_counts", "Active Ribosome Count", "Active ribosome count", skip_first_point=True, ) ) if "active_ribosome_concentration_mM" in plot_df.columns: plots.append( create_line_chart( "active_ribosome_concentration_mM", "Active Ribosome Concentration", "[Active ribosome] (mM)", skip_first_point=True, ) ) if "molar_fraction_active" in plot_df.columns: plots.append( create_line_chart( "molar_fraction_active", "Molar Fraction Active Ribosomes", "Molar fraction active ribosomes", skip_first_point=True, ) ) if "mass_fraction_active" in plot_df.columns: plots.append( create_line_chart( "mass_fraction_active", "Mass Fraction Active Ribosomes", "Mass fraction active ribosomes", skip_first_point=True, ) ) if "listeners__ribosome_data__did_initialize" in plot_df.columns: plots.append( create_line_chart( "listeners__ribosome_data__did_initialize", "Ribosome Activations", "Activations per timestep", ) ) if "listeners__ribosome_data__did_terminate" in plot_df.columns: plots.append( create_line_chart( "listeners__ribosome_data__did_terminate", "Ribosome Deactivations", "Deactivations per timestep", ) ) if "activations_per_volume" in plot_df.columns: plots.append( create_line_chart( "activations_per_volume", "Activations per Volume (fL)", "Activations per Volume (fL)", ) ) if "deactivations_per_volume" in plot_df.columns: plots.append( create_line_chart( "deactivations_per_volume", "Deactivations per Volume (fL)", "Deactivations per Volume (fL)", ) ) if "listeners__ribosome_data__actual_elongations" in plot_df.columns: plots.append( create_line_chart( "listeners__ribosome_data__actual_elongations", "Amino Acids Translated", "AA translated", ) ) if "listeners__ribosome_data__effective_elongation_rate" in plot_df.columns: plots.append( create_line_chart( "listeners__ribosome_data__effective_elongation_rate", "Effective Ribosome Elongation Rate", "Effective elongation rate", ) ) if not plots: fallback_df = pl.DataFrame( { "message": ["No data available for ribosome usage visualization"], "x": [0], "y": [0], } ) fallback_plot = ( alt.Chart(fallback_df) .mark_text(size=20, color="red") .encode(x="x:Q", y="y:Q", text="message:N") .properties( width=600, height=400, title="Ribosome Usage Statistics - No Data Available", ) ) plots.append(fallback_plot) # Arrange plots in 2 columns as in original left_plots = plots[::2] # Even indices (0, 2, 4, ...) right_plots = plots[1::2] # Odd indices (1, 3, 5, ...) # Ensure both columns have same length by adding empty chart if needed if len(left_plots) > len(right_plots): empty_chart = ( alt.Chart(pl.DataFrame({"x": [0], "y": [0]})) .mark_point(opacity=0) .encode(x="x:Q", y="y:Q") .properties(width=600, height=120) ) right_plots.append(empty_chart) elif len(right_plots) > len(left_plots): empty_chart = ( alt.Chart(pl.DataFrame({"x": [0], "y": [0]})) .mark_point(opacity=0) .encode(x="x:Q", y="y:Q") .properties(width=600, height=120) ) left_plots.append(empty_chart) # Create two column layout left_column = alt.vconcat(*left_plots) right_column = alt.vconcat(*right_plots) combined_plot = ( alt.hconcat(left_column, right_column) .resolve_scale(x="shared", y="independent") .properties(title="Ribosome Usage Statistics") ) output_path = os.path.join(outdir, "ribosome_usage_report.html") combined_plot.save(output_path) print(f"Saved visualization to: {output_path}") return combined_plot