ecoli.library.parquet_emitter
- ecoli.library.parquet_emitter.METADATA_PREFIX = 'output_metadata__'
In the config dataset, user-defined metadata for each store (see
output_metadata()
) will be contained in columns with this prefix.
- class ecoli.library.parquet_emitter.ParquetEmitter(config)[source]
Bases:
Emitter
Emit data to a Parquet dataset.
Configure emitter.
- Parameters:
config (dict) – Should be a dictionary with the following keys:
{
    'batch_size': Number of emits per Parquet row group (optional, default: 400),
    'threaded': Whether to write Parquet files in a background thread (optional, default: True),
    # One of the following is REQUIRED
    'out_dir': local output directory (absolute/relative),
    'out_uri': Google Cloud Storage bucket URI
}
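For instance, a minimal sketch of constructing the emitter with a local output directory (the directory name is hypothetical; in practice the config comes from the simulation configuration):

import duckdb
from ecoli.library.parquet_emitter import ParquetEmitter

# Minimal sketch: batch emits and write them under a local directory
# (the directory name "out" is hypothetical)
emitter = ParquetEmitter({
    "out_dir": "out",       # local output directory (required if no out_uri)
    "batch_size": 400,      # emits per Parquet row group (default)
    "threaded": True,       # write Parquet files in a background thread (default)
})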
- _finalize()[source]
Convert remaining batched emits to Parquet at sim shutdown and mark the sim as successful if the success flag was set. In vEcoli, this is done by EcoliSim upon reaching division.
- emit(data)[source]
Flattens emit dictionary by concatenating nested key names with double underscores (flatten_dict()), buffers it in memory, and writes to a Parquet file upon buffering a configurable number of emits.
The output directory (config["out_dir"] or config["out_uri"]) will have the following structure:
{experiment_id}
|-- history
|   |-- experiment_id={experiment_id}
|   |   |-- variant={variant}
|   |   |   |-- lineage_seed={seed}
|   |   |   |   |-- generation={generation}
|   |   |   |   |   |-- agent_id={agent_id}
|   |   |   |   |   |   |-- 400.pq (batched emits)
|   |   |   |   |   |   |-- 800.pq
|   |   |   |   |   |   |-- ..
|-- configuration
|   |-- experiment_id={experiment_id}
|   |   |-- variant={variant}
|   |   |   |-- lineage_seed={seed}
|   |   |   |   |-- generation={generation}
|   |   |   |   |   |-- agent_id={agent_id}
|   |   |   |   |   |   |-- config.pq (sim config data)
This Hive-partitioned directory structure can be efficiently filtered and queried using DuckDB (see
dataset_sql()
).
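As a rough sketch, the partitioned history files can also be queried directly with DuckDB's read_parquet (the output path and experiment ID below are hypothetical; dataset_sql() and read_stacked_columns() are the intended entry points for analyses):

import duckdb

# Hypothetical path: out/{experiment_id}/history/** holds the batched emits
avg_time_per_variant = duckdb.sql("""
    SELECT variant, avg(time) AS avg_time
    FROM read_parquet('out/exp_id/history/**/*.pq', hive_partitioning = true)
    GROUP BY variant
""").pl()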
- ecoli.library.parquet_emitter.USE_UINT16 = {'listeners__complexation_listener__complexation_events', 'listeners__ribosome_data__mRNA_TU_index', 'listeners__ribosome_data__n_ribosomes_on_each_mRNA', 'listeners__rna_degradation_listener__count_RNA_degraded_per_cistron', 'listeners__rna_degradation_listener__count_rna_degraded', 'listeners__rna_synth_prob__bound_TF_domains', 'listeners__rna_synth_prob__bound_TF_indexes', 'listeners__rna_synth_prob__expected_rna_init_per_cistron', 'listeners__rna_synth_prob__gene_copy_number', 'listeners__rna_synth_prob__n_bound_TF_per_TU', 'listeners__rna_synth_prob__n_bound_TF_per_cistron', 'listeners__rna_synth_prob__promoter_copy_number', 'listeners__rnap_data__active_rnap_domain_indexes', 'listeners__rnap_data__active_rnap_n_bound_ribosomes', 'listeners__rnap_data__rna_init_event', 'listeners__rnap_data__rna_init_event_per_cistron', 'listeners__transcript_elongation_listener__count_rna_synthesized'}
uint16 is 4x smaller than int64 for values between 0 and 65,535.
- ecoli.library.parquet_emitter.USE_UINT32 = {'listeners__fba_results__catalyst_counts', 'listeners__monomer_counts', 'listeners__ribosome_data__n_ribosomes_on_partial_mRNA_per_transcript', 'listeners__ribosome_data__n_ribosomes_per_transcript', 'listeners__ribosome_data__ribosome_init_event_per_monomer', 'listeners__rna_counts__full_mRNA_cistron_counts', 'listeners__rna_counts__full_mRNA_counts', 'listeners__rna_counts__mRNA_cistron_counts', 'listeners__rna_counts__mRNA_counts', 'listeners__rna_counts__partial_mRNA_cistron_counts', 'listeners__rna_counts__partial_mRNA_counts'}
uint32 is 2x smaller than int64 for values between 0 and 4,294,967,295.
- ecoli.library.parquet_emitter.config_value(conn, config_subquery, field)[source]
Gets the saved configuration option (anything in config JSON, with double underscore concatenation for nested fields due to
flatten_dict()
). Automatically quotes the field name to handle special characters. Do NOT use double quotes in field.
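A brief usage sketch (the output directory, experiment ID, and field name are hypothetical placeholders):

import duckdb
from ecoli.library.parquet_emitter import config_value, dataset_sql

_, config_sql, _ = dataset_sql("out/", ["exp_id"])  # hypothetical path and ID
conn = duckdb.connect()
# Nested config keys are flattened with double underscores, so a hypothetical
# {"emitter_arg": {"batch_size": ...}} becomes "emitter_arg__batch_size"
batch_size = config_value(conn, config_sql, "emitter_arg__batch_size")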
- ecoli.library.parquet_emitter.create_duckdb_conn(temp_dir='/tmp', gcs=False, cpus=None)[source]
Create a DuckDB connection.
- ecoli.library.parquet_emitter.dataset_sql(out_dir, experiment_ids)[source]
Creates DuckDB SQL strings for sim outputs, configs, and metadata on which sims were successful.
- Parameters:
out_dir (str) – Path to output directory for workflows to retrieve data for (relative or absolute local path OR URI beginning with gcs:// or gs:// for a Google Cloud Storage bucket)
experiment_ids (list[str]) – List of experiment IDs to include in the query. To read data from more than one experiment ID, the listeners in the output of the first experiment ID in the list must be a strict subset of the listeners in the output of the subsequent experiment ID(s).
- Returns:
3-element tuple containing:
history_sql: SQL query for sim output (see read_stacked_columns())
config_sql: SQL query for sim configs (see field_metadata() and config_value())
success_sql: SQL query for metadata marking successful sims (see read_stacked_columns())
- Return type:
- ecoli.library.parquet_emitter.field_metadata(conn, config_subquery, field)[source]
Gets the saved metadata (see
output_metadata()
) for a given field as a list. Automatically quotes the field name to handle special characters. Do NOT use double quotes in field.
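A brief usage sketch, assuming metadata was emitted for the listed column (paths and IDs are hypothetical):

import duckdb
from ecoli.library.parquet_emitter import dataset_sql, field_metadata

_, config_sql, _ = dataset_sql("out/", ["exp_id"])  # hypothetical path and ID
conn = duckdb.connect()
# e.g. the names of the elements in each row of this listener column
monomer_ids = field_metadata(conn, config_sql, "listeners__monomer_counts")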
- ecoli.library.parquet_emitter.flatten_dict(d)[source]
Flatten nested dictionary down to key-value pairs where each key concatenates all the keys needed to reach the corresponding value in the input. Allows each leaf field in a nested emit to be turned into a column in a Parquet file for efficient storage and retrieval.
- Parameters:
d (dict)
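A sketch of the expected behavior (keys are joined with double underscores):

from ecoli.library.parquet_emitter import flatten_dict

nested = {"listeners": {"mass": {"cell_mass": 1000.0}}, "time": 2.0}
flat = flatten_dict(nested)
# {"listeners__mass__cell_mass": 1000.0, "time": 2.0}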
- ecoli.library.parquet_emitter.json_to_parquet(emit_dict, outfile, schema, filesystem)[source]
Convert dictionary to Parquet.
- Parameters:
emit_dict (dict[str, ndarray | list[Series]]) – Mapping from column names to NumPy arrays (fixed-shape) or lists of Polars Series (variable-shape).
outfile (str) – Path to output Parquet file. Can be local path or URI.
schema (dict[str, Any]) – Full mapping of column names to Polars dtypes.
filesystem (AbstractFileSystem) – fsspec filesystem; on a local filesystem, this is needed to write the Parquet file atomically.
- ecoli.library.parquet_emitter.named_idx(col, names, idx, zero_to_null=False, _quote_col=True)[source]
Create DuckDB expressions for given indices from a list column. Can be used in the columns argument of read_stacked_columns(). Since each index gets pulled out into its own column, this greatly simplifies aggregations like averages, etc. Only use this if the number of indices is relatively small (<100) and the list column is 1-dimensional. For 2+ dimensions or >100 indices, see ndidx_to_duckdb_expr(). Automatically quotes column names to handle special characters. Do NOT use double quotes in names or col.
Warning
DuckDB arrays are 1-indexed so this function adds 1 to every supplied index!
- Parameters:
col (str) – Name of list column.
names (list[str]) – List of names for the new columns. Length must match the number of index combinations in idx (4 for the example below).
idx (list[list[int]]) – Integer indices to retrieve from each dimension of col. For example, [[0, 1], [2, 3]] will retrieve the third and fourth elements of the second dimension for the first and second elements of the first dimension.
zero_to_null (bool) – Whether to turn 0s into nulls. This is useful when dividing by the values in this column, as most DuckDB aggregation functions (e.g. avg, max) propagate NaNs but ignore nulls.
_quote_col (bool) – Private argument to ensure col is quoted properly.
- Returns:
DuckDB SQL expression for a set of named columns corresponding to the values at given indices of a list column
- Return type:
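A usage sketch for a 1-dimensional list column (the indices and molecule names are hypothetical):

import duckdb
from ecoli.library.parquet_emitter import dataset_sql, named_idx, read_stacked_columns

history_sql, _, _ = dataset_sql("out/", ["exp_id"])  # hypothetical path and ID
# Pull two hypothetical bulk molecule counts out into their own columns;
# named_idx adds 1 to each index for DuckDB's 1-based arrays
bulk_cols = named_idx("bulk", ["glucose", "atp"], [[100, 1000]])
conn = duckdb.connect()
data = read_stacked_columns(history_sql, [bulk_cols], conn=conn)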
- ecoli.library.parquet_emitter.ndidx_to_duckdb_expr(name, idx)[source]
Returns a DuckDB expression for a column equivalent to converting each row of name into an ndarray name_arr (ndlist_to_ndarray()) and getting name_arr[idx]. idx can contain 1D lists of integers, boolean masks, or ":" (no 2D+ indices like x[[[1,2]]]). See also named_idx() if pulling out a relatively small set of indices. Automatically quotes column names to handle special characters. Do NOT use double quotes in name.
Warning
DuckDB arrays are 1-indexed so this function adds 1 to every supplied integer index!
- Parameters:
name (str) – Name of column to recursively index
idx (list[int | list[int] | list[bool] | str]) –
To get all elements for a dimension, supply the string ":". Otherwise, only single integers or 1D integer lists of indices are allowed for each dimension. Some examples:
[0, 1]                 # First row, second column
[[0, 1], 1]            # First and second row, second column
[0, 1, ":"]            # First element of axis 1, second of 2, all of 3
# Final example differs between this function and Numpy
# This func: 1st and 2nd of axis 1, all of 2, 1st and 2nd of 3
# Numpy: Complicated, see Numpy docs on advanced indexing
[[0, 1], ":", [0, 1]]
- Return type:
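A usage sketch (the listener column appears in this module's other examples; the indices are hypothetical):

from ecoli.library.parquet_emitter import dataset_sql, ndidx_to_duckdb_expr, read_stacked_columns

history_sql, _, _ = dataset_sql("out/", ["exp_id"])  # hypothetical path and ID
# Keep only the first two TUs of this 1-D list column (":" would keep all)
expr = ndidx_to_duckdb_expr(
    "listeners__rna_synth_prob__actual_rna_synth_prob", [[0, 1]])
subquery = read_stacked_columns(history_sql, [expr], order_results=False)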
- ecoli.library.parquet_emitter.ndlist_to_ndarray(s)[source]
Convert a PyArrow series of nested lists with fixed dimensions into a Numpy ndarray. This should really only be necessary if you are trying to perform linear algebra (e.g. matrix multiplication, dot products) inside a user-defined function (see DuckDB documentation on the Python Function API and the func kwarg of read_stacked_columns()).
For elementwise arithmetic of two nested list columns, this can be used to define a custom DuckDB function as follows:
import duckdb
import polars as pl
from ecoli.library.parquet_emitter import ndlist_to_ndarray

def sum_arrays(col_0, col_1):
    return pl.Series(
        ndlist_to_ndarray(col_0) + ndlist_to_ndarray(col_1)
    ).to_arrow()

conn = duckdb.connect()
conn.create_function(
    "sum_2d_int_arrays",  # Function name for use in SQL (must be unique)
    sum_arrays,  # Python function that takes and returns PyArrow arrays
    [list[list[int]], list[list[int]]],  # Input types (2D lists here)
    list[list[int]],  # Return type (2D list here)
    type = "arrow"  # Tell DuckDB function operates on Arrow arrays
)
conn.sql("SELECT sum_2d_int_arrays(int_col_0, int_col_1) from input_table")

# Note that function must be registered under different name for each
# set of unique input/output types
conn.create_function(
    "sum_2d_int_and_float",
    sum_arrays,
    [list[list[int]], list[list[float]]],  # Second input is 2D float array
    list[list[float]],  # Adding int to float array gives float in Numpy
    type = "arrow"
)
conn.sql("SELECT sum_2d_int_and_float(int_col_0, float_col_0) from input_table")
- Return type:
- ecoli.library.parquet_emitter.np_dtype(val, field_name)[source]
Get NumPy type for input value. There are a few scenarios where this function raises an exception intentionally:
An internal value is None or an empty list/tuple: data is ragged/nullable and needs Polars serialization.
Python bytes type: NumPy only has fixed-length bytes type so use Polars serialization to avoid truncation.
Python datetime types: Simpler and less error-prone to use Polars serialization instead of converting to NumPy.
Warning
np.bytes_ values and arrays will get truncated to the size of the first encountered value. Convert to Python bytes type to avoid this.
The try...except blocks in emit() are designed to catch these exceptions and fall back to Polars serialization. All other exceptions raised by this function indicate that the value is of an unsupported type for which even the fallback Polars serialization likely will not work.
- ecoli.library.parquet_emitter.num_cells(conn, subquery)[source]
Return cell count in DuckDB subquery containing experiment_id, variant, lineage_seed, generation, and agent_id columns.
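A brief usage sketch (paths and IDs are hypothetical):

import duckdb
from ecoli.library.parquet_emitter import dataset_sql, num_cells

history_sql, _, _ = dataset_sql("out/", ["exp_id"])  # hypothetical path and ID
conn = duckdb.connect()
n_cells = num_cells(conn, history_sql)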
- ecoli.library.parquet_emitter.open_arbitrary_sim_data(sim_data_dict)[source]
Given a mapping from experiment ID(s) to mappings from variant ID(s) to sim_data path(s), pick an arbitrary sim_data to read.
- ecoli.library.parquet_emitter.open_output_file(outfile)[source]
Open a file by its path, whether that be a path on local storage or Google Cloud Storage.
- Parameters:
outfile (str) – Path to file. Must have gs:// or gcs:// prefix if on Google Cloud Storage. Can be relative or absolute path if on local storage.
- Returns:
File object that supports reading, seeking, etc. in bytes
- Return type:
OpenFile
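A brief usage sketch (the bucket path is hypothetical; a relative local path works the same way):

import pickle
from ecoli.library.parquet_emitter import open_output_file

# Returned object is used as a context manager yielding a readable file object
with open_output_file("gs://my-bucket/kb/simData.cPickle") as f:
    sim_data = pickle.load(f)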
- ecoli.library.parquet_emitter.pl_dtype_from_ndarray(arr)[source]
Get Polars data type for a Numpy array, including nested lists.
- ecoli.library.parquet_emitter.plot_metadata(conn, config_subquery, variant_name)[source]
Gets dictionary that can be used as metadata kwarg to wholecell.utils.plotting_tools.export_figure().
- ecoli.library.parquet_emitter.read_stacked_columns(history_sql, columns, remove_first=False, func=None, conn=None, order_results=True, success_sql=None)[source]
Loads columns for many cells. If you would like to perform more advanced computations (aggregations, window functions, etc.) using the optimized DuckDB API, you can omit conn, in which case this function will return an SQL string that can be used as a subquery. For computations that cannot be easily performed using the DuckDB API, you can define a custom function func that will be called on the data for each cell. By default, the return value (whether it be the actual data or an SQL subquery) will also include the experiment_id, variant, lineage_seed, generation, agent_id, and time columns.
Warning
If the column expressions in columns are not from named_idx() or ndidx_to_duckdb_expr(), they may need to be enclosed in double quotes to handle special characters (e.g. "col-with-hyphens").
For example, to get the average total concentration of three bulk molecules with indices 100, 1000, and 10000 per cell:
import duckdb
from ecoli.library.parquet_emitter import (
    dataset_sql, read_stacked_columns)

history_sql, config_sql, _ = dataset_sql('out/', ['exp_id'])
subquery = read_stacked_columns(
    history_sql,
    # Note DuckDB arrays are 1-indexed
    ["bulk[100 + 1] + bulk[1000 + 1] + bulk[10000 + 1] AS bulk_sum",
     "listeners__enzyme_kinetics__counts_to_molar AS counts_to_molar"],
    order_results=False,
)
query = f'''
    SELECT avg(bulk_sum * counts_to_molar) AS avg_total_conc
    FROM ({subquery})
    GROUP BY experiment_id, variant, lineage_seed, generation, agent_id
    '''
conn = duckdb.connect()
data = conn.sql(query).pl()
Here is a more complicated example that defines a custom function to get the per-cell average RNA synthesis probability per cistron:
import duckdb
import pickle
import polars as pl
from ecoli.library.parquet_emitter import (
    dataset_sql, ndlist_to_ndarray, read_stacked_columns)

history_sql, config_sql, _ = dataset_sql('out/', ['exp_id'])
# Load sim data
with open("reconstruction/sim_data/kb/simData.cPickle", "rb") as f:
    sim_data = pickle.load(f)
# Get mapping from RNAs (TUs) to cistrons
cistron_tu_mat = sim_data.process.transcription.cistron_tu_mapping_matrix

# Custom aggregation function with Numpy dot product and mean
def avg_rna_synth_prob_per_cistron(rna_synth_prob):
    # Convert rna_synth_prob into 2-D Numpy array (time x TU)
    rna_synth_prob = ndlist_to_ndarray(rna_synth_prob[
        "listeners__rna_synth_prob__actual_rna_synth_prob"])
    rna_synth_prob_per_cistron = cistron_tu_mat.dot(rna_synth_prob.T).T
    # Return value must be a PyArrow table
    return pl.DataFrame({'avg_rna_synth_prob_per_cistron': [
        rna_synth_prob_per_cistron.mean(axis=0)]}).to_arrow()

conn = duckdb.connect()
result = read_stacked_columns(
    history_sql,
    ["listeners__rna_synth_prob__actual_rna_synth_prob"],
    func=avg_rna_synth_prob_per_cistron,
    conn=conn,
)
- Parameters:
history_sql (str) – DuckDB SQL string from dataset_sql(), potentially with filters appended in a WHERE clause
columns (list[str]) – Names of columns to read data for. Alternatively, DuckDB expressions of columns (e.g. avg(listeners__mass__cell_mass) AS avg_mass or the output of named_idx() or ndidx_to_duckdb_expr()).
remove_first (bool) – Remove data for first timestep of each cell
func (Callable[[DataFrame], DataFrame] | None) – Function to call on data for each cell, should take and return a Polars DataFrame with columns equal to columns
conn (DuckDBPyConnection | None) – DuckDB connection instance with which to run query. Typically provided by runscripts.analysis.main() to the plot method of analysis scripts (with some tweaked DuckDB settings). Can be omitted to return an SQL query string to be used as a subquery instead of running the query immediately and returning the result.
order_results (bool) – Whether to sort returned table by experiment_id, variant, lineage_seed, generation, agent_id, and time. If no conn is provided, this can usually be disabled and any sorting can be deferred until the last step in the query with a manual ORDER BY. Doing this can greatly reduce RAM usage.
success_sql (str | None) – Final DuckDB SQL string from dataset_sql(). If provided, will be used to filter out unsuccessful sims.
- Return type:
DataFrame | str
- ecoli.library.parquet_emitter.skip_n_gens(subquery, n)[source]
Modifies a DuckDB SQL query to skip the first n generations of data.
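A brief usage sketch (paths and IDs are hypothetical):

from ecoli.library.parquet_emitter import dataset_sql, skip_n_gens

history_sql, _, _ = dataset_sql("out/", ["exp_id"])  # hypothetical path and ID
# Exclude the first two generations from all subsequent queries
history_sql = skip_n_gens(history_sql, 2)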
- ecoli.library.parquet_emitter.union_by_name(query_sql)[source]
Modifies SQL query string from dataset_sql() to include union_by_name = true in the DuckDB read_parquet function. This allows data to be read from simulations that have different columns by filling in nulls and casting as necessary. This comes with a performance penalty and should be avoided if possible.
- Parameters:
query_sql (str) – SQL query string from dataset_sql()
- Return type:
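A brief usage sketch (the experiment IDs are hypothetical; only needed when the stacked experiments emitted different sets of columns):

from ecoli.library.parquet_emitter import dataset_sql, union_by_name

history_sql, config_sql, _ = dataset_sql("out/", ["exp_1", "exp_2"])  # hypothetical
# Fill in nulls / cast as needed so differing columns can be unioned by name
history_sql = union_by_name(history_sql)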
- ecoli.library.parquet_emitter.union_pl_dtypes(dt1, dt2, k, force_inner=None)[source]
Returns the more specific data type when combining two Polars datatypes. Mainly intended to fill out nested List types that contain Nulls.
- Parameters:
- Returns:
The resulting datatype
- Return type:
DataType | DataTypeClass