ecoli.library.parquet_emitter

ecoli.library.parquet_emitter.EXPERIMENT_SCHEMA_SUFFIX = 'variant=0/lineage_seed=0/generation=1/agent_id=0/_metadata'

Hive partitioning suffix, appended after the experiment ID partition, for the Parquet _metadata file containing the schema unified over all cells of an experiment.

ecoli.library.parquet_emitter.METADATA_PREFIX = 'output_metadata__'

In the config dataset, user-defined metadata for each store (see get_output_metadata()) will be contained in columns with this prefix.

class ecoli.library.parquet_emitter.ParquetEmitter(config)[source]

Bases: Emitter

Emit data to a Parquet dataset.

Configure emitter.

Parameters:

config (dict[str, Any]) –

Should be a dictionary as follows:

{
    'type': 'parquet',
    'emits_to_batch': Number of emits per Parquet row
        group (optional, default: 400),
    # One of the following is REQUIRED
    'out_dir': local output directory (absolute/relative),
    'out_uri': Google Cloud storage bucket URI
}
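
For example, a minimal instantiation sketch with illustrative values (in practice this config is normally supplied by the simulation framework rather than built by hand):

from ecoli.library.parquet_emitter import ParquetEmitter

emitter = ParquetEmitter({
    "type": "parquet",
    "out_dir": "out/",        # local output directory
    "emits_to_batch": 400,    # emits per Parquet row group
})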

_finalize()[source]

Convert remaining batched emits to Parquet at sim shutdown. This also writes a _metadata file containing a unified Parquet schema as follows:

1. Read up to 10 most recently written ``_metadata`` files from other
    simulations of the same experiment ID
2. Unify those schemas with the current sim schema (e.g. a column that is
    NULL in the current sim but float in another is promoted to float)
3. Write the unified schema to a ``_metadata`` file in the current sim
    output folder
4. Write the unified schema to a ``_metadata`` file in the output folder
    for the current experiment ID + :py:data:`~.EXPERIMENT_SCHEMA_SUFFIX`

Unless more than 10 simulations finish at the exact same time, the _metadata file in the folder for experiment ID + EXPERIMENT_SCHEMA_SUFFIX should contain a schema that unifies the output of all finished simulations. The subquery generated by get_dataset_sql() reads this file first to ensure that all columns are read as the correct type.

emit(data)[source]

Flattens the emit dictionary by concatenating nested key names with double underscores (:py:func:`~.flatten_dict`), serializes the flattened emit with orjson, and writes newline-delimited JSON to a temporary file to be batched for some number of timesteps before conversion to Parquet by json_to_parquet().

The output directory (config["out_dir"] or config["out_uri"]) will have the following structure:

{experiment_id}
|-- history
|   |-- experiment_id={experiment_id}
|   |   |-- variant={variant}
|   |   |   |-- lineage_seed={seed}
|   |   |   |   |-- generation={generation}
|   |   |   |   |   |-- agent_id={agent_id}
|   |   |   |   |   |   |-- _metadata (unified schema, see _finalize)
|   |   |   |   |   |   |-- 400.pq (batched emits)
|   |   |   |   |   |   |-- 800.pq
|   |   |   |   |   |   |-- ..
|-- configuration
|   |-- experiment_id={experiment_id}
|   |   |-- variant={variant}
|   |   |   |-- lineage_seed={seed}
|   |   |   |   |-- generation={generation}
|   |   |   |   |   |-- agent_id={agent_id}
|   |   |   |   |   |   |-- config.pq (sim config data)

This Hive-partitioned directory structure can be efficiently filtered and queried using DuckDB (see get_dataset_sql()).

Parameters:

data (dict[str, Any])

ecoli.library.parquet_emitter.USE_UINT16 = {'listeners__complexation_listener__complexation_events', 'listeners__ribosome_data__mRNA_TU_index', 'listeners__ribosome_data__n_ribosomes_on_each_mRNA', 'listeners__rna_degradation_listener__count_RNA_degraded_per_cistron', 'listeners__rna_degradation_listener__count_rna_degraded', 'listeners__rna_synth_prob__bound_TF_domains', 'listeners__rna_synth_prob__bound_TF_indexes', 'listeners__rna_synth_prob__expected_rna_init_per_cistron', 'listeners__rna_synth_prob__gene_copy_number', 'listeners__rna_synth_prob__n_bound_TF_per_TU', 'listeners__rna_synth_prob__n_bound_TF_per_cistron', 'listeners__rna_synth_prob__promoter_copy_number', 'listeners__rnap_data__active_rnap_domain_indexes', 'listeners__rnap_data__active_rnap_n_bound_ribosomes', 'listeners__rnap_data__rna_init_event', 'listeners__rnap_data__rna_init_event_per_cistron', 'listeners__transcript_elongation_listener__count_rna_synthesized'}

uint16 is 4x smaller than int64 and can hold values between 0 and 65,535.

ecoli.library.parquet_emitter.USE_UINT32 = {'listeners__fba_results__catalyst_counts', 'listeners__monomer_counts', 'listeners__ribosome_data__n_ribosomes_on_partial_mRNA_per_transcript', 'listeners__ribosome_data__n_ribosomes_per_transcript', 'listeners__ribosome_data__ribosome_init_event_per_monomer', 'listeners__rna_counts__full_mRNA_cistron_counts', 'listeners__rna_counts__full_mRNA_counts', 'listeners__rna_counts__mRNA_cistron_counts', 'listeners__rna_counts__mRNA_counts', 'listeners__rna_counts__partial_mRNA_cistron_counts', 'listeners__rna_counts__partial_mRNA_counts'}

uint32 is 2x smaller than int64 and can hold values between 0 and 4,294,967,295.

ecoli.library.parquet_emitter.flatten_dict(d)[source]

Flatten nested dictionary down to key-value pairs where each key concatenates all the keys needed to reach the corresponding value in the input. Allows each leaf field in a nested emit to be turned into a column in a Parquet file for efficient storage and retrieval.

Parameters:

d (dict)
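
For example, a sketch with hypothetical keys showing how nesting maps to double-underscore column names:

from ecoli.library.parquet_emitter import flatten_dict

# Hypothetical nested emit
nested = {"listeners": {"mass": {"cell_mass": 1000.0}}, "time": 2.0}
flatten_dict(nested)
# {'listeners__mass__cell_mass': 1000.0, 'time': 2.0}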

ecoli.library.parquet_emitter.get_config_value(conn, config_subquery, field)[source]

Gets the saved configuration option (anything in config JSON, with double underscore concatenation for nested fields due to flatten_dict()).

Parameters:
  • conn (DuckDBPyConnection) – DuckDB connection

  • config_subquery (str) – DuckDB query containing sim config data

  • field (str) – Name of configuration option to get value of

Return type:

Any
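
A usage sketch, assuming output was written to out/ and using a hypothetical flattened field name:

import duckdb
from ecoli.library.parquet_emitter import get_config_value, get_dataset_sql

history_sql, config_sql = get_dataset_sql("out/")
conn = duckdb.connect()
# "time_step" stands in for any flattened config field name
time_step = get_config_value(conn, config_sql, "time_step")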

ecoli.library.parquet_emitter.get_dataset_sql(out_dir)[source]

Creates DuckDB SQL strings for sim configs and outputs.

Parameters:

out_dir (str) – Path to output directory for workflows to retrieve data for (relative or absolute local path OR URI beginning with gcs:// or gs:// for Google Cloud Storage bucket)

Returns:

2-element tuple containing the DuckDB SQL query for sim output (history) data and the DuckDB SQL query for sim config data, in that order.

Return type:

tuple[str, str]

ecoli.library.parquet_emitter.get_encoding(val, field_name, use_uint16=False, use_uint32=False)[source]

Get optimal PyArrow type and Parquet encoding for input value.

Parameters:
  • val (Any) – Value to determine the PyArrow type and Parquet encoding for

  • field_name (str) – Name of the field (column) containing val

  • use_uint16 (bool) – Store integer values as uint16

  • use_uint32 (bool) – Store integer values as uint32

Return type:

tuple[Any, str, str, bool]

ecoli.library.parquet_emitter.get_field_metadata(conn, config_subquery, field)[source]

Gets the saved metadata (see get_output_metadata()) for a given field as a list.

Parameters:
  • conn (DuckDBPyConnection) – DuckDB connection

  • config_subquery (str) – DuckDB query containing sim config data

  • field (str) – Name of field to get metadata for

Return type:

list
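
A usage sketch, assuming output was written to out/; the field name is illustrative and the returned list contains whatever metadata the simulation registered for that column:

import duckdb
from ecoli.library.parquet_emitter import (
    get_dataset_sql, get_field_metadata)

history_sql, config_sql = get_dataset_sql("out/")
conn = duckdb.connect()
labels = get_field_metadata(conn, config_sql, "listeners__monomer_counts")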

ecoli.library.parquet_emitter.get_plot_metadata(conn, config_subquery, variant_name)[source]

Gets dictionary that can be used as metadata kwarg to wholecell.utils.plotting_tools.export_figure().

Parameters:
  • conn (DuckDBPyConnection) – DuckDB connection

  • config_subquery (str) – DuckDB query containing sim config data

  • variant_name (str) – Name of variant

Return type:

dict[str, Any]

ecoli.library.parquet_emitter.json_to_parquet(ndjson, encodings, schema, outfile, filesystem=None)[source]

Reads newline-delimited JSON file and converts to Parquet file.

Parameters:
  • ndjson (str) – Path to newline-delimited JSON file.

  • encodings (dict[str, str]) – Mapping of column names to Parquet encodings

  • schema (Schema) – PyArrow schema of Parquet file to write

  • outfile (str) – Filepath of output Parquet file

  • filesystem (FileSystem | None) – PyArrow filesystem for Parquet output (local if None)

ecoli.library.parquet_emitter.named_idx(col, names, idx)[source]

Create DuckDB expressions for given indices from a list column. Can be used in projection kwarg of read_stacked_columns(). Since each index gets pulled out into its own column, this greatly simplifies aggregations like averages, etc. Only use this if the number of indices is relatively small (<100) and the list column is 1-dimensional. For 2+ dimensions or >100 indices, see ndidx_to_duckdb_expr().

Warning

DuckDB arrays are 1-indexed so this function adds 1 to every supplied index!

Parameters:
  • col (str) – Name of list column.

  • names (list[str]) – New column names, one for each index.

  • idx (list[int]) – Indices to retrieve from col

Returns:

DuckDB SQL expression for a set of named columns corresponding to the values at given indices of a list column

Return type:

str
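
A usage sketch with hypothetical column names and indices, passing the result as a projection to read_stacked_columns():

from ecoli.library.parquet_emitter import (
    get_dataset_sql, named_idx, read_stacked_columns)

history_sql, config_sql = get_dataset_sql("out/")
# Pull three (hypothetical) bulk molecule indices into named columns;
# named_idx adds 1 to each index because DuckDB arrays are 1-indexed
projection = named_idx("bulk", ["atp", "adp", "amp"], [100, 1000, 10000])
subquery = read_stacked_columns(
    history_sql,
    ["bulk"],
    projections=[projection],
    order_results=False,
)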

ecoli.library.parquet_emitter.ndarray_to_ndlist(arr)[source]

Convert a Numpy ndarray into a PyArrow FixedSizeListArray. This is useful for writing user-defined functions (see DuckDB documentation on Python Function API and func kwarg for read_stacked_columns()) that expect a PyArrow return type.

Note that the number of rows in the returned PyArrow array is equal to the size of the first dimension of the input array. This means for a 3 x 4 x 5 Numpy array, the returned PyArrow array will have 3 rows where each row is a nested list with 4 lists of length 5.

Parameters:

arr (ndarray)

Return type:

FixedSizeListArray
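
A minimal sketch of the shape relationship described above:

import numpy as np
from ecoli.library.parquet_emitter import ndarray_to_ndlist

# A 3 x 4 x 5 array becomes a FixedSizeListArray with 3 rows, each row
# a nested list of 4 lists of length 5
arr = np.arange(60, dtype=np.int64).reshape(3, 4, 5)
ndlist = ndarray_to_ndlist(arr)
assert len(ndlist) == 3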

ecoli.library.parquet_emitter.ndidx_to_duckdb_expr(name, idx)[source]

Returns a DuckDB expression for a column equivalent to converting each row of name into an ndarray name_arr (ndlist_to_ndarray()) and getting name_arr[idx]. idx can contain 1D lists of integers, boolean masks, or ":" (no 2D+ indices like x[[[1,2]]]). See also named_idx() if pulling out a relatively small set of indices.

Warning

DuckDB arrays are 1-indexed so this function adds 1 to every supplied integer index!

Parameters:
  • name (str) – Name of column to recursively index

  • idx (list[int | list[int | bool] | str]) –

    To get all elements for a dimension, supply the string ":". Otherwise, only single integers or 1D integer lists of indices are allowed for each dimension. Some examples:

    [0, 1] # First row, second column
    [[0, 1], 1] # First and second row, second column
    [0, 1, ":"] # First element of axis 1, second of 2, all of 3
    # Final example differs between this function and Numpy
    # This func: 1st and 2nd of axis 1, all of 2, 1st and 2nd of 3
    # Numpy: Complicated, see Numpy docs on advanced indexing
    [[0, 1], ":", [0, 1]]
    

Return type:

str
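
A usage sketch that passes the returned expression as a projection to read_stacked_columns(); the column name and its dimensionality are assumptions here:

from ecoli.library.parquet_emitter import (
    get_dataset_sql, ndidx_to_duckdb_expr, read_stacked_columns)

history_sql, config_sql = get_dataset_sql("out/")
# First and second elements of axis 1, everything along axis 2
expr = ndidx_to_duckdb_expr(
    "listeners__rna_synth_prob__n_bound_TF_per_TU", [[0, 1], ":"])
subquery = read_stacked_columns(
    history_sql,
    ["listeners__rna_synth_prob__n_bound_TF_per_TU"],
    projections=[expr],
    order_results=False,
)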

ecoli.library.parquet_emitter.ndlist_to_ndarray(s)[source]

Convert a PyArrow series of nested lists with fixed dimensions into a Numpy ndarray. This should really only be necessary if you are trying to perform linear algebra (e.g. matrix multiplication, dot products) inside a user-defined function (see DuckDB documentation on Python Function API and func kwarg for read_stacked_columns()).

For elementwise arithmetic with two nested list columns, this can be used in combination with ndarray_to_ndlist() to define a custom DuckDB function as follows:

import duckdb
from ecoli.library.parquet_emitter import (
    ndarray_to_ndlist, ndlist_to_ndarray)
def sum_arrays(col_0, col_1):
    return ndarray_to_ndlist(
        ndlist_to_ndarray(col_0) +
        ndlist_to_ndarray(col_1)
    )
conn = duckdb.connect()
conn.create_function(
    "sum_2d_int_arrays", # Function name for use in SQL (must be unique)
    sum_arrays, # Python function that takes and returns PyArrow arrays
    [list[list[int]], list[list[int]]], # Input types (2D lists here)
    list[list[int]], # Return type (2D list here)
    type = "arrow" # Tell DuckDB function operates on Arrow arrays
)
conn.sql("SELECT sum_2d_int_arrays(int_col_0, int_col_1) from input_table")
# Note that function must be registered under different name for each
# set of unique input/output types
conn.create_function(
    "sum_2d_int_and_float",
    sum_arrays,
    [list[list[int]], list[list[float]]], # Second input is 2D float array
    list[list[float]], # Adding int to float array gives float in Numpy
    type = "arrow"
)
conn.sql("SELECT sum_2d_int_arrays(int_col_0, float_col_0) from input_table")
Parameters:

s (Array)

Return type:

ndarray

ecoli.library.parquet_emitter.num_cells(conn, subquery)[source]

Return cell count in DuckDB subquery containing experiment_id, variant, lineage_seed, generation, and agent_id columns.

Parameters:
  • conn (DuckDBPyConnection)

  • subquery (str)

Return type:

int
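
A usage sketch, assuming the config subquery from get_dataset_sql() exposes the partition columns listed above:

import duckdb
from ecoli.library.parquet_emitter import get_dataset_sql, num_cells

history_sql, config_sql = get_dataset_sql("out/")
conn = duckdb.connect()
n_cells = num_cells(conn, config_sql)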

ecoli.library.parquet_emitter.open_arbitrary_sim_data(sim_data_dict)[source]

Given a mapping from experiment ID(s) to mappings from variant ID(s) to sim_data path(s), pick an arbitrary sim_data to read.

Parameters:

sim_data_dict (dict[str, dict[int, Any]]) – Generated by runscripts.analysis and passed to each analysis script as an argument.

Returns:

PyArrow file object for arbitrarily chosen sim_data to be loaded with pickle.load

Return type:

NativeFile
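
A usage sketch with a hypothetical mapping; analysis scripts normally receive sim_data_dict from runscripts.analysis:

import pickle
from ecoli.library.parquet_emitter import open_arbitrary_sim_data

sim_data_dict = {"experiment_1": {0: "out/kb/simData.cPickle"}}
with open_arbitrary_sim_data(sim_data_dict) as f:
    sim_data = pickle.load(f)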

ecoli.library.parquet_emitter.open_output_file(outfile)[source]

Open a file by its path, whether that be a path on local storage or Google Cloud Storage.

Parameters:

outfile (str) – Path to file. Must have gs:// or gcs:// prefix if on Google Cloud Storage. Can be relative or absolute path if on local storage.

Returns:

PyArrow file object that supports reading, seeking, etc. in bytes

Return type:

NativeFile
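
A usage sketch with a hypothetical Cloud Storage URI; local paths work the same way:

from ecoli.library.parquet_emitter import open_output_file

with open_output_file("gs://my-bucket/experiment_1/config.pq") as f:
    raw_bytes = f.read()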

ecoli.library.parquet_emitter.read_stacked_columns(history_sql, columns, projections=None, remove_first=False, func=None, conn=None, order_results=True)[source]

Loads columns for many cells. If you would like to perform more advanced computations (aggregations, window functions, etc.) using the optimized DuckDB API, you can omit conn, in which case this function will return an SQL string that can be used as a subquery. For computations that cannot be easily performed using the DuckDB API, you can define a custom function func that will be called on the data for each cell. By default, the return value (whether it be the actual data or an SQL subquery) will also include the experiment_id, variant, lineage_seed, generation, agent_id, and time columns.

For example, to get the average total concentration of three bulk molecules with indices 100, 1000, and 10000 per cell:

import duckdb
from ecoli.library.parquet_emitter import (
    get_dataset_sql, read_stacked_columns)
history_sql, config_sql = get_dataset_sql('out/')
subquery = read_stacked_columns(
    history_sql,
    ["bulk", "listeners__enzyme_kinetics__counts_to_molar"],
    # Note DuckDB arrays are 1-indexed
    ["bulk[100 + 1] + bulk[1000 + 1] + bulk[10000 + 1] AS bulk_sum",
    "listeners__enzyme_kinetics__counts_to_molar AS counts_to_molar"],
    order_results=False,
)
query = f'''
    SELECT avg(bulk_sum * counts_to_molar) AS avg_total_conc
    FROM ({subquery})
    GROUP BY experiment_id, variant, lineage_seed, generation, agent_id
    '''
conn = duckdb.connect()
data = conn.sql(query).arrow()

Here is a more complicated example that defines a custom function to get the per-cell average RNA synthesis probability per cistron:

import duckdb
import pickle
import pyarrow as pa
from ecoli.library.parquet_emitter import (
    get_dataset_sql, ndlist_to_ndarray, read_stacked_columns)
history_sql, config_sql = get_dataset_sql('out/')
# Load sim data
with open("reconstruction/sim_data/kb/simData.cPickle", "rb") as f:
    sim_data = pickle.load(f)
# Get mapping from RNAs (TUs) to cistrons
cistron_tu_mat = sim_data.process.transcription.cistron_tu_mapping_matrix
# Custom aggregation function with Numpy dot product and mean
def avg_rna_synth_prob_per_cistron(rna_synth_prob):
    # Convert rna_synth_prob into 2-D Numpy array (time x TU)
    rna_synth_prob = ndlist_to_ndarray(rna_synth_prob[
        "listeners__rna_synth_prob__actual_rna_synth_prob"])
    rna_synth_prob_per_cistron = cistron_tu_mat.dot(rna_synth_prob.T).T
    # Return value must be a PyArrow table
    return pa.table({'avg_rna_synth_prob_per_cistron': [
        rna_synth_prob_per_cistron.mean(axis=0)]})
conn = duckdb.connect()
result = read_stacked_columns(
    history_sql,
    ["listeners__rna_synth_prob__actual_rna_synth_prob"],
    func=avg_rna_synth_prob_per_cistron,
    conn=conn,
)
Parameters:
  • history_sql (str) – DuckDB SQL string from get_dataset_sql(), potentially with filters appended in WHERE clause

  • columns (list[str]) – Names of columns to read data for. Unless you need to perform a computation involving multiple columns, calling this function many times with one column each time will use less RAM.

  • projections (list[str] | None) – Expressions to project from columns that are read. If not given, all columns are projected as is.

  • remove_first (bool) – Remove data for first timestep of each cell

  • func (Callable[[Table], Table] | None) – Function to call on data for each cell, should take and return a PyArrow table with columns equal to columns

  • conn (DuckDBPyConnection | None) – DuckDB connection instance with which to run query. Typically provided by runscripts.analysis.main() to the plot method of analysis scripts (tweaked some DuckDB settings). Can be omitted to return SQL query string to be used as subquery instead of running query immediately and returning result.

  • order_results (bool) – Whether to sort returned table by experiment_id, variant, lineage_seed, generation, agent_id, and time. If no conn is provided, this can usually be disabled and any sorting can be deferred until the last step in the query with a manual ORDER BY. Doing this can greatly reduce RAM usage.

Return type:

Table | str

ecoli.library.parquet_emitter.skip_n_gens(subquery, n)[source]

Modifies a DuckDB SQL query to skip the first n generations of data.

Parameters:
  • subquery (str) – DuckDB SQL query to modify (e.g. from get_dataset_sql())

  • n (int) – Number of initial generations to skip

Return type:

str
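
A usage sketch, assuming output was written to out/:

from ecoli.library.parquet_emitter import get_dataset_sql, skip_n_gens

history_sql, config_sql = get_dataset_sql("out/")
# Exclude the first 2 generations of every lineage from later queries
history_sql = skip_n_gens(history_sql, 2)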