# Source code for runscripts.create_variants

import argparse
import copy
import hashlib
import importlib
import itertools
import json
import os
import pickle
import shutil
import subprocess
import time
from pathlib import Path
from fsspec import open as fsspec_open
from typing import TYPE_CHECKING, Any, cast

import numpy as np

from configs import CONFIG_DIR_PATH
from ecoli.experiments.ecoli_master_sim import SimConfig
from wholecell.utils.filepath import cloud_path_join, is_cloud_uri

if TYPE_CHECKING:
    from reconstruction.ecoli.simulation_data import SimulationDataEcoli


[docs] def parse_variants( variant_config: dict[str, str | dict[str, Any]], ) -> list[dict[str, Any]]: """ Parse parameters for a variant specified under ``variants`` key of config. Args: variant_config: Dictionary of the form:: { # Define one or more parameters with any names EXCEPT `op` 'param_name': { # Each parameter defines only ONE of the following keys # A list of parameter values 'value': [...] # Numpy function that returns array of parameter values # Example: np.linspace 'linspace': { # Kwargs 'start': ..., 'stop': ..., 'num' (optional): ... } # Dictionary of parameters with same rules as this one 'nested': {...} }, # When more than one parameter is defined, an 'op' key # MUST define how to combine them. The three options are: # 'zip': Zip parameters (must have same length) # 'prod': Cartesian product of parameters # 'add': Concatenate parameter lists into single parameter # named {param_name_1}__{param_name_2}__... 'param_2': {...}, 'op': 'zip' } Returns: List of parameter dictionaries generated from variant config """ # Extract operation if more than one parameter operation = None if len(variant_config) > 1: assert "op" in variant_config, ( "Variant has more than 1 parameter but no op key defined." ) operation = variant_config.pop("op") elif "op" in variant_config: raise TypeError( "Variant only has a single parameter and should not define op key." 
) # Perform pre-processing of parameters parsed = {} for param_name, param_conf in variant_config.items(): param_conf = cast(dict[str, Any], param_conf) if len(param_conf) > 1: raise TypeError(f"{param_name} should only have 1 type.") param_type = list(param_conf.keys())[0] param_vals = param_conf[param_type] if param_type == "value": if not isinstance(param_vals, list): raise TypeError(f"{param_name} should have a list value.") parsed[param_name] = param_vals elif param_type == "nested": param_vals = cast(dict[str, str | dict[str, Any]], param_vals) parsed[param_name] = parse_variants(param_vals) else: try: np_func = getattr(np, param_type) except AttributeError as e: raise TypeError(f"{param_name} is unknown type {param_type}.") from e parsed[param_name] = np_func(**param_vals).tolist() # Apply parameter operations if operation == "prod": param_tuples = itertools.product(*(parsed[k] for k in parsed)) param_dicts = [ {name: val for name, val in zip(parsed.keys(), param_tuple)} for param_tuple in param_tuples ] elif operation == "zip": n_combos = -1 for name, val in parsed.items(): if n_combos == -1: n_combos = len(val) if len(val) != n_combos: raise RuntimeError( f"At least 1 other parameter has a " f"different # of values than {name}." ) param_dicts = [ {name: val[i] for name, val in parsed.items()} for i in range(n_combos) ] elif operation == "add": combined_param_name = "__".join(parsed) param_dicts = [] for val in parsed.values(): param_dicts.extend({combined_param_name: i} for i in val) elif operation is None: param_name = list(parsed.keys())[0] param_vals = parsed[param_name] param_dicts = [{param_name: param_val} for param_val in param_vals] else: raise RuntimeError(f"Unknown operation {operation} in {variant_config}") return param_dicts
def apply_and_save_variants(
    sim_data: "SimulationDataEcoli",
    param_dicts: list[dict[str, Any]],
    variant_name: str,
    outdir: str,
    skip_baseline: bool,
) -> list[tuple[int, str]]:
    """
    Build one variant ``sim_data`` per parameter dictionary and write it out.

    For the parameter dictionary at position ``i`` of ``param_dicts``, a deep
    copy of ``sim_data`` is passed to the variant module's ``apply_variant``
    and the pickled result is written to ``{i + 1}.cPickle`` in ``outdir``
    (index 0 is reserved for the baseline). A ``metadata.json`` file mapping
    each written index to its parameter dictionary, keyed by ``variant_name``,
    is written to ``outdir`` as well.

    Args:
        sim_data: Simulation data object to modify
        param_dicts: Return value of :py:func:`~.parse_variants`
        variant_name: Name of variant function file in ``ecoli/variants`` folder
        outdir: Path to folder where variant ``sim_data`` pickles are saved
        skip_baseline: When false, metadata also records index 0 as
            ``"baseline"``

    Returns:
        List of (variant_idx, hash) tuples for each variant written, where
        the hash is the SHA-256 hex digest of the pickled bytes
    """
    # Cloud URIs and local paths are joined by different helpers
    if is_cloud_uri(outdir):
        join_path = cloud_path_join
    else:
        join_path = os.path.join
    variant_module = importlib.import_module(f"ecoli.variants.{variant_name}")

    metadata: dict[int, str | dict[str, Any]] = {}
    if not skip_baseline:
        metadata[0] = "baseline"
    hashes: list[tuple[int, str]] = []

    for variant_idx, params in enumerate(param_dicts, start=1):
        metadata[variant_idx] = params
        # Every variant starts from an untouched copy of the input sim_data
        modified = variant_module.apply_variant(copy.deepcopy(sim_data), params)
        # Pickle once so the hash describes exactly the bytes that are written
        serialized = pickle.dumps(modified)
        digest = hashlib.sha256(serialized).hexdigest()
        hashes.append((variant_idx, digest))
        with fsspec_open(join_path(outdir, f"{variant_idx}.cPickle"), "wb") as f:
            f.write(serialized)

    with fsspec_open(join_path(outdir, "metadata.json"), "w") as f:
        json.dump({variant_name: metadata}, f)
    return hashes
def test_parse_variants():
    """
    Check a Cartesian product over two value lists and one nested zip.
    """
    config = {
        "a": {"value": [1, 2]},
        "b": {"value": ["one", "two"]},
        "c": {"nested": {"d": {"value": [3, 4]}, "e": {"value": [5, 6]}, "op": "zip"}},
        "op": "prod",
    }
    # The nested zip yields two dictionaries that take part in the product
    nested_combos = ({"d": 3, "e": 5}, {"d": 4, "e": 6})
    expected = [
        {"a": a_val, "b": b_val, "c": c_val}
        for a_val in (1, 2)
        for b_val in ("one", "two")
        for c_val in nested_combos
    ]
    assert parse_variants(config) == expected


def test_parse_variants_zip():
    """
    Check that ``zip`` pairs parameter values position by position.
    """
    config = {
        "x": {"value": [10, 20, 30]},
        "y": {"value": ["a", "b", "c"]},
        "op": "zip",
    }
    expected = [
        {"x": 10, "y": "a"},
        {"x": 20, "y": "b"},
        {"x": 30, "y": "c"},
    ]
    assert parse_variants(config) == expected


def test_parse_variants_add():
    """
    Check that ``add`` concatenates value lists under one combined name.
    """
    config = {
        "x": {"value": [1, 2]},
        "y": {"value": [3, 4]},
        "op": "add",
    }
    expected = [{"x__y": number} for number in (1, 2, 3, 4)]
    assert parse_variants(config) == expected


def test_parse_variants_single():
    """
    Check a lone parameter, which needs no ``op`` key.
    """
    config = {"z": {"value": [100, 200, 300]}}
    expected = [{"z": 100}, {"z": 200}, {"z": 300}]
    assert parse_variants(config) == expected


def test_parse_variants_numpy():
    """
    Check parameters generated by NumPy functions (``linspace``, ``arange``).
    """
    # np.linspace with list-valued endpoints yields one-element lists
    linspace_config = {"rate": {"linspace": {"start": [0.0], "stop": [1.0], "num": 3}}}
    linspace_result = parse_variants(linspace_config)
    assert len(linspace_result) == 3
    assert linspace_result[0] == {"rate": [0.0]}
    assert linspace_result[1] == {"rate": [0.5]}
    assert linspace_result[2] == {"rate": [1.0]}

    # np.arange with scalar arguments yields plain scalars
    arange_config = {"step": {"arange": {"start": 2, "stop": 8, "step": 2}}}
    arange_result = parse_variants(arange_config)
    assert arange_result == [{"step": 2}, {"step": 4}, {"step": 6}]
class SimData:
    """
    Empty stand-in for a sim_data object, used by the tests in this module.
    """
def test_create_variants():
    """
    Test modification and saving of variant sim_data.

    Pickles a mock ``SimData`` as the kb, runs ``runscripts/create_variants.py``
    in a subprocess with ``configs/test_variant.json``, then checks that each
    written variant pickle carries the attribute values recorded for it in
    ``metadata.json``. All paths are relative, so the test has to be run from
    the repository root. The scratch folder is removed afterwards.
    """
    try:
        os.makedirs("test_create_variants/kb", exist_ok=True)
        # Create mock sim_data pickle
        with open("test_create_variants/kb/simData.cPickle", "wb") as f:
            pickle.dump(SimData(), f)
        repo_dir = os.path.dirname(os.path.dirname(__file__))
        # Test script and config system.
        # PYTHONPATH is set on a private copy of the environment so that the
        # test does not permanently alter ``os.environ`` for whatever runs
        # later in the same process.
        subprocess_env = {**os.environ, "PYTHONPATH": repo_dir}
        subprocess.run(
            [
                "python",
                "runscripts/create_variants.py",
                "--config",
                "configs/test_variant.json",
                "--kb",
                "test_create_variants/kb",
                "-o",
                "test_create_variants/out",
            ],
            check=True,
            env=subprocess_env,
        )
        # Check that metadata aligns with variant sim_data attrs
        with open("test_create_variants/out/metadata.json") as f:
            variant_metadata = json.load(f)
        assert "variant_test" in variant_metadata
        variant_metadata = variant_metadata["variant_test"]
        out_path = Path("test_create_variants/out")
        var_paths = out_path.glob("*.cPickle")
        for var_path in var_paths:
            # Skip baseline
            if var_path.stem == "0":
                continue
            with open(var_path, "rb") as f:
                variant_sim_data = pickle.load(f)
            # JSON object keys are strings, which matches the file stem
            variant_params = variant_metadata[var_path.stem]
            assert variant_sim_data.a == variant_params["a"]
            assert variant_sim_data.b == variant_params["b"]
            assert variant_sim_data.d == variant_params["c"]["d"]
            assert variant_sim_data.e == variant_params["c"]["e"]
            assert variant_sim_data.f == variant_params["c"]["f"]
    finally:
        shutil.rmtree("test_create_variants", ignore_errors=True)
def main():
    """
    Command-line entry point: create variant ``sim_data`` pickles.

    Loads the default config, merges the ``--config`` file on top of it, then
    lets any command-line option that was given override the result. Reads
    ``simData.cPickle`` from the kb folder, optionally writes it unchanged as
    ``0.cPickle`` (baseline), and writes one pickle per parsed variant plus
    ``metadata.json`` to the output folder. Finally writes
    ``metadata_uri.txt`` and ``variant_info.txt`` to the current working
    directory.

    Raises:
        RuntimeError: The ``variants`` config section names more than one
            variant.
    """
    parser = argparse.ArgumentParser()
    default_config = os.path.join(CONFIG_DIR_PATH, "default.json")
    # NOTE(review): the help text below says `variant` / `ecoli.variant`,
    # while this script reads the `variants` config key and imports from
    # `ecoli.variants` -- confirm and align the wording.
    parser.add_argument(
        "--config",
        action="store",
        default=default_config,
        help=(
            "Path to configuration file for the simulation. "
            "All key-value pairs in this file will be applied on top "
            f"of the options defined in {default_config}. To configure "
            "variants, the config must include the `variant` key. Under the "
            "`variant` key should be a single key with the name of the "
            "variant module under `ecoli.variant` (for example, `variant_1` "
            "if imported as `ecoli.variant.variant_1` or `folder_1.variant_1` "
            "if imported as `ecoli.variant.folder_1.variant_1`). See "
            "`ecoli.variants.template` for variant template. Under the "
            "variant module name should be a parameter dictionary as "
            "described in the docstring for `parse_variants`."
        ),
    )
    parser.add_argument(
        "--kb", action="store", type=str, help="Path to kb folder generated by ParCa."
    )
    parser.add_argument(
        "--outdir",
        "-o",
        action="store",
        type=str,
        help="Path to folder where variant sim_data and metadata are written.",
    )
    args = parser.parse_args()
    with open(default_config, "r") as f:
        config = json.load(f)
    if args.config is not None:
        with fsspec_open(args.config, "r") as f:
            SimConfig.merge_config_dicts(config, json.load(f))
    # Options given on the command line take precedence over the config files
    for k, v in vars(args).items():
        if v is not None:
            config[k] = v

    # Handle cloud vs local paths
    kb_path = config["kb"]
    outdir = config["outdir"]
    path_join = cloud_path_join if is_cloud_uri(kb_path) else os.path.join

    print("Loading sim_data...")
    with fsspec_open(path_join(kb_path, "simData.cPickle"), "rb") as f:
        sim_data = pickle.load(f)

    # Handle output directory - only expand to absolute for local paths
    if is_cloud_uri(outdir):
        config_outdir = outdir
        out_path_join = cloud_path_join
    else:
        config_outdir = os.path.abspath(outdir)
        os.makedirs(config_outdir, exist_ok=True)
        out_path_join = os.path.join

    # Track variant info: (variant_idx, uri, hash). This order has to match
    # the unpacking in the loop that writes variant_info.txt below.
    variant_info: list[tuple[int, str, str]] = []

    if config["skip_baseline"]:
        print("Skipping baseline sim_data...")
    else:
        print("Saving baseline sim_data...")
        baseline_path = out_path_join(config_outdir, "0.cPickle")
        # Pickle once so the hash describes exactly the bytes that are written
        baseline_bytes = pickle.dumps(sim_data)
        baseline_hash = hashlib.sha256(baseline_bytes).hexdigest()
        with fsspec_open(baseline_path, "wb") as f:
            f.write(baseline_bytes)
        variant_info.append((0, baseline_path, baseline_hash))

    variant_config = config.get("variants", {})
    if len(variant_config) > 1:
        raise RuntimeError(
            "Only one variant name allowed. Variants can "
            "be manually composed in Python by having one "
            "variant function internally call another."
        )
    elif len(variant_config) == 1:
        variant_name = list(variant_config.keys())[0]
        variant_params = variant_config[variant_name]
        print("Parsing variants...")
        parsed_params = parse_variants(variant_params)
        print("Applying variants and saving variant sim_data...")
        variant_hashes = apply_and_save_variants(
            sim_data,
            parsed_params,
            variant_name,
            config_outdir,
            config["skip_baseline"],
        )
        # Add variant info
        for var_idx, var_hash in variant_hashes:
            var_path = out_path_join(config_outdir, f"{var_idx}.cPickle")
            variant_info.append((var_idx, var_path, var_hash))
    else:
        # No variant configured: metadata only describes the baseline
        with fsspec_open(out_path_join(config_outdir, "metadata.json"), "w") as f:
            json.dump({None: {0: "baseline"}}, f)

    # Write metadata_uri to file for Nextflow (cloud URI of metadata.json)
    metadata_uri = out_path_join(config_outdir, "metadata.json")
    with open("metadata_uri.txt", "w") as f:
        f.write(metadata_uri)

    # Write variant info to file for Nextflow (local only)
    # Each line: variant_idx<TAB>sim_data_uri<TAB>sim_data_hash
    with open("variant_info.txt", "w") as f:
        for var_idx, var_uri, var_hash in variant_info:
            f.write(f"{var_idx}\t{var_uri}\t{var_hash}\n")

    print(f"{time.ctime()}: Wrote {len(variant_info)} variant(s) to {config_outdir}")
    print("Done.")
# Run the command-line entry point only when this file is executed as a script.
if __name__ == "__main__": main()