ecoli.library.parquet_emitter
- ecoli.library.parquet_emitter.EXPERIMENT_SCHEMA_SUFFIX = 'variant=0/lineage_seed=0/generation=1/agent_id=0/_metadata'
Hive partitioning suffix, following the experiment ID partition, for the Parquet _metadata file containing a schema unified over all cells in an experiment.
- ecoli.library.parquet_emitter.METADATA_PREFIX = 'output_metadata__'
In the config dataset, user-defined metadata for each store (see get_output_metadata()) will be contained in columns with this prefix.
- class ecoli.library.parquet_emitter.ParquetEmitter(config)[source]
Bases:
Emitter
Emit data to a Parquet dataset.
Configure emitter.
- Parameters:
Should be a dictionary as follows:
{
    'type': 'parquet',
    'emits_to_batch': Number of emits per Parquet row group
        (optional, default: 400),
    # One of the following is REQUIRED
    'out_dir': local output directory (absolute or relative),
    'out_uri': Google Cloud Storage bucket URI
}
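As a minimal sketch (the output directory name is a placeholder), the emitter can be constructed directly with such a dictionary:

from ecoli.library.parquet_emitter import ParquetEmitter

# Hypothetical local configuration; use "out_uri" instead of "out_dir"
# to write to a Google Cloud Storage bucket
emitter = ParquetEmitter({
    "type": "parquet",
    "emits_to_batch": 400,  # optional, defaults to 400
    "out_dir": "out",
})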
- _finalize()[source]
Convert remaining batched emits to Parquet at sim shutdown. This also writes a _metadata file containing a unified Parquet schema as follows:
1. Read up to 10 most recently written _metadata files from other simulations of the same experiment ID
2. Unify those schemas with the current sim schema (e.g. a column that is NULL in the current sim but float in another is promoted to float)
3. Write the unified schema to a _metadata file in the current sim output folder
4. Write the unified schema to a _metadata file in the output folder for experiment ID + EXPERIMENT_SCHEMA_SUFFIX
Unless more than 10 simulations finish at the exact same time, the _metadata file in the folder for experiment ID + EXPERIMENT_SCHEMA_SUFFIX should contain a schema that unifies the output of all finished simulations. The subquery generated by get_dataset_sql() reads this file first to ensure that all columns are read as the correct type.
- emit(data)[source]
Flattens the emit dictionary by concatenating nested key names with double underscores (flatten_dict()), serializes the flattened emit with orjson, and writes newline-delimited JSONs to a temporary file to be batched for some number of timesteps before conversion to Parquet by json_to_parquet().
The output directory (config["out_dir"] or config["out_uri"]) will have the following structure:

{experiment_id}
|-- history
| |-- experiment_id={experiment_id}
| | |-- variant={variant}
| | | |-- lineage_seed={seed}
| | | | |-- generation={generation}
| | | | | |-- agent_id={agent_id}
| | | | | | |-- _metadata (unified schema, see _finalize)
| | | | | | |-- 400.pq (batched emits)
| | | | | | |-- 800.pq
| | | | | | |-- ..
|-- configuration
| |-- experiment_id={experiment_id}
| | |-- variant={variant}
| | | |-- lineage_seed={seed}
| | | | |-- generation={generation}
| | | | | |-- agent_id={agent_id}
| | | | | | |-- config.pq (sim config data)

This Hive-partitioned directory structure can be efficiently filtered and queried using DuckDB (see get_dataset_sql()).
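As a rough sketch, the partitioned layout can also be queried directly with DuckDB; the experiment ID below is a placeholder, and in practice get_dataset_sql() is preferable because it also applies the unified schema:

import duckdb

conn = duckdb.connect()
# hive_partitioning exposes experiment_id, variant, lineage_seed,
# generation, and agent_id as queryable columns
conn.sql(
    "SELECT time, listeners__monomer_counts "
    "FROM read_parquet('out/history/**/*.pq', hive_partitioning = true) "
    "WHERE experiment_id = 'my_experiment' AND generation = 1"
)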
- ecoli.library.parquet_emitter.USE_UINT16 = {'listeners__complexation_listener__complexation_events', 'listeners__ribosome_data__mRNA_TU_index', 'listeners__ribosome_data__n_ribosomes_on_each_mRNA', 'listeners__rna_degradation_listener__count_RNA_degraded_per_cistron', 'listeners__rna_degradation_listener__count_rna_degraded', 'listeners__rna_synth_prob__bound_TF_domains', 'listeners__rna_synth_prob__bound_TF_indexes', 'listeners__rna_synth_prob__expected_rna_init_per_cistron', 'listeners__rna_synth_prob__gene_copy_number', 'listeners__rna_synth_prob__n_bound_TF_per_TU', 'listeners__rna_synth_prob__n_bound_TF_per_cistron', 'listeners__rna_synth_prob__promoter_copy_number', 'listeners__rnap_data__active_rnap_domain_indexes', 'listeners__rnap_data__active_rnap_n_bound_ribosomes', 'listeners__rnap_data__rna_init_event', 'listeners__rnap_data__rna_init_event_per_cistron', 'listeners__transcript_elongation_listener__count_rna_synthesized'}
uint16 is 4x smaller than int64 and suffices for values between 0 and 65,535.
- ecoli.library.parquet_emitter.USE_UINT32 = {'listeners__fba_results__catalyst_counts', 'listeners__monomer_counts', 'listeners__ribosome_data__n_ribosomes_on_partial_mRNA_per_transcript', 'listeners__ribosome_data__n_ribosomes_per_transcript', 'listeners__ribosome_data__ribosome_init_event_per_monomer', 'listeners__rna_counts__full_mRNA_cistron_counts', 'listeners__rna_counts__full_mRNA_counts', 'listeners__rna_counts__mRNA_cistron_counts', 'listeners__rna_counts__mRNA_counts', 'listeners__rna_counts__partial_mRNA_cistron_counts', 'listeners__rna_counts__partial_mRNA_counts'}
uint32 is 2x smaller than int64 and suffices for values between 0 and 4,294,967,295.
- ecoli.library.parquet_emitter.flatten_dict(d)[source]
Flatten nested dictionary down to key-value pairs where each key concatenates all the keys needed to reach the corresponding value in the input. Allows each leaf field in a nested emit to be turned into a column in a Parquet file for efficient storage and retrieval.
- Parameters:
d (dict)
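A minimal sketch of the flattening (the keys below are hypothetical):

from ecoli.library.parquet_emitter import flatten_dict

nested = {"listeners": {"mass": {"cell_mass": 1.0}}, "time": 2.0}
flat = flatten_dict(nested)
# {"listeners__mass__cell_mass": 1.0, "time": 2.0}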
- ecoli.library.parquet_emitter.get_config_value(conn, config_subquery, field)[source]
Gets the saved configuration option (anything in config JSON, with double underscore concatenation for nested fields due to flatten_dict()).
- ecoli.library.parquet_emitter.get_dataset_sql(out_dir)[source]
Creates DuckDB SQL strings for sim configs and outputs.
- Parameters:
out_dir (str) – Path to output directory for workflows to retrieve data for (relative or absolute local path OR URI beginning with gcs:// or gs:// for Google Cloud Storage bucket)
- Returns:
2-element tuple containing:
history_sql: SQL query for sim output (see read_stacked_columns()),
config_sql: SQL query for sim configs (see get_field_metadata() and get_config_value())
- Return type:
tuple[str, str]
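A short usage sketch (the configuration field name below is a hypothetical flattened key):

import duckdb
from ecoli.library.parquet_emitter import get_config_value, get_dataset_sql

history_sql, config_sql = get_dataset_sql("out/")
conn = duckdb.connect()
# Read a single saved configuration option from the config dataset
time_step = get_config_value(conn, config_sql, "time_step")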
- ecoli.library.parquet_emitter.get_encoding(val, field_name, use_uint16=False, use_uint32=False)[source]
Get optimal PyArrow type and Parquet encoding for input value.
- ecoli.library.parquet_emitter.get_field_metadata(conn, config_subquery, field)[source]
Gets the saved metadata (see get_output_metadata()) for a given field as a list.
- ecoli.library.parquet_emitter.get_plot_metadata(conn, config_subquery, variant_name)[source]
Gets a dictionary that can be used as the metadata kwarg to wholecell.utils.plotting_tools.export_figure().
- ecoli.library.parquet_emitter.json_to_parquet(ndjson, encodings, schema, outfile, filesystem=None)[source]
Reads newline-delimited JSON file and converts to Parquet file.
- Parameters:
ndjson (str) – Path to newline-delimited JSON file.
encodings (dict[str, str]) – Mapping of column names to Parquet encodings
schema (Schema) – PyArrow schema of Parquet file to write
outfile (str) – Filepath of output Parquet file
filesystem (FileSystem | None) – PyArrow filesystem for Parquet output (local if None)
- ecoli.library.parquet_emitter.named_idx(col, names, idx)[source]
Create DuckDB expressions for given indices from a list column. Can be used in the projections kwarg of read_stacked_columns(). Since each index gets pulled out into its own column, this greatly simplifies aggregations like averages, etc. Only use this if the number of indices is relatively small (<100) and the list column is 1-dimensional. For 2+ dimensions or >100 indices, see ndidx_to_duckdb_expr().
Warning
DuckDB arrays are 1-indexed, so this function adds 1 to every supplied index!
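A hedged sketch (the labels and indices below are hypothetical):

from ecoli.library.parquet_emitter import named_idx

# Expressions naming three elements of the 1-D list column "bulk";
# the +1 shift for DuckDB's 1-indexing is handled internally
bulk_counts = named_idx("bulk", ["atp", "gtp", "ctp"], [100, 1000, 10000])
# Pass the result via the projections kwarg of read_stacked_columns()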
- ecoli.library.parquet_emitter.ndarray_to_ndlist(arr)[source]
Convert a Numpy ndarray into a PyArrow FixedSizeListArray. This is useful for writing user-defined functions (see DuckDB documentation on the Python Function API and the func kwarg of read_stacked_columns()) that expect a PyArrow return type.
Note that the number of rows in the returned PyArrow array is equal to the size of the first dimension of the input array. This means that for a 3 x 4 x 5 Numpy array, the returned PyArrow array will have 3 rows, where each row is a nested list with 4 lists of length 5.
- Parameters:
arr (ndarray)
- Return type:
FixedSizeListArray
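For example, a small sketch of the shape behavior described above:

import numpy as np
from ecoli.library.parquet_emitter import ndarray_to_ndlist

arr = np.arange(60).reshape(3, 4, 5)
ndlist = ndarray_to_ndlist(arr)
# 3 rows, each a 4 x 5 nested fixed-size list
assert len(ndlist) == 3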
- ecoli.library.parquet_emitter.ndidx_to_duckdb_expr(name, idx)[source]
Returns a DuckDB expression for a column equivalent to converting each row of name into an ndarray name_arr (ndlist_to_ndarray()) and getting name_arr[idx]. idx can contain 1D lists of integers, boolean masks, or ":" (no 2D+ indices like x[[[1,2]]]). See also named_idx() if pulling out a relatively small set of indices.
Warning
DuckDB arrays are 1-indexed, so this function adds 1 to every supplied integer index!
- Parameters:
name (str) – Name of column to recursively index
idx (list[int | list[int | bool] | str]) – To get all elements for a dimension, supply the string ":". Otherwise, only single integers or 1D integer lists of indices are allowed for each dimension. Some examples:

[0, 1]                  # First row, second column
[[0, 1], 1]             # First and second row, second column
[0, 1, ":"]             # First element of axis 1, second of 2, all of 3
# Final example differs between this function and Numpy
# This func: 1st and 2nd of axis 1, all of 2, 1st and 2nd of 3
# Numpy: Complicated, see Numpy docs on advanced indexing
[[0, 1], ":", [0, 1]]
- Return type:
str
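A hedged sketch using one of the listener columns named above (whether that column is 2-D in a given sim output is an assumption):

from ecoli.library.parquet_emitter import ndidx_to_duckdb_expr

# Select the first two elements of the first dimension and all elements
# of the second dimension (":") as a DuckDB projection expression
expr = ndidx_to_duckdb_expr(
    "listeners__rna_synth_prob__n_bound_TF_per_TU", [[0, 1], ":"])
# Pass expr via the projections kwarg of read_stacked_columns()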
- ecoli.library.parquet_emitter.ndlist_to_ndarray(s)[source]
Convert a PyArrow series of nested lists with fixed dimensions into a Numpy ndarray. This should really only be necessary if you are trying to perform linear algebra (e.g. matrix multiplication, dot products) inside a user-defined function (see DuckDB documentation on the Python Function API and the func kwarg of read_stacked_columns()).
For elementwise arithmetic with two nested list columns, this can be used in combination with ndarray_to_ndlist() to define a custom DuckDB function as follows:

import duckdb
from ecoli.library.parquet_emitter import (
    ndarray_to_ndlist, ndlist_to_ndarray)

def sum_arrays(col_0, col_1):
    return ndarray_to_ndlist(
        ndlist_to_ndarray(col_0) + ndlist_to_ndarray(col_1)
    )

conn = duckdb.connect()
conn.create_function(
    "sum_2d_int_arrays",  # Function name for use in SQL (must be unique)
    sum_arrays,  # Python function that takes and returns PyArrow arrays
    [list[list[int]], list[list[int]]],  # Input types (2D lists here)
    list[list[int]],  # Return type (2D list here)
    type="arrow",  # Tell DuckDB function operates on Arrow arrays
)
conn.sql("SELECT sum_2d_int_arrays(int_col_0, int_col_1) FROM input_table")

# Note that the function must be registered under a different name for each
# set of unique input/output types
conn.create_function(
    "sum_2d_int_and_float",
    sum_arrays,
    [list[list[int]], list[list[float]]],  # Second input is 2D float array
    list[list[float]],  # Adding int to float array gives float in Numpy
    type="arrow",
)
conn.sql("SELECT sum_2d_int_and_float(int_col_0, float_col_0) FROM input_table")
- Parameters:
s (Array)
- Return type:
ndarray
- ecoli.library.parquet_emitter.num_cells(conn, subquery)[source]
Return the cell count in a DuckDB subquery containing experiment_id, variant, lineage_seed, generation, and agent_id columns.
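For example (a sketch assuming an out/ directory of workflow output):

import duckdb
from ecoli.library.parquet_emitter import get_dataset_sql, num_cells

history_sql, config_sql = get_dataset_sql("out/")
conn = duckdb.connect()
# Number of unique cells across experiment_id, variant, lineage_seed,
# generation, and agent_id
n_cells = num_cells(conn, history_sql)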
- ecoli.library.parquet_emitter.open_arbitrary_sim_data(sim_data_dict)[source]
Given a mapping from experiment ID(s) to mappings from variant ID(s) to sim_data path(s), pick an arbitrary sim_data to read.
- ecoli.library.parquet_emitter.open_output_file(outfile)[source]
Open a file by its path, whether that be a path on local storage or Google Cloud Storage.
- Parameters:
outfile (str) – Path to file. Must have gs:// or gcs:// prefix if on Google Cloud Storage. Can be a relative or absolute path if on local storage.
- Returns:
PyArrow file object that supports reading, seeking, etc. in bytes
- Return type:
NativeFile
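For example (the path below is a placeholder):

from ecoli.library.parquet_emitter import open_output_file

# Works the same for local paths and gs:// or gcs:// URIs
with open_output_file("out/some_experiment/metadata.json") as f:
    raw_bytes = f.read()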
- ecoli.library.parquet_emitter.read_stacked_columns(history_sql, columns, projections=None, remove_first=False, func=None, conn=None, order_results=True)[source]
Loads columns for many cells. If you would like to perform more advanced computations (aggregations, window functions, etc.) using the optimized DuckDB API, you can omit conn, in which case this function will return an SQL string that can be used as a subquery. For computations that cannot be easily performed using the DuckDB API, you can define a custom function func that will be called on the data for each cell. By default, the return value (whether it be the actual data or an SQL subquery) will also include the experiment_id, variant, lineage_seed, generation, agent_id, and time columns.
For example, to get the average total concentration of three bulk molecules with indices 100, 1000, and 10000 per cell:
import duckdb
from ecoli.library.parquet_emitter import (
    get_dataset_sql, read_stacked_columns)

history_sql, config_sql = get_dataset_sql('out/')
subquery = read_stacked_columns(
    history_sql,
    ["bulk", "listeners__enzyme_kinetics__counts_to_molar"],
    # Note DuckDB arrays are 1-indexed
    ["bulk[100 + 1] + bulk[1000 + 1] + bulk[10000 + 1] AS bulk_sum",
     "listeners__enzyme_kinetics__counts_to_molar AS counts_to_molar"],
    order_results=False,
)
query = f'''
    SELECT avg(bulk_sum * counts_to_molar) AS avg_total_conc
    FROM ({subquery})
    GROUP BY experiment_id, variant, lineage_seed, generation, agent_id
    '''
conn = duckdb.connect()
data = conn.sql(query).arrow()
Here is a more complicated example that defines a custom function to get the per-cell average RNA synthesis probability per cistron:
import duckdb
import pickle
import pyarrow as pa
from ecoli.library.parquet_emitter import (
    get_dataset_sql, ndlist_to_ndarray, read_stacked_columns)

history_sql, config_sql = get_dataset_sql('out/')
# Load sim data
with open("reconstruction/sim_data/kb/simData.cPickle", "rb") as f:
    sim_data = pickle.load(f)
# Get mapping from RNAs (TUs) to cistrons
cistron_tu_mat = sim_data.process.transcription.cistron_tu_mapping_matrix

# Custom aggregation function with Numpy dot product and mean
def avg_rna_synth_prob_per_cistron(rna_synth_prob):
    # Convert rna_synth_prob into 2-D Numpy array (time x TU)
    rna_synth_prob = ndlist_to_ndarray(rna_synth_prob[
        "listeners__rna_synth_prob__actual_rna_synth_prob"])
    rna_synth_prob_per_cistron = cistron_tu_mat.dot(rna_synth_prob.T).T
    # Return value must be a PyArrow table
    return pa.table({'avg_rna_synth_prob_per_cistron': [
        rna_synth_prob_per_cistron.mean(axis=0)]})

conn = duckdb.connect()
result = read_stacked_columns(
    history_sql,
    ["listeners__rna_synth_prob__actual_rna_synth_prob"],
    func=avg_rna_synth_prob_per_cistron,
    conn=conn,
)
- Parameters:
history_sql (str) – DuckDB SQL string from get_dataset_sql(), potentially with filters appended in WHERE clause
columns (list[str]) – Names of columns to read data for. Unless you need to perform a computation involving multiple columns, calling this function many times with one column each time will use less RAM.
projections (list[str] | None) – Expressions to project from columns that are read. If not given, all columns are projected as is.
remove_first (bool) – Remove data for first timestep of each cell
func (Callable[[Table], Table] | None) – Function to call on data for each cell; should take and return a PyArrow table with columns equal to columns
conn (DuckDBPyConnection | None) – DuckDB connection instance with which to run query. Typically provided by runscripts.analysis.main() to the plot method of analysis scripts (tweaked some DuckDB settings). Can be omitted to return an SQL query string to be used as a subquery instead of running the query immediately and returning the result.
order_results (bool) – Whether to sort the returned table by experiment_id, variant, lineage_seed, generation, agent_id, and time. If no conn is provided, this can usually be disabled, and any sorting can be deferred until the last step in the query with a manual ORDER BY. Doing this can greatly reduce RAM usage.
- Return type:
Table | str