import argparse
import json
import os
import pathlib
import random
import shutil
import subprocess
import sys
import time
from datetime import datetime
from urllib import parse
from pyarrow import fs
LIST_KEYS_TO_MERGE = (
"save_times",
"add_processes",
"exclude_processes",
"processes",
"engine_process_reports",
"initial_state_overrides",
)
"""
Special configuration keys whose list values are concatenated when they
appear in multiple sources (e.g. the default JSON and a user-specified
JSON) instead of being directly overridden.
"""
CONFIG_DIR_PATH = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"ecoli",
"composites",
"ecoli_configs",
)
NEXTFLOW_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "nextflow")
MULTIDAUGHTER_CHANNEL = """
generationSize = {gen_size}
simCh
.map {{ tuple(groupKey(it[1..4], generationSize[it[4]]), it[0], it[1], it[2], it[3], it[4] ) }}
.groupTuple(remainder: true)
.map {{ tuple(it[1][0], it[2][0], it[3][0], it[4][0], it[5][0]) }}
.set {{ multiDaughterCh }}
"""
MULTIGENERATION_CHANNEL = """
simCh
.groupTuple(by: [1, 2, 3], size: {size}, remainder: true)
.map {{ tuple(it[0][0], it[1], it[2], it[3]) }}
.set {{ multiGenerationCh }}
"""
MULTISEED_CHANNEL = """
simCh
.groupTuple(by: [1, 2], size: {size}, remainder: true)
.map {{ tuple(it[0][0], it[1], it[2]) }}
.set {{ multiSeedCh }}
"""
MULTIVARIANT_CHANNEL = """
// Group once to deduplicate variant names and pickles
// Group again into single value for entire experiment
simCh
.groupTuple(by: [1, 2], size: {size}, remainder: true)
.map {{ tuple(it[0][0], it[1], it[2]) }}
.groupTuple(by: [1])
.set {{ multiVariantCh }}
"""
def merge_dicts(a, b):
"""
Recursively merges dictionary b into dictionary a.
This mutates dictionary a.
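    Example (illustrative)::

        >>> a = {"x": {"y": 1}, "keep": 0}
        >>> merge_dicts(a, {"x": {"z": 2}, "keep": 1})
        >>> a == {"x": {"y": 1, "z": 2}, "keep": 1}
        True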
"""
for key, value in b.items():
if isinstance(value, dict) and key in a and isinstance(a[key], dict):
# If both values are dictionaries, recursively merge
merge_dicts(a[key], value)
else:
# Otherwise, overwrite or add the value from b to a
a[key] = value
def generate_colony(seeds: int):
"""
Create strings to import and compose Nextflow processes for colony sims.
"""
return [], []
def generate_lineage(
seed: int,
n_init_sims: int,
generations: int,
single_daughters: bool,
analysis_config: dict[str, dict[str, dict]],
):
"""
Create strings to import and compose Nextflow processes for lineage sims:
cells that divide for a number of generations but do not interact. Also
contains import statements and workflow jobs for analysis scripts.
Args:
seed: First seed for first sim
n_init_sims: Number of sims to initialize with different seeds
generations: Number of generations to run for each seed
single_daughters: If True, only simulate one daughter cell each gen
analysis_config: Dictionary with any of the following keys::
{
'variant': analyses to run on output of all cells combined,
'cohort': analyses to run on output grouped by variant,
'multigen': analyses to run on output grouped by variant & seed,
'single': analyses to run on output for each individual cell,
'parca': analyses to run on parameter calculator output
}
Each key corresponds to a mapping from analysis name (as defined
            in ``ecoli/analysis/__init__.py``) to keyword arguments.
Returns:
2-element tuple containing
        - **sim_imports**: All ``include`` statements for Nextflow sim processes
- **sim_workflow**: Fully composed workflow for entire lineage
"""
sim_imports = []
sim_workflow = [f"\tchannel.of( {seed}..<{seed + n_init_sims} ).set {{ seedCh }}"]
all_sim_tasks = []
for gen in range(generations):
name = f"sim_gen_{gen + 1}"
# Handle special case of 1st generation
if gen == 0:
sim_imports.append(
f"include {{ simGen0 as {name} }} from '{NEXTFLOW_DIR}/sim'"
)
sim_workflow.append(
(
f"\t{name}(params.config, variantCh.combine(seedCh).combine([1]), '0')"
)
)
all_sim_tasks.append(f"{name}.out.metadata")
if not single_daughters:
sim_workflow.append(
f"\t{name}.out.nextGen0.mix({name}.out.nextGen1).set {{ {name}_nextGen }}"
)
else:
sim_workflow.append(f"\t{name}.out.nextGen0.set {{ {name}_nextGen }}")
continue
sim_imports.append(f"include {{ sim as {name} }} from '{NEXTFLOW_DIR}/sim'")
parent = f"sim_gen_{gen}"
sim_workflow.append(f"\t{name}({parent}_nextGen)")
if not single_daughters:
sim_workflow.append(
f"\t{name}.out.nextGen0.mix({name}.out.nextGen1).set {{ {name}_nextGen }}"
)
else:
sim_workflow.append(f"\t{name}.out.nextGen0.set {{ {name}_nextGen }}")
all_sim_tasks.append(f"{name}.out.metadata")
# Channel that combines metadata for all sim tasks
if len(all_sim_tasks) > 1:
tasks = all_sim_tasks[0]
other_tasks = ", ".join(all_sim_tasks[1:])
sim_workflow.append(f"\t{tasks}.mix({other_tasks}).set {{ simCh }}")
else:
sim_workflow.append(f"\t{all_sim_tasks[0]}.set {{ simCh }}")
sims_per_seed = generations if single_daughters else 2**generations - 1
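    # e.g. 3 generations with single daughters gives 3 sims per seed, while
    # simulating both daughters gives 2**3 - 1 = 7 sims per seed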
if analysis_config.get("multivariant", False):
# Channel that groups all sim tasks
sim_workflow.append(
MULTIVARIANT_CHANNEL.format(size=sims_per_seed * n_init_sims)
)
sim_workflow.append(
"\tanalysisMultiVariant(params.config, kb, multiVariantCh, "
"variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiVariant }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("multiseed", False):
# Channel that groups sim tasks by variant sim_data
sim_workflow.append(MULTISEED_CHANNEL.format(size=sims_per_seed * n_init_sims))
sim_workflow.append(
"\tanalysisMultiSeed(params.config, kb, multiSeedCh, variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiSeed }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("multigeneration", False):
# Channel that groups sim tasks by variant sim_data and initial seed
sim_workflow.append(MULTIGENERATION_CHANNEL.format(size=sims_per_seed))
sim_workflow.append(
"\tanalysisMultiGeneration(params.config, kb, multiGenerationCh, "
"variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiGeneration }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("multidaughter", False) and not single_daughters:
# Channel that groups sim tasks by variant sim_data, initial seed, and generation
# When simulating both daughters, will have >1 cell for generation >1
gen_size = (
"[" + ", ".join([f"{g + 1}: {2**g}" for g in range(generations)]) + "]"
)
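        # e.g. for 3 generations this renders as "[1: 1, 2: 2, 3: 4]",
        # mapping each generation to its expected number of cells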
sim_workflow.append(MULTIDAUGHTER_CHANNEL.format(gen_size=gen_size))
sim_workflow.append(
"\tanalysisMultiDaughter(params.config, kb, multiDaughterCh, "
"variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisMultiDaughter }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("single", False):
sim_workflow.append(
"\tanalysisSingle(params.config, kb, simCh, variantMetadataCh)"
)
sim_imports.append(
f"include {{ analysisSingle }} from '{NEXTFLOW_DIR}/analysis'"
)
if analysis_config.get("parca", False):
sim_workflow.append("\tanalysisParca(params.config, kb)")
return sim_imports, sim_workflow
def generate_code(config):
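    """
    Generate the strings substituted into the Nextflow workflow template:
    the ParCa step (``RUN_PARCA``), the sim process ``include`` statements
    (``IMPORTS``), and the composed simulation/analysis workflow (``WORKFLOW``).
    """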
sim_data_path = config.get("sim_data_path")
if sim_data_path is not None:
kb_dir = os.path.dirname(sim_data_path)
run_parca = [
f"\tfile('{kb_dir}').copyTo(\"${{params.publishDir}}/${{params.experimentId}}/parca/kb\")",
f"\tChannel.fromPath('{kb_dir}').toList().set {{ kb }}",
]
else:
run_parca = ["\trunParca(params.config)", "\trunParca.out.toList().set {kb}"]
seed = config.get("seed", 0)
generations = config.get("generations", 0)
if generations:
lineage_seed = config.get("lineage_seed", 0)
n_init_sims = config.get("n_init_sims")
print(
f"Specified generations: initial lineage seed {lineage_seed}, {n_init_sims} initial sims"
)
single_daughters = config.get("single_daughters", True)
sim_imports, sim_workflow = generate_lineage(
lineage_seed,
n_init_sims,
generations,
single_daughters,
config.get("analysis_options", {}),
)
    else:
        # generate_colony is currently a stub that returns empty import and
        # workflow lists, so only the number of initial sims is passed
        sim_imports, sim_workflow = generate_colony(config.get("n_init_sims", 1))
return "\n".join(run_parca), "\n".join(sim_imports), "\n".join(sim_workflow)
def build_image_cmd(image_name, apptainer=False) -> list[str]:
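    """
    Construct the command that invokes ``container/build-image.sh`` (in the
    ``container`` directory next to this file) with the given image name,
    appending ``-a`` when an Apptainer build is requested.
    """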
build_script = os.path.join(
os.path.dirname(__file__), "container", "build-image.sh"
)
cmd = [build_script, "-i", image_name]
if apptainer:
cmd.append("-a")
return cmd
def copy_to_filesystem(source: str, dest: str, filesystem: fs.FileSystem):
"""
Robustly copy the contents of a local source file to a destination path on
a PyArrow filesystem.
Args:
source: Path to source file on local filesystem
dest: Path to destination file on PyArrow filesystem. If Cloud Storage
bucket, DO NOT include ``gs://`` or ``gcs://``.
filesystem: PyArrow filesystem instantiated from URI of ``dest``
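
    Example (hypothetical bucket and file names)::

        filesystem, outdir = fs.FileSystem.from_uri("gs://my-bucket/out")
        copy_to_filesystem(
            "workflow_config.json",
            os.path.join(outdir, "workflow_config.json"),
            filesystem,
        )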
"""
with filesystem.open_output_stream(dest) as stream:
with open(source, "rb") as f:
stream.write(f.read())
def forward_sbatch_output(
batch_script: str,
output_log: str,
):
"""
    Submit a SLURM batch script that is configured to write its output to a
    log file, then poll that log file and stream any new content to stdout
    (similar to ``tail -f``). Returns once the job completes successfully and
    calls ``sys.exit(1)`` if it does not.
"""
# Delete any pre-existing log file
log_path = pathlib.Path(output_log)
if log_path.exists():
log_path.unlink()
# Submit the job and get the job ID
result = subprocess.run(
["sbatch", "--parsable", batch_script],
check=True,
text=True,
stdout=subprocess.PIPE,
)
job_id = result.stdout.strip().split(";")[0]
print(f"Submitted SLURM job {job_id}, log file: {output_log}")
# Track last position read in output log file
last_position = 0
while True:
# Read any new content from the log file
if log_path.exists():
with open(output_log, "r") as f:
# Move to where we left off
f.seek(last_position)
# Read and print new content
new_content = f.read()
if new_content:
print(new_content, end="", flush=True)
# Remember where we are now
last_position = f.tell()
# Check if job is still running
job_status = subprocess.run(
["squeue", "-j", job_id, "-h", "-o", "%t"],
text=True,
stdout=subprocess.PIPE,
).stdout.strip()
# If job no longer exists in queue
if not job_status:
# Catch up with final output
time.sleep(5)
with open(output_log, "r") as f:
f.seek(last_position)
new_content = f.read()
if new_content:
print(new_content, end="", flush=True)
break
# Wait a bit before checking job status again
time.sleep(30)
# Final check of job success
job_state = (
subprocess.run(
["sacct", "-j", job_id, "-o", "State", "-n", "--parsable2"],
text=True,
stdout=subprocess.PIPE,
)
.stdout.strip()
.split("\n")[0]
)
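    # sacct reports a terminal state such as COMPLETED, FAILED, CANCELLED, or TIMEOUT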
# Stop process if submitted job did not complete successfully
if job_state != "COMPLETED":
print(f"Job {job_id} failed with state {job_state}")
sys.exit(1)
def main():
parser = argparse.ArgumentParser()
config_file = os.path.join(CONFIG_DIR_PATH, "default.json")
parser.add_argument(
"--config",
action="store",
default=config_file,
help=(
"Path to configuration file for the simulation. "
"All key-value pairs in this file will be applied on top "
f"of the options defined in {config_file}."
),
)
parser.add_argument(
"--resume",
type=str,
default=None,
help="Resume workflow with given experiment ID. The experiment ID must "
"match the supplied configuration file and if suffix_time was used, must "
"contain the full time suffix (suffix_time will not be applied again).",
)
args = parser.parse_args()
with open(config_file, "r") as f:
config = json.load(f)
if args.config is not None:
config_file = args.config
with open(args.config, "r") as f:
user_config = json.load(f)
for key in LIST_KEYS_TO_MERGE:
user_config.setdefault(key, [])
user_config[key].extend(config.get(key, []))
if key == "engine_process_reports":
user_config[key] = [tuple(path) for path in user_config[key]]
            # Ensure the merged list contains no duplicates
user_config[key] = list(set(user_config[key]))
user_config[key].sort()
merge_dicts(config, user_config)
experiment_id = config["experiment_id"]
if experiment_id is None:
raise RuntimeError("No experiment ID was provided.")
if args.resume is not None:
experiment_id = args.resume
config["experiment_id"] = args.resume
elif config["suffix_time"]:
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
experiment_id = experiment_id + "_" + current_time
config["experiment_id"] = experiment_id
config["suffix_time"] = False
# Special characters are messy so do not allow them
if experiment_id != parse.quote_plus(experiment_id):
raise TypeError(
"Experiment ID cannot contain special characters"
f"that change the string when URL quoted: {experiment_id}"
f" != {parse.quote_plus(experiment_id)}"
)
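    # e.g. "my experiment" URL-quotes to "my+experiment" and is rejected,
    # while "my_experiment_1" passes through unchanged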
# Resolve output directory
out_bucket = ""
if "out_uri" not in config["emitter_arg"]:
out_uri = os.path.abspath(config["emitter_arg"]["out_dir"])
config["emitter_arg"]["out_dir"] = out_uri
else:
out_uri = config["emitter_arg"]["out_uri"]
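        # e.g. "gs://my-bucket/output" yields the bucket name "my-bucket"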
out_bucket = out_uri.split("://")[1].split("/")[0]
# Resolve sim_data_path if provided
if config["sim_data_path"] is not None:
config["sim_data_path"] = os.path.abspath(config["sim_data_path"])
# Use random seed for Jenkins CI runs
if config.get("sherlock", {}).get("jenkins", False):
config["lineage_seed"] = random.randint(0, 2**31 - 1)
filesystem, outdir = fs.FileSystem.from_uri(out_uri)
outdir = os.path.join(outdir, experiment_id, "nextflow")
out_uri = os.path.join(out_uri, experiment_id, "nextflow")
repo_dir = os.path.dirname(os.path.dirname(__file__))
local_outdir = os.path.join(repo_dir, "nextflow_temp", experiment_id)
os.makedirs(local_outdir, exist_ok=True)
filesystem.create_dir(outdir)
temp_config_path = f"{local_outdir}/workflow_config.json"
final_config_path = os.path.join(outdir, "workflow_config.json")
final_config_uri = os.path.join(out_uri, "workflow_config.json")
with open(temp_config_path, "w") as f:
json.dump(config, f)
if args.resume is None:
copy_to_filesystem(temp_config_path, final_config_path, filesystem)
nf_config = os.path.join(os.path.dirname(__file__), "nextflow", "config.template")
with open(nf_config, "r") as f:
nf_config = f.readlines()
nf_config = "".join(nf_config)
nf_config = nf_config.replace("EXPERIMENT_ID", experiment_id)
nf_config = nf_config.replace("CONFIG_FILE", final_config_uri)
nf_config = nf_config.replace("BUCKET", out_bucket)
nf_config = nf_config.replace(
"PUBLISH_DIR", os.path.dirname(os.path.dirname(out_uri))
)
nf_config = nf_config.replace("PARCA_CPUS", str(config["parca_options"]["cpus"]))
# By default, assume running on local device
nf_profile = "standard"
# If not running on a local device, build container images according
# to options under gcloud or sherlock configuration keys
cloud_config = config.get("gcloud", None)
if cloud_config is not None:
nf_profile = "gcloud"
project_id = subprocess.run(
["gcloud", "config", "get", "project"], stdout=subprocess.PIPE, text=True
).stdout.strip()
region = subprocess.run(
["gcloud", "config", "get", "compute/region"],
stdout=subprocess.PIPE,
text=True,
).stdout.strip()
image_prefix = f"{region}-docker.pkg.dev/{project_id}/vecoli/"
container_image = cloud_config.get("container_image", None)
if container_image is None:
raise RuntimeError("Must supply name for container image.")
if cloud_config.get("build_image", False):
image_cmd = build_image_cmd(container_image)
subprocess.run(image_cmd, check=True)
nf_config = nf_config.replace("IMAGE_NAME", image_prefix + container_image)
sherlock_config = config.get("sherlock", None)
if sherlock_config is not None:
if nf_profile == "gcloud":
raise RuntimeError(
"Cannot set both Sherlock and Google Cloud options in the input JSON."
)
nf_profile = "sherlock"
container_image = sherlock_config.get("container_image", None)
if container_image is None:
raise RuntimeError("Must supply name for container image.")
if sherlock_config.get("build_image", False):
image_cmd = " ".join(build_image_cmd(container_image, True))
image_build_script = os.path.join(local_outdir, "container.sh")
with open(image_build_script, "w") as f:
f.write(f"""#!/bin/bash
#SBATCH --job-name="build-image-{experiment_id}"
#SBATCH --time=30:00
#SBATCH --cpus-per-task 2
#SBATCH --mem=8GB
#SBATCH --partition=owners,normal
#SBATCH --output={os.path.join(local_outdir, "build-image.out")}
{image_cmd}
""")
forward_sbatch_output(
image_build_script, os.path.join(local_outdir, "build-image.out")
)
nf_config = nf_config.replace("IMAGE_NAME", container_image)
local_config = os.path.join(local_outdir, "nextflow.config")
with open(local_config, "w") as f:
f.writelines(nf_config)
run_parca, sim_imports, sim_workflow = generate_code(config)
nf_template_path = os.path.join(
os.path.dirname(__file__), "nextflow", "template.nf"
)
with open(nf_template_path, "r") as f:
nf_template = f.readlines()
nf_template = "".join(nf_template)
nf_template = nf_template.replace("RUN_PARCA", run_parca)
nf_template = nf_template.replace("IMPORTS", sim_imports)
nf_template = nf_template.replace("WORKFLOW", sim_workflow)
local_workflow = os.path.join(local_outdir, "main.nf")
with open(local_workflow, "w") as f:
f.writelines(nf_template)
workflow_path = os.path.join(out_uri, "main.nf")
config_path = os.path.join(out_uri, "nextflow.config")
if args.resume is None:
copy_to_filesystem(local_workflow, os.path.join(outdir, "main.nf"), filesystem)
copy_to_filesystem(
local_config, os.path.join(outdir, "nextflow.config"), filesystem
)
# Start nextflow workflow
report_path = os.path.join(
out_uri,
f"{experiment_id}_report.html",
)
workdir = os.path.join(out_uri, "nextflow_workdirs")
if nf_profile == "standard" or nf_profile == "gcloud":
subprocess.run(
[
"nextflow",
"-C",
local_config,
"run",
local_workflow,
"-profile",
nf_profile,
"-with-report",
report_path,
"-work-dir",
workdir,
"-resume" if args.resume is not None else "",
],
check=True,
)
elif nf_profile == "sherlock":
batch_script = os.path.join(local_outdir, "nextflow_job.sh")
hyperqueue_init = ""
hyperqueue_exit = ""
if sherlock_config.get("hyperqueue", False):
nf_profile = "sherlock_hq"
hyperqueue_init = f"""
# Set the directory which HyperQueue will use
export HQ_SERVER_DIR={os.path.join(outdir, ".hq-server")}
mkdir -p ${{HQ_SERVER_DIR}}
# Start the server in the background (&) and wait until it has started
hq server start --journal {os.path.join(outdir, ".hq-server/journal")} &
until hq job list &>/dev/null ; do sleep 1 ; done
# Enable HyperQueue automatic allocation
hq alloc add slurm --time-limit 8h -- --partition=owners,normal --mem=4GB
"""
hyperqueue_exit = "hq job wait all; hq worker stop all; hq server stop"
nf_slurm_output = os.path.join(outdir, f"{experiment_id}_slurm.out")
with open(batch_script, "w") as f:
f.write(f"""#!/bin/bash
#SBATCH --job-name="nextflow-{experiment_id}"
#SBATCH --time=7-00:00:00
#SBATCH --cpus-per-task 1
#SBATCH --mem=4GB
#SBATCH --partition=mcovert
#SBATCH --output={nf_slurm_output}
set -e
# Ensure HyperQueue shutdown on failure or interruption
trap 'exitcode=$?; {hyperqueue_exit}' EXIT
{hyperqueue_init}
nextflow -C {config_path} run {workflow_path} -profile {nf_profile} \
-with-report {report_path} -work-dir {workdir} {"-resume" if args.resume is not None else ""}
""")
copy_to_filesystem(
batch_script, os.path.join(outdir, "nextflow_job.sh"), filesystem
)
# Make stdout of workflow viewable in Jenkins
if sherlock_config.get("jenkins", False):
forward_sbatch_output(batch_script, nf_slurm_output)
else:
subprocess.run(["sbatch", batch_script], check=True)
shutil.rmtree(local_outdir)
if __name__ == "__main__":
main()