Source code for ecoli.analysis.multiseed.ribosome_spacing

import pickle
from typing import Any

from duckdb import DuckDBPyConnection
import numpy as np
import polars as pl

from ecoli.library.parquet_emitter import (
    open_arbitrary_sim_data,
    read_stacked_columns,
    get_field_metadata,
    ndidx_to_duckdb_expr,
)
from wholecell.utils import units


[docs] def plot( params: dict[str, Any], conn: DuckDBPyConnection, history_sql: str, config_sql: str, success_sql: str, sim_data_paths: dict[str, dict[int, str]], validation_data_paths: list[str], outdir: str, variant_metadata: dict[str, dict[int, Any]], variant_names: dict[str, str], ): with open_arbitrary_sim_data(sim_data_paths) as f: sim_data = pickle.load(f) # Map monomer IDs to cistron indices monomer_data = sim_data.process.translation.monomer_data.struct_array monomer_to_cistron_id = dict(zip(monomer_data["id"], monomer_data["cistron_id"])) mrna_cistron_ids = get_field_metadata( conn, config_sql, "listeners__rna_counts__mRNA_cistron_counts" ) cistron_idx_dict = {rna: i for i, rna in enumerate(mrna_cistron_ids)} monomer_ids = get_field_metadata(conn, config_sql, "listeners__monomer_counts") cistron_idx_for_monomers = [ cistron_idx_dict[monomer_to_cistron_id[monomer_id]] for monomer_id in monomer_ids ] monomer_counts_sql = read_stacked_columns( history_sql, [ "listeners__ribosome_data__ribosome_init_event_per_monomer AS init", "listeners__rna_counts__mRNA_cistron_counts AS mrna_counts", ], order_results=False, ) monomer_mrna_counts = ndidx_to_duckdb_expr( "mrna_counts", [cistron_idx_for_monomers] ) # Get aggregate ribosomes initiations per mRNA inits_per_mrna = conn.sql(f""" WITH extract_counts AS ( SELECT init, {monomer_mrna_counts}, experiment_id, variant, lineage_seed, generation, agent_id FROM ({monomer_counts_sql}) ), unnested AS ( SELECT unnest(init) AS init, unnest(mrna_counts) AS mrna_counts, generate_subscripts(init, 1) AS idx, experiment_id, variant, lineage_seed, generation, agent_id FROM extract_counts ), ratio AS ( SELECT CASE WHEN mrna_counts = 0 THEN 0 ELSE init / mrna_counts END AS inits_per_mrna, idx, experiment_id, variant, lineage_seed, generation, agent_id FROM unnested ) SELECT idx, max(inits_per_mrna) AS max_inits, avg(inits_per_mrna) AS avg_inits, FROM ratio GROUP BY idx, ORDER BY avg_inits DESC """).pl() ribosome_footprint_size = ( sim_data.process.translation.active_ribosome_footprint_size.asNumber(units.nt) ) nutrients = sim_data.conditions[sim_data.condition]["nutrients"] ribosome_elongation_rate = ( sim_data.process.translation.ribosomeElongationRateDict[nutrients].asNumber( units.aa / units.s ) * 3 ) ribosome_spacing = pl.DataFrame( [ pl.Series(monomer_ids)[inits_per_mrna["idx"] - 1].alias("Monomer ID"), (ribosome_elongation_rate / inits_per_mrna["avg_inits"]).alias( "Average Ribosome Spacing (nt)" ), (ribosome_elongation_rate / inits_per_mrna["max_inits"]).alias( "Minimum Ribosome Spacing (nt)" ), pl.Series(np.full(len(inits_per_mrna), ribosome_footprint_size)).alias( "Ribosome Footprint Size (nt)" ), ] ) ribosome_spacing = ribosome_spacing.select( ["Monomer ID"] + [ pl.when(pl.col(col).is_infinite()) .then(None) .otherwise(pl.col(col)) .alias(col) for col in ribosome_spacing.columns if col != "Monomer ID" ] ) ribosome_spacing.write_csv(f"{outdir}/ribosome_spacing.csv")