import argparse
import copy
import hashlib
import importlib
import itertools
import json
import os
import pickle
import shutil
import subprocess
import time
from pathlib import Path
from fsspec import open as fsspec_open
from typing import TYPE_CHECKING, Any, cast
import numpy as np
from configs import CONFIG_DIR_PATH
from ecoli.experiments.ecoli_master_sim import SimConfig
from wholecell.utils.filepath import cloud_path_join, is_cloud_uri
if TYPE_CHECKING:
from reconstruction.ecoli.simulation_data import SimulationDataEcoli
[docs]
def parse_variants(
variant_config: dict[str, str | dict[str, Any]],
) -> list[dict[str, Any]]:
"""
Parse parameters for a variant specified under ``variants`` key of config.
Args:
variant_config: Dictionary of the form::
{
# Define one or more parameters with any names EXCEPT `op`
'param_name': {
# Each parameter defines only ONE of the following keys
# A list of parameter values
'value': [...]
# Numpy function that returns array of parameter values
# Example: np.linspace
'linspace': {
# Kwargs
'start': ...,
'stop': ...,
'num' (optional): ...
}
# Dictionary of parameters with same rules as this one
'nested': {...}
},
# When more than one parameter is defined, an 'op' key
# MUST define how to combine them. The three options are:
# 'zip': Zip parameters (must have same length)
# 'prod': Cartesian product of parameters
# 'add': Concatenate parameter lists into single parameter
# named {param_name_1}__{param_name_2}__...
'param_2': {...},
'op': 'zip'
}
Returns:
List of parameter dictionaries generated from variant config
"""
# Extract operation if more than one parameter
operation = None
if len(variant_config) > 1:
assert "op" in variant_config, (
"Variant has more than 1 parameter but no op key defined."
)
operation = variant_config.pop("op")
elif "op" in variant_config:
raise TypeError(
"Variant only has a single parameter and should not define op key."
)
# Perform pre-processing of parameters
parsed = {}
for param_name, param_conf in variant_config.items():
param_conf = cast(dict[str, Any], param_conf)
if len(param_conf) > 1:
raise TypeError(f"{param_name} should only have 1 type.")
param_type = list(param_conf.keys())[0]
param_vals = param_conf[param_type]
if param_type == "value":
if not isinstance(param_vals, list):
raise TypeError(f"{param_name} should have a list value.")
parsed[param_name] = param_vals
elif param_type == "nested":
param_vals = cast(dict[str, str | dict[str, Any]], param_vals)
parsed[param_name] = parse_variants(param_vals)
else:
try:
np_func = getattr(np, param_type)
except AttributeError as e:
raise TypeError(f"{param_name} is unknown type {param_type}.") from e
parsed[param_name] = np_func(**param_vals).tolist()
# Apply parameter operations
if operation == "prod":
param_tuples = itertools.product(*(parsed[k] for k in parsed))
param_dicts = [
{name: val for name, val in zip(parsed.keys(), param_tuple)}
for param_tuple in param_tuples
]
elif operation == "zip":
n_combos = -1
for name, val in parsed.items():
if n_combos == -1:
n_combos = len(val)
if len(val) != n_combos:
raise RuntimeError(
f"At least 1 other parameter has a "
f"different # of values than {name}."
)
param_dicts = [
{name: val[i] for name, val in parsed.items()} for i in range(n_combos)
]
elif operation == "add":
combined_param_name = "__".join(parsed)
param_dicts = []
for val in parsed.values():
param_dicts.extend({combined_param_name: i} for i in val)
elif operation is None:
param_name = list(parsed.keys())[0]
param_vals = parsed[param_name]
param_dicts = [{param_name: param_val} for param_val in param_vals]
else:
raise RuntimeError(f"Unknown operation {operation} in {variant_config}")
return param_dicts
[docs]
def apply_and_save_variants(
    sim_data: "SimulationDataEcoli",
    param_dicts: list[dict[str, Any]],
    variant_name: str,
    outdir: str,
    skip_baseline: bool,
) -> list[tuple[int, str]]:
    """
    Create one variant ``sim_data`` per parameter dictionary and save it.

    For the parameter dictionary at position ``i`` of ``param_dicts``, a deep
    copy of ``sim_data`` is passed to ``apply_variant`` of the module
    ``ecoli.variants.{variant_name}`` and the result is pickled to
    ``{i + 1}.cPickle`` in ``outdir``. A ``metadata.json`` file mapping each
    variant index to its parameter dictionary is written to ``outdir`` as well.

    Args:
        sim_data: Simulation data object to modify (only copies are modified
            here; the object itself is deep-copied for every variant)
        param_dicts: Return value of :py:func:`~.parse_variants`
        variant_name: Name of variant function file in ``ecoli/variants`` folder
        outdir: Path to folder where variant ``sim_data`` pickles are saved
        skip_baseline: If False, the metadata also gets a ``0: "baseline"``
            entry; if True, that entry is left out

    Returns:
        List of (variant_idx, hash) tuples for each variant written, where
        hash is the SHA-256 hex digest of the pickled variant
    """
    # Cloud URIs and local paths need different join functions
    join = cloud_path_join if is_cloud_uri(outdir) else os.path.join
    variant_module = importlib.import_module(f"ecoli.variants.{variant_name}")

    metadata: dict[int, str | dict[str, Any]] = {}
    if not skip_baseline:
        metadata[0] = "baseline"

    hashes: list[tuple[int, str]] = []
    # Variant indices start at 1 because index 0 is reserved for the baseline
    for variant_idx, params in enumerate(param_dicts, start=1):
        fresh_sim_data = copy.deepcopy(sim_data)
        metadata[variant_idx] = params
        modified = variant_module.apply_variant(fresh_sim_data, params)
        # Pickle once and reuse the bytes for both the hash and the file
        serialized = pickle.dumps(modified)
        hashes.append((variant_idx, hashlib.sha256(serialized).hexdigest()))
        with fsspec_open(join(outdir, f"{variant_idx}.cPickle"), "wb") as f:
            f.write(serialized)

    with fsspec_open(join(outdir, "metadata.json"), "w") as f:
        json.dump({variant_name: metadata}, f)
    return hashes
def test_parse_variants():
    """
    Check parsing of a ``prod`` over two value lists and a nested ``zip``.
    """
    config = {
        "a": {"value": [1, 2]},
        "b": {"value": ["one", "two"]},
        "c": {"nested": {"d": {"value": [3, 4]}, "e": {"value": [5, 6]}, "op": "zip"}},
        "op": "prod",
    }
    # The nested zip pairs d and e element-wise ...
    nested_combos = [{"d": 3, "e": 5}, {"d": 4, "e": 6}]
    # ... and the outer prod is the Cartesian product in key order a, b, c.
    expected = [
        {"a": a, "b": b, "c": c}
        for a, b, c in itertools.product([1, 2], ["one", "two"], nested_combos)
    ]
    assert parse_variants(config) == expected
def test_parse_variants_zip():
    """
    Check that the ``zip`` operation pairs parameter values element-wise.
    """
    x_vals = [10, 20, 30]
    y_vals = ["a", "b", "c"]
    config = {"x": {"value": x_vals}, "y": {"value": y_vals}, "op": "zip"}
    expected = [{"x": x, "y": y} for x, y in zip(x_vals, y_vals)]
    assert parse_variants(config) == expected
def test_parse_variants_add():
    """
    Check that the ``add`` operation concatenates the value lists under a
    single parameter named after all inputs joined by ``__``.
    """
    config = {
        "x": {"value": [1, 2]},
        "y": {"value": [3, 4]},
        "op": "add",
    }
    expected = [{"x__y": val} for val in (1, 2, 3, 4)]
    assert parse_variants(config) == expected
def test_parse_variants_single():
    """
    Check parsing of a lone parameter, which needs no ``op`` key.
    """
    values = [100, 200, 300]
    result = parse_variants({"z": {"value": values}})
    assert result == [{"z": val} for val in values]
def test_parse_variants_numpy():
    """
    Check parameters whose values come from Numpy functions
    (``linspace`` and ``arange``).
    """
    # np.linspace with list-valued start/stop: each value is a 1-element list
    linspace_config = {
        "rate": {"linspace": {"start": [0.0], "stop": [1.0], "num": 3}}
    }
    linspace_params = parse_variants(linspace_config)
    assert linspace_params == [{"rate": [0.0]}, {"rate": [0.5]}, {"rate": [1.0]}]

    # np.arange with scalar arguments: the stop value is excluded
    arange_config = {"step": {"arange": {"start": 2, "stop": 8, "step": 2}}}
    arange_params = parse_variants(arange_config)
    assert arange_params == [{"step": val} for val in (2, 4, 6)]
[docs]
class SimData:
    """
    Empty stand-in for a sim_data object, used only for testing.
    """
def test_create_variants():
    """
    Test modification and saving of variant sim_data.

    Pickles a mock ``SimData`` as the knowledge base, runs
    ``runscripts/create_variants.py`` with ``configs/test_variant.json`` in a
    subprocess, then checks that the attributes of every saved variant match
    the parameters recorded for it in ``metadata.json``. All paths are
    relative to the current working directory. The ``test_create_variants``
    scratch folder is removed afterwards, whether or not the test passed.
    """
    try:
        os.makedirs("test_create_variants/kb", exist_ok=True)
        # Create mock sim_data pickle
        with open("test_create_variants/kb/simData.cPickle", "wb") as f:
            pickle.dump(SimData(), f)
        repo_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        # Test script and config system. PYTHONPATH is set on a copy of the
        # environment so that the change is only seen by the subprocess and
        # does not leak into the rest of the test session.
        child_env = {**os.environ, "PYTHONPATH": repo_dir}
        subprocess.run(
            [
                "python",
                "runscripts/create_variants.py",
                "--config",
                "configs/test_variant.json",
                "--kb",
                "test_create_variants/kb",
                "-o",
                "test_create_variants/out",
            ],
            check=True,
            env=child_env,
        )
        # Check that metadata aligns with variant sim_data attrs
        with open("test_create_variants/out/metadata.json") as f:
            variant_metadata = json.load(f)
        assert "variant_test" in variant_metadata
        variant_metadata = variant_metadata["variant_test"]
        out_path = Path("test_create_variants/out")
        for var_path in out_path.glob("*.cPickle"):
            # Skip baseline
            if var_path.stem == "0":
                continue
            with open(var_path, "rb") as f:
                variant_sim_data = pickle.load(f)
            # File stem is the variant index, which is the metadata key
            variant_params = variant_metadata[var_path.stem]
            assert variant_sim_data.a == variant_params["a"]
            assert variant_sim_data.b == variant_params["b"]
            assert variant_sim_data.d == variant_params["c"]["d"]
            assert variant_sim_data.e == variant_params["c"]["e"]
            assert variant_sim_data.f == variant_params["c"]["f"]
    finally:
        shutil.rmtree("test_create_variants", ignore_errors=True)
[docs]
def main():
    """
    Command-line entry point: create and save variant ``sim_data`` objects.

    Steps:

    1. Load the default config, merge the ``--config`` file on top of it and
       let explicitly given command-line options override both.
    2. Load ``simData.cPickle`` from the ``kb`` folder.
    3. Unless ``skip_baseline`` is set, save the unmodified sim_data as
       ``0.cPickle`` in the output folder.
    4. If the config defines a variant under ``variants``, parse its
       parameters and save one pickle per parameter combination together
       with ``metadata.json``; otherwise write a baseline-only
       ``metadata.json``.
    5. Write ``metadata_uri.txt`` and ``variant_info.txt`` to the current
       working directory.

    Raises:
        RuntimeError: More than one variant name is defined under
            ``variants``.
    """
    parser = argparse.ArgumentParser()
    default_config = os.path.join(CONFIG_DIR_PATH, "default.json")
    parser.add_argument(
        "--config",
        action="store",
        default=default_config,
        help=(
            "Path to configuration file for the simulation. "
            "All key-value pairs in this file will be applied on top "
            f"of the options defined in {default_config}. To configure "
            "variants, the config must include the `variants` key. Under the "
            "`variants` key should be a single key with the name of the "
            "variant module under `ecoli.variants` (for example, `variant_1` "
            "if imported as `ecoli.variants.variant_1` or `folder_1.variant_1` "
            "if imported as `ecoli.variants.folder_1.variant_1`). See "
            "`ecoli.variants.template` for variant template. Under the "
            "variant module name should be a parameter dictionary as "
            "described in the docstring for `parse_variants`."
        ),
    )
    parser.add_argument(
        "--kb", action="store", type=str, help="Path to kb folder generated by ParCa."
    )
    parser.add_argument(
        "--outdir",
        "-o",
        action="store",
        type=str,
        help="Path to folder where variant sim_data and metadata are written.",
    )
    args = parser.parse_args()

    # Precedence (lowest to highest): default config, --config file,
    # command-line options that were actually given.
    with open(default_config, "r") as f:
        config = json.load(f)
    if args.config is not None:
        with fsspec_open(args.config, "r") as f:
            SimConfig.merge_config_dicts(config, json.load(f))
    for k, v in vars(args).items():
        if v is not None:
            config[k] = v

    # Handle cloud vs local paths
    kb_path = config["kb"]
    outdir = config["outdir"]
    path_join = cloud_path_join if is_cloud_uri(kb_path) else os.path.join
    print("Loading sim_data...")
    # NOTE: unpickling executes arbitrary code; only use a kb folder that
    # comes from a trusted source.
    with fsspec_open(path_join(kb_path, "simData.cPickle"), "rb") as f:
        sim_data = pickle.load(f)

    # Handle output directory - only expand to absolute for local paths
    if is_cloud_uri(outdir):
        config_outdir = outdir
        out_path_join = cloud_path_join
    else:
        config_outdir = os.path.abspath(outdir)
        os.makedirs(config_outdir, exist_ok=True)
        out_path_join = os.path.join

    # Track variant info: (variant_idx, uri, hash). This order must match
    # the columns written to variant_info.txt below.
    variant_info: list[tuple[int, str, str]] = []
    if config["skip_baseline"]:
        print("Skipping baseline sim_data...")
    else:
        print("Saving baseline sim_data...")
        baseline_path = out_path_join(config_outdir, "0.cPickle")
        baseline_bytes = pickle.dumps(sim_data)
        baseline_hash = hashlib.sha256(baseline_bytes).hexdigest()
        with fsspec_open(baseline_path, "wb") as f:
            f.write(baseline_bytes)
        variant_info.append((0, baseline_path, baseline_hash))

    variant_config = config.get("variants", {})
    if len(variant_config) > 1:
        raise RuntimeError(
            "Only one variant name allowed. Variants can "
            "be manually composed in Python by having one "
            "variant function internally call another."
        )
    elif len(variant_config) == 1:
        variant_name, variant_params = next(iter(variant_config.items()))
        print("Parsing variants...")
        parsed_params = parse_variants(variant_params)
        print("Applying variants and saving variant sim_data...")
        variant_hashes = apply_and_save_variants(
            sim_data,
            parsed_params,
            variant_name,
            config_outdir,
            config["skip_baseline"],
        )
        # Add variant info
        for var_idx, var_hash in variant_hashes:
            var_path = out_path_join(config_outdir, f"{var_idx}.cPickle")
            variant_info.append((var_idx, var_path, var_hash))
    else:
        # No variant configured: record only the baseline. The ``None`` key
        # is serialized by ``json`` as the string "null".
        with fsspec_open(out_path_join(config_outdir, "metadata.json"), "w") as f:
            json.dump({None: {0: "baseline"}}, f)

    # Write metadata_uri to file for Nextflow (cloud URI of metadata.json)
    metadata_uri = out_path_join(config_outdir, "metadata.json")
    with open("metadata_uri.txt", "w") as f:
        f.write(metadata_uri)

    # Write variant info to file for Nextflow (local only)
    # Each line: variant_name<TAB>sim_data_uri<TAB>sim_data_hash
    with open("variant_info.txt", "w") as f:
        f.writelines(
            f"{var_idx}\t{var_uri}\t{var_hash}\n"
            for var_idx, var_uri, var_hash in variant_info
        )
    print(f"{time.ctime()}: Wrote {len(variant_info)} variant(s) to {config_outdir}")
    print("Done.")
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()