ecoli.library.parquet_emitter

class ecoli.library.parquet_emitter.BlockingExecutor[source]

Bases: object

submit(fn, *args, **kwargs)[source]

Run function in the current thread and return a Future that is already done.

Parameters:

fn (Callable)

Return type:

Future

ecoli.library.parquet_emitter.METADATA_PREFIX = 'output_metadata__'

In the config dataset, user-defined metadata for each store (see output_metadata()) will be contained in columns with this prefix.

class ecoli.library.parquet_emitter.ParquetEmitter(config)[source]

Bases: Emitter

Emit data to a Parquet dataset.

Configure emitter.

Parameters:

config (dict[str, Any]) –

Should be a dictionary with the following keys:

{
    'batch_size': Number of emits per Parquet row
        group (optional, default: 400),
    'threaded': Whether to write Parquet files
        in a background thread (optional, default: True),
    # One of the following is REQUIRED
    'out_dir': local output directory (absolute/relative),
    'out_uri': Google Cloud storage bucket URI
}

_finalize()[source]

Convert remaining batched emits to Parquet at sim shutdown and mark sim as successful if success flag was set. In vEcoli, this is done by EcoliSim upon reaching division.

emit(data)[source]

Flattens emit dictionary by concatenating nested key names with double underscores (flatten_dict()), buffers it in memory, and writes to a Parquet file upon buffering a configurable number of emits.

The output directory (config["out_dir"] or config["out_uri"]) will have the following structure:

{experiment_id}
|-- history
|   |-- experiment_id={experiment_id}
|   |   |-- variant={variant}
|   |   |   |-- lineage_seed={seed}
|   |   |   |   |-- generation={generation}
|   |   |   |   |   |-- agent_id={agent_id}
|   |   |   |   |   |   |-- 400.pq (batched emits)
|   |   |   |   |   |   |-- 800.pq
|   |   |   |   |   |   |-- ..
|-- configuration
|   |-- experiment_id={experiment_id}
|   |   |-- variant={variant}
|   |   |   |-- lineage_seed={seed}
|   |   |   |   |-- generation={generation}
|   |   |   |   |   |-- agent_id={agent_id}
|   |   |   |   |   |   |-- config.pq (sim config data)

This Hive-partioned directory structure can be efficiently filtered and queried using DuckDB (see dataset_sql()).

Parameters:

data (dict[str, Any])

ecoli.library.parquet_emitter.USE_UINT16 = {'listeners__complexation_listener__complexation_events', 'listeners__ribosome_data__mRNA_TU_index', 'listeners__ribosome_data__n_ribosomes_on_each_mRNA', 'listeners__rna_degradation_listener__count_RNA_degraded_per_cistron', 'listeners__rna_degradation_listener__count_rna_degraded', 'listeners__rna_synth_prob__bound_TF_domains', 'listeners__rna_synth_prob__bound_TF_indexes', 'listeners__rna_synth_prob__expected_rna_init_per_cistron', 'listeners__rna_synth_prob__gene_copy_number', 'listeners__rna_synth_prob__n_bound_TF_per_TU', 'listeners__rna_synth_prob__n_bound_TF_per_cistron', 'listeners__rna_synth_prob__promoter_copy_number', 'listeners__rnap_data__active_rnap_domain_indexes', 'listeners__rnap_data__active_rnap_n_bound_ribosomes', 'listeners__rnap_data__rna_init_event', 'listeners__rnap_data__rna_init_event_per_cistron', 'listeners__transcript_elongation_listener__count_rna_synthesized'}

uint16 is 4x smaller than int64 for values between 0 - 65,535.

ecoli.library.parquet_emitter.USE_UINT32 = {'listeners__fba_results__catalyst_counts', 'listeners__monomer_counts', 'listeners__ribosome_data__n_ribosomes_on_partial_mRNA_per_transcript', 'listeners__ribosome_data__n_ribosomes_per_transcript', 'listeners__ribosome_data__ribosome_init_event_per_monomer', 'listeners__rna_counts__full_mRNA_cistron_counts', 'listeners__rna_counts__full_mRNA_counts', 'listeners__rna_counts__mRNA_cistron_counts', 'listeners__rna_counts__mRNA_counts', 'listeners__rna_counts__partial_mRNA_cistron_counts', 'listeners__rna_counts__partial_mRNA_counts'}

uint32 is 2x smaller than int64 for values between 0 - 4,294,967,295.

ecoli.library.parquet_emitter.config_value(conn, config_subquery, field)[source]

Gets the saved configuration option (anything in config JSON, with double underscore concatenation for nested fields due to flatten_dict()). Automatically quotes the field name to handle special characters. Do NOT use double quotes in field.

Parameters:
  • conn (DuckDBPyConnection) – DuckDB connection

  • config_subquery (str) – DuckDB query containing sim config data

  • field (str) – Name of configuration option to get value of

Return type:

Any

ecoli.library.parquet_emitter.create_duckdb_conn(temp_dir='/tmp', gcs=False, cpus=None)[source]

Create a DuckDB connection.

Parameters:
  • temp_dir (str) – Temporary directory for spilling to disk.

  • gcs (bool) – Set to True if reading from Google Cloud Storage.

  • cpus (int | None) – Number of cores to use (by default, use all detected cores).

Return type:

DuckDBPyConnection

ecoli.library.parquet_emitter.dataset_sql(out_dir, experiment_ids)[source]

Creates DuckDB SQL strings for sim outputs, configs, and metadata on which sims were successful.

Parameters:
  • out_dir (str) – Path to output directory for workflows to retrieve data for (relative or absolute local path OR URI beginning with gcs:// or gs:// for Google Cloud Storage bucket)

  • experiment_ids (list[str]) – List of experiment IDs to include in query. To read data from more than one experiment ID, the listeners in the output of the first experiment ID in the list must be a strict subset of the listeners in the output of the subsequent experiment ID(s).

Returns:

3-element tuple containing

Return type:

tuple[str, str, str]

ecoli.library.parquet_emitter.field_metadata(conn, config_subquery, field)[source]

Gets the saved metadata (see output_metadata()) for a given field as a list. Automatically quotes the field name to handle special characters. Do NOT use double quotes in field.

Parameters:
  • conn (DuckDBPyConnection) – DuckDB connection

  • config_subquery (str) – DuckDB query containing sim config data

  • field (str) – Name of field to get metadata for

Return type:

list

ecoli.library.parquet_emitter.flatten_dict(d)[source]

Flatten nested dictionary down to key-value pairs where each key concatenates all the keys needed to reach the corresponding value in the input. Allows each leaf field in a nested emit to be turned into a column in a Parquet file for efficient storage and retrieval.

Parameters:

d (dict)

ecoli.library.parquet_emitter.json_to_parquet(emit_dict, outfile, schema, filesystem)[source]

Convert dictionary to Parquet.

Parameters:
  • emit_dict (dict[str, ndarray | list[Series]]) – Mapping from column names to NumPy arrays (fixed-shape) or lists of Polars Series (variable-shape).

  • outfile (str) – Path to output Parquet file. Can be local path or URI.

  • schema (dict[str, Any]) – Full mapping of column names to Polars dtypes.

  • filesystem (AbstractFileSystem) – On local filesystem, fsspec filesystem needed to write Parquet file atomically.

ecoli.library.parquet_emitter.named_idx(col, names, idx, zero_to_null=False, _quote_col=True)[source]

Create DuckDB expressions for given indices from a list column. Can be used in projection kwarg of read_stacked_columns(). Since each index gets pulled out into its own column, this greatly simplifies aggregations like averages, etc. Only use this if the number of indices is relatively small (<100) and the list column is 1-dimensional. For 2+ dimensions or >100 indices, see ndidx_to_duckdb_expr(). Automatically quotes column names to handle special characters. Do NOT use double quotes in names or col.

Warning

DuckDB arrays are 1-indexed so this function adds 1 to every supplied index!

Parameters:
  • col (str) – Name of list column.

  • names (list[str]) – List of names for the new columns. Length must match the number of index combinations in idx (4 for example below).

  • idx (list[list[int]]) – Integer indices to retrieve from each dimension of col. For example, [[0, 1], [2, 3]] will retrieve the third and fourth elements of the second dimension for the first and second elements of the first dimension.

  • zero_to_null (bool) – Whether to turn 0s into nulls. This is useful when dividing by the values in this column, as most DuckDB aggregation functions (e.g. avg, max) propagate NaNs but ignore nulls.

  • _quote_col (bool) – Private argument to ensure col is quoted properly.

Returns:

DuckDB SQL expression for a set of named columns corresponding to the values at given indices of a list column

Return type:

str

ecoli.library.parquet_emitter.ndidx_to_duckdb_expr(name, idx)[source]

Returns a DuckDB expression for a column equivalent to converting each row of name into an ndarray name_arr (ndlist_to_ndarray()) and getting name_arr[idx]. idx can contain 1D lists of integers, boolean masks, or ":" (no 2D+ indices like x[[[1,2]]]). See also named_idx() if pulling out a relatively small set of indices. Automatically quotes column names to handle special characters. Do NOT use double quotes in name.

Warning

DuckDB arrays are 1-indexed so this function adds 1 to every supplied integer index!

Parameters:
  • name (str) – Name of column to recursively index

  • idx (list[int | list[int] | list[bool] | str]) –

    To get all elements for a dimension, supply the string ":". Otherwise, only single integers or 1D integer lists of indices are allowed for each dimension. Some examples:

    [0, 1] # First row, second column
    [[0, 1], 1] # First and second row, second column
    [0, 1, ":"] # First element of axis 1, second of 2, all of 3
    # Final example differs between this function and Numpy
    # This func: 1st and 2nd of axis 1, all of 2, 1st and 2nd of 3
    # Numpy: Complicated, see Numpy docs on advanced indexing
    [[0, 1], ":", [0, 1]]
    

Return type:

str

ecoli.library.parquet_emitter.ndlist_to_ndarray(s)[source]

Convert a PyArrow series of nested lists with fixed dimensions into a Numpy ndarray. This should really only be necessary if you are trying to perform linear algebra (e.g. matrix multiplication, dot products) inside a user-defined function (see DuckDB documentation on Python Function API and func kwarg for read_stacked_columns()).

For elementwise arithmetic of two nested list columns, this can be used to define a custom DuckDB function as follows:

import duckdb
import polars as pl
from ecoli.library.parquet_emitter import ndlist_to_ndarray
def sum_arrays(col_0, col_1):
    return pl.Series(
        ndlist_to_ndarray(col_0) +
        ndlist_to_ndarray(col_1)
    ).to_arrow()
conn = duckdb.connect()
conn.create_function(
    "sum_2d_int_arrays", # Function name for use in SQL (must be unique)
    sum_arrays, # Python function that takes and returns PyArrow arrays
    [list[list[int]], list[list[int]]], # Input types (2D lists here)
    list[list[int]], # Return type (2D list here)
    type = "arrow" # Tell DuckDB function operates on Arrow arrays
)
conn.sql("SELECT sum_2d_int_arrays(int_col_0, int_col_1) from input_table")
# Note that function must be registered under different name for each
# set of unique input/output types
conn.create_function(
    "sum_2d_int_and_float",
    sum_arrays,
    [list[list[int]], list[list[float]]], # Second input is 2D float array
    list[list[float]], # Adding int to float array gives float in Numpy
    type = "arrow"
)
conn.sql("SELECT sum_2d_int_and_float(int_col_0, float_col_0) from input_table")
Return type:

ndarray

ecoli.library.parquet_emitter.np_dtype(val, field_name)[source]

Get NumPy type for input value. There are a few scenarios where this function raises an exception intentionally:

  • An internal value is None or an empty list/tuple: data is ragged/nullable and needs Polars serialization.

  • Python bytes type: NumPy only has fixed-length bytes type so use Polars serialization to avoid truncation.

  • Python datetime types: Simpler and less error-prone to use Polars serialization instead of converting to NumPy.

Warning

np.bytes_ values and arrays will get truncated to the size of the first encountered value. Convert to Python bytes type to avoid this.

The try...except blocks in emit() are designed to catch these exceptions and fall back to Polars serialization.

All other exceptions raised by this function indicate that the value is of an unsupported type for which even the fall back Polars serialization likely will not work.

Parameters:
Return type:

Any

ecoli.library.parquet_emitter.num_cells(conn, subquery)[source]

Return cell count in DuckDB subquery containing experiment_id, variant, lineage_seed, generation, and agent_id columns.

Parameters:
  • conn (DuckDBPyConnection)

  • subquery (str)

Return type:

int

ecoli.library.parquet_emitter.open_arbitrary_sim_data(sim_data_dict)[source]

Given a mapping from experiment ID(s) to mappings from variant ID(s) to sim_data path(s), pick an arbitrary sim_data to read.

Parameters:

sim_data_dict (dict[str, dict[int, Any]]) – Generated by runscripts.analysis and passed to each analysis script as an argument.

Returns:

File object for arbitrarily chosen sim_data to be loaded with pickle.load

Return type:

OpenFile

ecoli.library.parquet_emitter.open_output_file(outfile)[source]

Open a file by its path, whether that be a path on local storage or Google Cloud Storage.

Parameters:

outfile (str) – Path to file. Must have gs:// or gcs:// prefix if on Google Cloud Storage. Can be relative or absolute path if on local storage.

Returns:

File object that supports reading, seeking, etc. in bytes

Return type:

OpenFile

ecoli.library.parquet_emitter.pl_dtype_from_ndarray(arr)[source]

Get Polars data type for a Numpy array, including nested lists.

Parameters:

arr (ndarray)

Return type:

DataType

ecoli.library.parquet_emitter.plot_metadata(conn, config_subquery, variant_name)[source]

Gets dictionary that can be used as metadata kwarg to wholecell.utils.plotting_tools.export_figure().

Parameters:
  • conn (DuckDBPyConnection) – DuckDB connection

  • config_subquery (str) – DuckDB query containing sim config data

  • variant_name (str) – Name of variant

Return type:

dict[str, Any]

ecoli.library.parquet_emitter.read_stacked_columns(history_sql, columns, remove_first=False, func=None, conn=None, order_results=True, success_sql=None)[source]

Loads columns for many cells. If you would like to perform more advanced computatations (aggregations, window functions, etc.) using the optimized DuckDB API, you can omit conn, in which case this function will return an SQL string that can be used as a subquery. For computations that cannot be easily performed using the DuckDB API, you can define a custom function func that will be called on the data for each cell. By default, the return value (whether it be the actual data or an SQL subquery) will also include the experiment_id, variant, lineage_seed, generation, agent_id, and time columns.

Warning

If the column expressions in columns are not from named_idx() or ndidx_to_duckdb_expr(), they may need to be enclosed in double quotes to handle special characters (e.g. "col-with-hyphens").

For example, to get the average total concentration of three bulk molecules with indices 100, 1000, and 10000 per cell:

import duckdb
from ecoli.library.parquet_emitter import (
    dataset_sql, read_stacked_columns)
history_sql, config_sql, _ = dataset_sql('out/', 'exp_id')
subquery = read_stacked_columns(
    history_sql,
    # Note DuckDB arrays are 1-indexed
    ["bulk[100 + 1] + bulk[1000 + 1] + bulk[10000 + 1] AS bulk_sum",
    "listeners__enzyme_kinetics__counts_to_molar AS counts_to_molar"],
    order_results=False,
)
query = '''
    SELECT avg(bulk_sum * counts_to_molar) AS avg_total_conc
    FROM ({subquery})
    GROUP BY experiment_id, variant, lineage_seed, generation, agent_id
    '''
conn = duckdb.connect()
data = conn.sql(query).pl()

Here is a more complicated example that defines a custom function to get the per-cell average RNA synthesis probability per cistron:

import duckdb
import pickle
from ecoli.library.parquet_emitter import (
    dataset_sql, ndlist_to_ndarray, read_stacked_columns)
history_sql, config_sql, _ = dataset_sql('out/', 'exp_id')
# Load sim data
with open("reconstruction/sim_data/kb/simData.cPickle", "rb") as f:
    sim_data = pickle.load(f)
# Get mapping from RNAs (TUs) to cistrons
cistron_tu_mat = sim_data.process.transcription.cistron_tu_mapping_matrix
# Custom aggregation function with Numpy dot product and mean
def avg_rna_synth_prob_per_cistron(rna_synth_prob):
    # Convert rna_synth_prob into 2-D Numpy array (time x TU)
    rna_synth_prob = ndlist_to_ndarray(rna_synth_prob[
        "listeners__rna_synth_prob__actual_rna_synth_prob"])
    rna_synth_prob_per_cistron = cistron_tu_mat.dot(rna_synth_prob.T).T
    # Return value must be a PyArrow table
    return pl.DataFrame({'avg_rna_synth_prob_per_cistron': [
        rna_synth_prob_per_cistron.mean(axis=0)]}).to_arrow()
conn = duckdb.connect()
result = read_stacked_columns(
    history_sql,
    ["listeners__rna_synth_prob__actual_rna_synth_prob"],
    func=avg_rna_synth_prob_per_cistron,
    conn=conn,
)
Parameters:
  • history_sql (str) – DuckDB SQL string from dataset_sql(), potentially with filters appended in WHERE clause

  • columns (list[str]) – Names of columns to read data for. Alternatively, DuckDB expressions of columns (e.g. avg(listeners__mass__cell_mass) AS avg_mass or the output of named_idx() or ndidx_to_duckdb_expr()).

  • remove_first (bool) – Remove data for first timestep of each cell

  • func (Callable[[DataFrame], DataFrame] | None) – Function to call on data for each cell, should take and return a Polars DataFrame with columns equal to columns

  • conn (DuckDBPyConnection | None) – DuckDB connection instance with which to run query. Typically provided by runscripts.analysis.main() to the plot method of analysis scripts (tweaked some DuckDB settings). Can be omitted to return SQL query string to be used as subquery instead of running query immediately and returning result.

  • order_results (bool) – Whether to sort returned table by experiment_id, variant, lineage_seed, generation, agent_id, and time. If no conn is provided, this can usually be disabled and any sorting can be deferred until the last step in the query with a manual ORDER BY. Doing this can greatly reduce RAM usage.

  • success_sql (str | None) – Final DuckDB SQL string from dataset_sql(). If provided, will be used to filter out unsuccessful sims.

Return type:

DataFrame | str

ecoli.library.parquet_emitter.skip_n_gens(subquery, n)[source]

Modifies a DuckDB SQL query to skip the first n generations of data.

Parameters:
Return type:

str

ecoli.library.parquet_emitter.union_by_name(query_sql)[source]

Modifies SQL query string from dataset_sql() to include union_by_name = true in the DuckDB read_parquet function. This allows data to be read from simulations that have different columns by filling in nulls and casting as necessary. This comes with a performance penalty and should be avoided if possible.

Parameters:

query_sql (str) – SQL query string from dataset_sql()

Return type:

str

ecoli.library.parquet_emitter.union_pl_dtypes(dt1, dt2, k, force_inner=None)[source]

Returns the more specific data type when combining two Polars datatypes. Mainly intended to fill out nested List types that contain Nulls.

Parameters:
  • dt1 (DataType | DataTypeClass) – First Polars datatype

  • dt2 (DataType | DataTypeClass) – Second Polars datatype

  • k (str) – Name of column being combined (for error messages)

  • force_inner (DataType | DataTypeClass | None) – Force this inner type when possible

Returns:

The resulting datatype

Return type:

DataType | DataTypeClass