Source code for ecoli.analysis.multiseed.subgenerational_expression_table

"""
Generates a table of genes that are subgenerationally expressed, with their
expression frequencies and average/maximum mRNA/protein counts.
"""

import pickle
import os
from typing import Any

# noinspection PyUnresolvedReferences
from duckdb import DuckDBPyConnection
import polars as pl

from ecoli.library.parquet_emitter import (
    get_field_metadata,
    ndidx_to_duckdb_expr,
    num_cells,
    open_arbitrary_sim_data,
    read_stacked_columns,
    skip_n_gens,
)


IGNORE_FIRST_N_GENS = 8


[docs] def plot( params: dict[str, Any], conn: DuckDBPyConnection, history_sql: str, config_sql: str, sim_data_dict: dict[str, dict[int, str]], validation_data_paths: list[str], outdir: str, variant_metadata: dict[str, dict[int, Any]], variant_names: dict[str, str], ): with open_arbitrary_sim_data(sim_data_dict) as f: sim_data = pickle.load(f) # Ignore first N generations history_sql = skip_n_gens(history_sql, IGNORE_FIRST_N_GENS) config_sql = skip_n_gens(config_sql, IGNORE_FIRST_N_GENS) if num_cells(conn, config_sql) == 0: print("Skipping analysis - not enough generations run.") return # Get list of cistron IDs from sim_data cistron_data = sim_data.process.transcription.cistron_data cistron_ids = cistron_data["id"] # Filter list for cistron IDs with associated protein ids cistron_id_to_protein_id = { protein["cistron_id"]: protein["id"] for protein in sim_data.process.translation.monomer_data } mRNA_cistron_ids = [ cistron_id for cistron_id in cistron_ids if cistron_id in cistron_id_to_protein_id ] # Get IDs of associated monomers and genes monomer_ids = [ cistron_id_to_protein_id.get(cistron_id, None) for cistron_id in mRNA_cistron_ids ] cistron_id_to_gene_id = { cistron["id"]: cistron["gene_id"] for cistron in cistron_data } gene_ids = [cistron_id_to_gene_id[cistron_id] for cistron_id in mRNA_cistron_ids] # Get subcolumn for mRNA cistron IDs in RNA counts table mRNA_cistron_ids_rna_counts_table = get_field_metadata( conn, config_sql, "listeners__rna_counts__mRNA_cistron_counts" ) # Get indexes of mRNA cistrons in this subcolums (DuckDB lists are 1-indexed) mRNA_cistron_id_to_index = { cistron_id: i + 1 for (i, cistron_id) in enumerate(mRNA_cistron_ids_rna_counts_table) } mRNA_cistron_indexes = [ mRNA_cistron_id_to_index[cistron_id] for cistron_id in mRNA_cistron_ids ] # Get subcolumn for monomer IDs in monomer counts table monomer_ids_monomer_counts_table = get_field_metadata( conn, config_sql, "listeners__monomer_counts" ) # Get indexes of monomers in this subcolumn (DuckDB lists are 1-indexed) monomer_id_to_index = { monomer_id: i + 1 for (i, monomer_id) in enumerate(monomer_ids_monomer_counts_table) } monomer_indexes = [monomer_id_to_index[monomer_id] for monomer_id in monomer_ids] monomer_expr = ndidx_to_duckdb_expr("listeners__monomer_counts", [monomer_indexes]) cistron_expr = ndidx_to_duckdb_expr( "listeners__rna_counts__mRNA_cistron_counts", [mRNA_cistron_indexes] ) subquery = read_stacked_columns( history_sql, ["listeners__monomer_counts", "listeners__rna_counts__mRNA_cistron_counts"], [f"{monomer_expr} AS monomer_counts", f"{cistron_expr} AS mrna_counts"], order_results=False, ) out_df = conn.sql( f""" -- Unnest monomer and mRNA count columns, labelling with -- index so we can later calculate per-cistron aggregates WITH unnested_counts AS ( SELECT lineage_seed, generation, agent_id, unnest(monomer_counts) AS monomer_counts, unnest(mrna_counts) AS mrna_counts, generate_subscripts(mrna_counts, 1) AS cistron_idx FROM {subquery} ), -- Group by cell and cistron to get existence of each mRNA per cell cell_aggregate AS ( SELECT SUM(mrna_counts) > 0 AS exists, MAX(monomer_counts) AS max_monomer_counts, MAX(mrna_counts) AS max_mRNA_counts, cistron_idx FROM unnested_counts GROUP BY lineage_seed, generation, agent_id, cistron_idx ), full_aggregate AS ( SELECT -- Calculate probability that mRNA exists per cell cycle AVG(exists::INTEGER) AS p_expressed, -- Get maximum mRNA and monomer counts across all cells and times MAX(max_monomer_counts) AS max_monomer_counts, MAX(max_mRNA_counts) AS max_mRNA_counts, cistron_idx FROM cell_aggregate GROUP BY cistron_idx ) SELECT * FROM full_aggregate -- Filter to only include sub-generational genes WHERE p_expressed > 0 AND p_expressed < 1 """ ).pl() # Add gene, cistron, and protein names (DuckDB lists are 1-indexed so # must subtract one before using to index Numpy arrays) out_df = out_df.with_columns( gene_name=pl.Series(gene_ids)[out_df["cistron_idx"] - 1], cistron_name=pl.Series(mRNA_cistron_ids)[out_df["cistron_idx"] - 1], protein_name=pl.Series([i[:-3] for i in monomer_ids])[ out_df["cistron_idx"] - 1 ], ) out_df.write_csv(os.path.join(outdir, "subgen.tsv"), separator="\t")