import argparse
import json
import os
import shutil
import subprocess
import warnings
from datetime import datetime
from urllib import parse
from pyarrow import fs
LIST_KEYS_TO_MERGE = (
"save_times",
"add_processes",
"exclude_processes",
"processes",
"engine_process_reports",
"initial_state_overrides",
)
"""
Special configuration keys that are list values which are concatenated
together when they are found in multiple sources (e.g. default JSON and
user-specified JSON) instead of being directly overriden.
"""
CONFIG_DIR_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"ecoli",
"composites",
"ecoli_configs",
)
NEXTFLOW_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "nextflow")
MULTIDAUGHTER_CHANNEL = """
generationSize = {gen_size}
simCh
.map {{ tuple(groupKey(it[1..4], generationSize[it[4]]), it[0], it[1], it[2], it[3], it[4] ) }}
.groupTuple(remainder: true)
.map {{ tuple(it[1][0], it[2][0], it[3][0], it[4][0], it[5][0]) }}
.set {{ multiDaughterCh }}
"""
MULTIGENERATION_CHANNEL = """
simCh
.groupTuple(by: [1, 2, 3], size: {size}, remainder: true)
.map {{ tuple(it[0][0], it[1], it[2], it[3]) }}
.set {{ multiGenerationCh }}
"""
MULTISEED_CHANNEL = """
simCh
.groupTuple(by: [1, 2], size: {size}, remainder: true)
.map {{ tuple(it[0][0], it[1], it[2]) }}
.set {{ multiSeedCh }}
"""
MULTIVARIANT_CHANNEL = """
// Group once to deduplicate variant names and pickles
// Group again into single value for entire experiment
simCh
.groupTuple(by: [1, 2], size: {size}, remainder: true)
.map {{ tuple(it[0][0], it[1], it[2]) }}
.groupTuple(by: [1])
.set {{ multiVariantCh }}
"""
def merge_dicts(a, b):
"""
Recursively merges dictionary b into dictionary a.
This mutates dictionary a.
"""
for key, value in b.items():
if isinstance(value, dict) and key in a and isinstance(a[key], dict):
# If both values are dictionaries, recursively merge
merge_dicts(a[key], value)
else:
# Otherwise, overwrite or add the value from b to a
a[key] = value
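# Minimal illustration (hypothetical config fragments) of the recursive,
# in-place merge performed by merge_dicts:
#   defaults = {"emitter_arg": {"out_dir": "out"}, "generations": 1}
#   merge_dicts(defaults, {"emitter_arg": {"out_uri": "gs://bucket/out"}, "generations": 4})
#   # defaults is now:
#   # {"emitter_arg": {"out_dir": "out", "out_uri": "gs://bucket/out"}, "generations": 4}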
def generate_colony(seeds: int):
"""
Create strings to import and compose Nextflow processes for colony sims.
Not yet implemented: currently returns empty import and workflow lists.
"""
return [], []
def generate_lineage(
seed: int,
n_init_sims: int,
generations: int,
single_daughters: bool,
analysis_config: dict[str, dict[str, dict]],
):
"""
Create strings to import and compose Nextflow processes for lineage sims:
cells that divide for a number of generations but do not interact. The
returned strings also include import statements and workflow jobs for
analysis scripts.
Args:
seed: First seed for first sim
n_init_sims: Number of sims to initialize with different seeds
generations: Number of generations to run for each seed
single_daughters: If True, only simulate one daughter cell each gen
analysis_config: Dictionary with any of the following keys::
{
'multivariant': analyses to run on output of all cells from all variants combined,
'multiseed': analyses to run on output grouped by variant,
'multigeneration': analyses to run on output grouped by variant & initial seed,
'multidaughter': analyses to run on output grouped by variant, initial seed & generation,
'single': analyses to run on output for each individual cell,
'parca': analyses to run on parameter calculator output
}
Each key corresponds to a mapping from analysis name (as defined
in ``ecoli/analysis/__init__.py``) to keyword arguments.
Returns:
2-element tuple containing
- **sim_imports**: All `include` statements for Nextflow sim processes
- **sim_workflow**: Fully composed workflow for entire lineage
"""
sim_imports = []
sim_workflow = [f"\tchannel.of( {seed}..<{seed + n_init_sims} ).set {{ seedCh }}"]
all_sim_tasks = []
for gen in range(generations):
name = f"sim_gen_{gen + 1}"
# Handle special case of 1st generation
if gen == 0:
sim_imports.append(
f"include {{ simGen0 as {name} }} from '{NEXTFLOW_DIR}/sim'"
)
sim_workflow.append(
(
f"\t{name}(params.config, variantCh.combine(seedCh).combine([1]), '0')"
)
)
all_sim_tasks.append(f"{name}.out.metadata")
if not single_daughters:
sim_workflow.append(
f"\t{name}.out.nextGen0.mix({name}.out.nextGen1).set {{ {name}_nextGen }}"
)
else:
sim_workflow.append(f"\t{name}.out.nextGen0.set {{ {name}_nextGen }}")
continue
sim_imports.append(f"include {{ sim as {name} }} from '{NEXTFLOW_DIR}/sim'")
parent = f"sim_gen_{gen}"
sim_workflow.append(f"\t{name}({parent}_nextGen)")
if not single_daughters:
sim_workflow.append(
f"\t{name}.out.nextGen0.mix({name}.out.nextGen1).set {{ {name}_nextGen }}"
)
else:
sim_workflow.append(f"\t{name}.out.nextGen0.set {{ {name}_nextGen }}")
all_sim_tasks.append(f"{name}.out.metadata")
# Channel that combines metadata for all sim tasks
if len(all_sim_tasks) > 1:
tasks = all_sim_tasks[0]
other_tasks = ", ".join(all_sim_tasks[1:])
sim_workflow.append(f"\t{tasks}.mix({other_tasks}).set {{ simCh }}")
else:
sim_workflow.append(f"\t{all_sim_tasks[0]}.set {{ simCh }}")
sims_per_seed = generations if single_daughters else 2**generations - 1
if analysis_config.get("multivariant", False):
# Channel that groups all sim tasks
sim_workflow.append(
MULTIVARIANT_CHANNEL.format(size=sims_per_seed * n_init_sims)
)
sim_workflow.append(
"\tanalysisMultiVariant(params.config, kb, multiVariantCh, "
"variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiVariant }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("multiseed", False):
# Channel that groups sim tasks by variant sim_data
sim_workflow.append(MULTISEED_CHANNEL.format(size=sims_per_seed * n_init_sims))
sim_workflow.append(
"\tanalysisMultiSeed(params.config, kb, multiSeedCh, variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiSeed }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("multigeneration", False):
# Channel that groups sim tasks by variant sim_data and initial seed
sim_workflow.append(MULTIGENERATION_CHANNEL.format(size=sims_per_seed))
sim_workflow.append(
"\tanalysisMultiGeneration(params.config, kb, multiGenerationCh, "
"variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiGeneration }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("multidaughter", False) and not single_daughters:
# Channel that groups sim tasks by variant sim_data, initial seed, and generation
# When simulating both daughters, will have >1 cell for generation >1
gen_size = "[" + ", ".join([f"{g+1}: {2**g}" for g in range(generations)]) + "]"
sim_workflow.append(MULTIDAUGHTER_CHANNEL.format(gen_size=gen_size))
sim_workflow.append(
"\tanalysisMultiDaughter(params.config, kb, multiDaughterCh, "
"variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiDaughter }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("single", False):
sim_workflow.append(
"\tanalysisSingle(params.config, kb, simCh, variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisSingle }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("parca", False):
sim_workflow.append("\tanalysisParca(params.config, kb)")
return sim_imports, sim_workflow
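# Illustrative call (hypothetical analysis name): two initial seeds, two
# generations, single daughters, and one multigeneration analysis enabled.
#   imports, workflow = generate_lineage(
#       0, 2, 2, True, {"multigeneration": {"mass_fraction_summary": {}}}
#   )
#   # "\n".join(imports) and "\n".join(workflow) later replace the IMPORTS and
#   # WORKFLOW placeholders in nextflow/template.nf (see generate_code below).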
def generate_code(config):
"""
Generate the strings that replace the RUN_PARCA, IMPORTS, and WORKFLOW
placeholders in the Nextflow workflow template.
"""
sim_data_path = config.get("sim_data_path")
if sim_data_path is not None:
kb_dir = os.path.dirname(sim_data_path)
run_parca = [
f"\tfile('{kb_dir}').copyTo(\"${{params.publishDir}}/${{params.experimentId}}/parca/kb\")",
f"\tChannel.fromPath('{kb_dir}').toList().set {{ kb }}",
]
else:
run_parca = ["\trunParca(params.config)", "\trunParca.out.toList().set {kb}"]
seed = config.get("seed", 0)
generations = config.get("generations", 0)
if generations:
lineage_seed = config.get("lineage_seed", 0)
n_init_sims = config.get("n_init_sims")
print(
f"Specified generations: initial lineage seed {lineage_seed}, {n_init_sims} initial sims"
)
single_daughters = config.get("single_daughters", True)
sim_imports, sim_workflow = generate_lineage(
lineage_seed,
n_init_sims,
generations,
single_daughters,
config.get("analysis_options", {}),
)
else:
sim_imports, sim_workflow = generate_colony(config.get("n_init_sims", 1))
return "\n".join(run_parca), "\n".join(sim_imports), "\n".join(sim_workflow)
def build_runtime_image(image_name, apptainer=False):
build_script = os.path.join(
os.path.dirname(__file__), "container", "build-runtime.sh"
)
cmd = [build_script, "-r", image_name]
if apptainer:
cmd.append("-a")
subprocess.run(cmd, check=True)
def build_wcm_image(image_name, runtime_image_name):
build_script = os.path.join(os.path.dirname(__file__), "container", "build-wcm.sh")
if runtime_image_name is None:
warnings.warn(
"No runtime image name supplied. By default, "
"we build the model image from the runtime "
"image with name " + os.environ["USER"] + '-wcm-code." '
'If this is correct, add this under "gcloud" > '
'"runtime_image_name" in your config JSON.'
)
cmd = [build_script, "-w", image_name, "-r", runtime_image_name]
subprocess.run(cmd, check=True)
def copy_to_filesystem(source: str, dest: str, filesystem: fs.FileSystem):
"""
Robustly copy the contents of a local source file to a destination path on
a PyArrow filesystem.
Args:
source: Path to source file on local filesystem
dest: Path to destination file on PyArrow filesystem. If Cloud Storage
bucket, DO NOT include ``gs://`` or ``gcs://``.
filesystem: PyArrow filesystem instantiated from URI of ``dest``
"""
with filesystem.open_output_stream(dest) as stream:
with open(source, "rb") as f:
stream.write(f.read())
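# Illustrative usage (hypothetical bucket and paths): the filesystem is created
# from the same URI that yields the destination path, so ``dest`` carries no
# ``gs://`` prefix.
#   filesystem, out_path = fs.FileSystem.from_uri("gs://my-bucket/experiments")
#   copy_to_filesystem(
#       "workflow_config.json",
#       os.path.join(out_path, "exp_1/nextflow/workflow_config.json"),
#       filesystem,
#   )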
def main():
"""
Parse command-line options, merge the user configuration into the default
configuration, write the Nextflow config and workflow files, and launch the
workflow locally, on Google Cloud, or on Sherlock via Slurm.
"""
parser = argparse.ArgumentParser()
config_file = os.path.join(CONFIG_DIR_PATH, "default.json")
parser.add_argument(
"--config",
action="store",
default=config_file,
help=(
"Path to configuration file for the simulation. "
"All key-value pairs in this file will be applied on top "
f"of the options defined in {config_file}."
),
)
parser.add_argument(
"--resume",
type=str,
default=None,
help="Resume workflow with given experiment ID. The experiment ID must "
"match the supplied configuration file and if suffix_time was used, must "
"contain the full time suffix (suffix_time will not be applied again).",
)
args = parser.parse_args()
with open(config_file, "r") as f:
config = json.load(f)
if args.config is not None:
config_file = args.config
with open(args.config, "r") as f:
user_config = json.load(f)
for key in LIST_KEYS_TO_MERGE:
user_config.setdefault(key, [])
user_config[key].extend(config.get(key, []))
if key == "engine_process_reports":
user_config[key] = [tuple(path) for path in user_config[key]]
# Ensure the merged list contains no duplicates
user_config[key] = list(set(user_config[key]))
user_config[key].sort()
merge_dicts(config, user_config)
experiment_id = config["experiment_id"]
if experiment_id is None:
raise RuntimeError("No experiment ID was provided.")
if args.resume is not None:
experiment_id = args.resume
config["experiment_id"] = args.resume
elif config["suffix_time"]:
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
experiment_id = experiment_id + "_" + current_time
config["experiment_id"] = experiment_id
config["suffix_time"] = False
# Special characters are messy so do not allow them
if experiment_id != parse.quote_plus(experiment_id):
raise TypeError(
"Experiment ID cannot contain special characters"
f"that change the string when URL quoted: {experiment_id}"
f" != {parse.quote_plus(experiment_id)}"
)
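# Illustrative check (hypothetical IDs): "exp_20240101" is returned unchanged
# by parse.quote_plus and passes, while "exp 2024/01" becomes "exp+2024%2F01"
# and is rejected.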
# Resolve output directory
out_bucket = ""
if "out_uri" not in config["emitter_arg"]:
out_uri = os.path.abspath(config["emitter_arg"]["out_dir"])
config["emitter_arg"]["out_dir"] = out_uri
else:
out_uri = config["emitter_arg"]["out_uri"]
out_bucket = out_uri.split("://")[1].split("/")[0]
# Resolve sim_data_path if provided
if config["sim_data_path"] is not None:
config["sim_data_path"] = os.path.abspath(config["sim_data_path"])
filesystem, outdir = fs.FileSystem.from_uri(out_uri)
outdir = os.path.join(outdir, experiment_id, "nextflow")
out_uri = os.path.join(out_uri, experiment_id, "nextflow")
repo_dir = os.path.dirname(os.path.dirname(__file__))
local_outdir = os.path.join(repo_dir, "nextflow_temp", experiment_id)
os.makedirs(local_outdir, exist_ok=True)
filesystem.create_dir(outdir)
temp_config_path = f"{local_outdir}/workflow_config.json"
final_config_path = os.path.join(outdir, "workflow_config.json")
final_config_uri = os.path.join(out_uri, "workflow_config.json")
with open(temp_config_path, "w") as f:
json.dump(config, f)
if args.resume is None:
copy_to_filesystem(temp_config_path, final_config_path, filesystem)
nf_config = os.path.join(os.path.dirname(__file__), "nextflow", "config.template")
with open(nf_config, "r") as f:
nf_config = f.readlines()
nf_config = "".join(nf_config)
nf_config = nf_config.replace("EXPERIMENT_ID", experiment_id)
nf_config = nf_config.replace("CONFIG_FILE", final_config_uri)
nf_config = nf_config.replace("BUCKET", out_bucket)
nf_config = nf_config.replace(
"PUBLISH_DIR", os.path.dirname(os.path.dirname(out_uri))
)
nf_config = nf_config.replace("PARCA_CPUS", str(config["parca_options"]["cpus"]))
# By default, assume running on local device
nf_profile = "standard"
# If not running on a local device, build container images according
# to options under gcloud or sherlock configuration keys
cloud_config = config.get("gcloud", None)
if cloud_config is not None:
nf_profile = "gcloud"
project_id = subprocess.run(
["gcloud", "config", "get", "project"], stdout=subprocess.PIPE, text=True
).stdout.strip()
region = subprocess.run(
["gcloud", "config", "get", "compute/region"],
stdout=subprocess.PIPE,
text=True,
).stdout.strip()
image_prefix = f"{region}-docker.pkg.dev/{project_id}/vecoli/"
runtime_image_name = cloud_config.get("runtime_image_name", None)
if cloud_config.get("build_runtime_image", False):
if runtime_image_name is None:
raise RuntimeError("Must supply name for runtime image.")
build_runtime_image(runtime_image_name)
wcm_image_name = cloud_config.get("wcm_image_name", None)
if wcm_image_name is None:
raise RuntimeError("Must supply name for WCM image.")
if cloud_config.get("build_wcm_image", False):
if runtime_image_name is None:
raise RuntimeError("Must supply name for runtime image.")
build_wcm_image(wcm_image_name, runtime_image_name)
nf_config = nf_config.replace("IMAGE_NAME", image_prefix + wcm_image_name)
sherlock_config = config.get("sherlock", None)
if sherlock_config is not None:
if nf_profile == "gcloud":
raise RuntimeError(
"Cannot set both Sherlock and Google Cloud "
"options in the input JSON."
)
runtime_image_name = sherlock_config.get("runtime_image_name", None)
if runtime_image_name is None:
raise RuntimeError("Must supply name for runtime image.")
if sherlock_config.get("build_runtime_image", False):
build_runtime_image(runtime_image_name, True)
nf_config = nf_config.replace("IMAGE_NAME", runtime_image_name)
subprocess.run(
[
"apptainer",
"exec",
"-B",
f"{repo_dir}:{repo_dir}",
"--cwd",
repo_dir,
"--writable-tmpfs",
"-e",
runtime_image_name,
"uv",
"sync",
"--frozen",
"--no-cache",
]
)
if sherlock_config.get("jenkins", False):
nf_profile = "jenkins"
else:
nf_profile = "sherlock"
local_config = os.path.join(local_outdir, "nextflow.config")
with open(local_config, "w") as f:
f.writelines(nf_config)
run_parca, sim_imports, sim_workflow = generate_code(config)
nf_template_path = os.path.join(
os.path.dirname(__file__), "nextflow", "template.nf"
)
with open(nf_template_path, "r") as f:
nf_template = f.readlines()
nf_template = "".join(nf_template)
nf_template = nf_template.replace("RUN_PARCA", run_parca)
nf_template = nf_template.replace("IMPORTS", sim_imports)
nf_template = nf_template.replace("WORKFLOW", sim_workflow)
local_workflow = os.path.join(local_outdir, "main.nf")
with open(local_workflow, "w") as f:
f.writelines(nf_template)
workflow_path = os.path.join(out_uri, "main.nf")
config_path = os.path.join(out_uri, "nextflow.config")
if args.resume is None:
copy_to_filesystem(local_workflow, os.path.join(outdir, "main.nf"), filesystem)
copy_to_filesystem(
local_config, os.path.join(outdir, "nextflow.config"), filesystem
)
# Start nextflow workflow
report_path = os.path.join(
out_uri,
f"{experiment_id}_report.html",
)
workdir = os.path.join(out_uri, "nextflow_workdirs")
if nf_profile == "standard" or nf_profile == "gcloud":
subprocess.run(
[
"nextflow",
"-C",
local_config,
"run",
local_workflow,
"-profile",
nf_profile,
"-with-report",
report_path,
"-work-dir",
workdir,
"-resume" if args.resume is not None else "",
],
check=True,
)
elif nf_profile == "sherlock":
batch_script = os.path.join(local_outdir, "nextflow_job.sh")
with open(batch_script, "w") as f:
f.write(f"""#!/bin/bash
#SBATCH --job-name="nextflow-{experiment_id}"
#SBATCH --time=7-00:00:00
#SBATCH --cpus-per-task 1
#SBATCH --mem=4GB
#SBATCH --partition=mcovert
nextflow -C {config_path} run {workflow_path} -profile {nf_profile} \
-with-report {report_path} -work-dir {workdir} {"-resume" if args.resume is not None else ""}
""")
copy_to_filesystem(
batch_script, os.path.join(outdir, "nextflow_job.sh"), filesystem
)
subprocess.run(["sbatch", batch_script], check=True)
elif nf_profile == "jenkins":
subprocess.run(
[
"nextflow",
"-C",
config_path,
"run",
workflow_path,
"-profile",
"sherlock",
"-with-report",
report_path,
"-work-dir",
workdir,
],
check=True,
)
shutil.rmtree(local_outdir)
if __name__ == "__main__":
main()