Source code for ecoli.analysis.multigeneration.ribosome_components

"""
Record the 30S and 50S component count vs time
"""

import altair as alt
import os
from typing import Any, Dict

from duckdb import DuckDBPyConnection
import pickle
import polars as pl

from ecoli.library.parquet_emitter import (
    field_metadata,
    open_arbitrary_sim_data,
    named_idx,
    read_stacked_columns,
)

# ----------------------------------------- #


[docs] def plot( params: Dict[str, Any], conn: DuckDBPyConnection, history_sql: str, config_sql: str, success_sql: str, sim_data_dict: Dict[str, Dict[int, str]], validation_data_paths: list[str], outdir: str, variant_metadata: Dict[str, Dict[int, Any]], variant_names: Dict[str, str], ): # Load simulation data with open_arbitrary_sim_data(sim_data_dict) as f: sim_data = pickle.load(f) # Extract molecule IDs for ribosomal subunits s30_protein_ids = sim_data.molecule_groups.s30_proteins s30_16s_rRNA_ids = sim_data.molecule_groups.s30_16s_rRNA s30_full_complex_id = sim_data.molecule_ids.s30_full_complex s50_protein_ids = sim_data.molecule_groups.s50_proteins s50_23s_rRNA_ids = sim_data.molecule_groups.s50_23s_rRNA s50_5s_rRNA_ids = sim_data.molecule_groups.s50_5s_rRNA s50_full_complex_id = sim_data.molecule_ids.s50_full_complex # Retrieve stoichiometry for each protein subunit complexation = sim_data.process.complexation s30_info = complexation.get_monomers(s30_full_complex_id) s50_info = complexation.get_monomers(s50_full_complex_id) s30_stoich = dict(zip(s30_info["subunitIds"], s30_info["subunitStoich"])) s50_stoich = dict(zip(s50_info["subunitIds"], s50_info["subunitStoich"])) # Map bulk IDs to SQL column indices bulk_ids = field_metadata(conn, config_sql, "bulk") bulk_index = {mid: idx for idx, mid in enumerate(bulk_ids)} # Determine column indexes in SQL for rRNAs and complexes s30_16s_idx = [bulk_index[i] for i in s30_16s_rRNA_ids if i in bulk_index] s50_23s_idx = [bulk_index[i] for i in s50_23s_rRNA_ids if i in bulk_index] s50_5s_idx = [bulk_index[i] for i in s50_5s_rRNA_ids if i in bulk_index] s30_complex_idx = bulk_index[s30_full_complex_id] s50_complex_idx = bulk_index[s50_full_complex_id] # Map monomer counts IDs to SQL column indices mono_ids = field_metadata(conn, config_sql, "listeners__monomer_counts") mono_index = {mid: idx for idx, mid in enumerate(mono_ids)} s30_protein_idx = [mono_index[i] for i in s30_protein_ids if i in mono_index] s50_protein_idx = [mono_index[i] for i in s50_protein_ids if i in mono_index] # Build named_idx spec for reading bulk_cols = [ named_idx("bulk", s30_16s_rRNA_ids, [s30_16s_idx]), named_idx("bulk", s50_23s_rRNA_ids, [s50_23s_idx]), named_idx("bulk", s50_5s_rRNA_ids, [s50_5s_idx]), named_idx("bulk", [s30_full_complex_id], [[s30_complex_idx]]), named_idx("bulk", [s50_full_complex_id], [[s50_complex_idx]]), ] protein_cols = [ named_idx("listeners__monomer_counts", [pid], [[idx]]) for pid, idx in zip( s30_protein_ids + s50_protein_ids, s30_protein_idx + s50_protein_idx ) ] additional = ["listeners__unique_molecule_counts__active_ribosome", "time"] cols = bulk_cols + protein_cols + additional # Read time-series data data = read_stacked_columns(history_sql, cols, conn=conn) df = pl.DataFrame(data).with_columns(Time_min=pl.col("time") / 60) # Sum rRNA counts horizontally s30_16s = pl.sum_horizontal([pl.col(i) for i in s30_16s_rRNA_ids]) s50_23s = pl.sum_horizontal([pl.col(i) for i in s50_23s_rRNA_ids]) s50_5s = pl.sum_horizontal([pl.col(i) for i in s50_5s_rRNA_ids]) # Extract complex and active ribosome counts s30_complex = pl.col(s30_full_complex_id) s50_complex = pl.col(s50_full_complex_id) active_ribo = pl.col("listeners__unique_molecule_counts__active_ribosome") # Adjust protein counts by stoichiometry for pid in s30_protein_ids: df = df.with_columns(**{f"adj_s30_{pid}": pl.col(pid) / s30_stoich[pid]}) for pid in s50_protein_ids: df = df.with_columns(**{f"adj_s50_{pid}": pl.col(pid) / s50_stoich[pid]}) # Determine limiting protein across subunits s30_lim = pl.min_horizontal([pl.col(f"adj_s30_{pid}") for pid in s30_protein_ids]) s50_lim = pl.min_horizontal([pl.col(f"adj_s50_{pid}") for pid in s50_protein_ids]) # Calculate total rRNA including complexes and active ribosomes df = df.with_columns( s30_16s_total=s30_16s + s30_complex + active_ribo, s50_23s_total=s50_23s + s50_complex + active_ribo, s50_5s_total=s50_5s + s50_complex + active_ribo, s30_limiting=s30_lim, s50_limiting=s50_lim, s30_total=s30_complex + active_ribo, s50_total=s50_complex + active_ribo, ) # ----------------------------------------- # plot_cols_30 = ["s30_limiting", "s30_16s_total", "s30_total"] plot_cols_50 = ["s50_limiting", "s50_23s_total", "s50_5s_total", "s50_total"] melt_30 = df.select(["Time_min"] + plot_cols_30).melt( id_vars="Time_min", variable_name="component", value_name="count" ) melt_50 = df.select(["Time_min"] + plot_cols_50).melt( id_vars="Time_min", variable_name="component", value_name="count" ) chart_30 = ( alt.Chart(melt_30) .mark_line() .encode( x="Time_min", y="count", color=alt.Color("component", title="30S Components"), ) .properties(title="30S Component Counts", width=600) ) chart_50 = ( alt.Chart(melt_50) .mark_line() .encode( x="Time_min", y="count", color=alt.Color("component", title="50S Components"), ) .properties(title="50S Component Counts", width=600) ) combined = ( alt.vconcat(chart_30, chart_50) .resolve_scale(color="independent") .resolve_legend(color="independent") ) combined.save(os.path.join(outdir, "ribosome_components.html"))