runscripts.workflow
- runscripts.workflow.CLUSTER_PRESETS: dict[str, dict[str, Any]]
Default config values for different SLURM clusters.
Each key is a cluster name and each value is a dictionary with the following structure:
{
    # Environment variables to load for string substitution in any of
    # the options in the following sections. Note that the variables
    # "outdir" (experiment output directory from emitter_arg --> out_dir)
    # and "experiment_id" are always available for substitution.
    "env_vars": { ... },
    # SLURM options for building container image
    "build_image": { ... },
    # Strings to substitute in Nextflow config file (only QUEUE and
    # CLUSTER_OPTIONS are required, others use defaults from
    # configs/default.json)
    "nf_config_overrides": {
        # SLURM partition to run HyperQueue workers and non-HyperQueue jobs on
        "QUEUE": str,
        # Number of cores to allocate each HyperQueue worker
        "HQ_CORES": int,
        # Number of CPUs to allocate per simulation (max 2)
        "SIM_CPUS": int,
        # Amount of memory to allocate per simulation in GB
        "SIM_MEM": int,
        # Whether to use HyperQueue for simulation job scheduling
        "HYPERQUEUE": bool,
        # Additional cluster options to pass to SLURM
        "CLUSTER_OPTIONS": dict[str, str],
    },
    # SLURM options for Nextflow job
    "nextflow": { ... },
    # Shell commands to setup environment in Nextflow job script
    "nf_setup": str
}
- class runscripts.workflow.ClusterConfig(name, config_dict, build_options, nextflow_options, nf_config_overrides, nf_setup=None)[source]
Bases: object
Resolved configuration for running on a SLURM HPC cluster.
- Parameters:
- runscripts.workflow.LIST_KEYS_TO_MERGE = ('save_times', 'add_processes', 'exclude_processes', 'processes', 'engine_process_reports', 'initial_state_overrides')
Configuration keys whose values are lists. When one of these keys appears in multiple sources (e.g. default JSON and user-specified JSON), the lists are concatenated instead of one directly overriding the other.
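The concatenation behaviour can be illustrated with a minimal sketch. The function name `merge_with_list_keys_sketch` is hypothetical, the merge is shallow, and placing the base values before the overlay values is an assumption; the source does not state the order.

```python
from typing import Any

# Copied from the documented constant.
LIST_KEYS_TO_MERGE = (
    "save_times",
    "add_processes",
    "exclude_processes",
    "processes",
    "engine_process_reports",
    "initial_state_overrides",
)


def merge_with_list_keys_sketch(
    base: dict[str, Any], overlay: dict[str, Any]
) -> dict[str, Any]:
    """Shallow merge where listed keys are concatenated, others overridden."""
    merged = dict(base)
    for key, value in overlay.items():
        if key in LIST_KEYS_TO_MERGE and key in merged:
            # Assumed order: base entries first, then overlay entries.
            merged[key] = list(merged[key]) + list(value)
        else:
            merged[key] = value
    return merged


default_cfg = {"save_times": [100], "generations": 1}
user_cfg = {"save_times": [200], "generations": 4}
merged_cfg = merge_with_list_keys_sketch(default_cfg, user_cfg)
```

Here `merged_cfg["save_times"]` holds both entries, while `generations` takes the user-specified value.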
- runscripts.workflow._apply_nf_config_overrides(nf_config, overrides)[source]
Apply configuration overrides to a Nextflow config string.
The keys in overrides are treated as literal placeholders in nf_config and replaced with their stringified values. The special key "CLUSTER_OPTIONS" may contain a dictionary of options, which is serialized into a command-line style string via _serialize_cluster_options. Boolean values are converted to their lowercase string representation ("true" or "false") before substitution to match JSON-style booleans.
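A minimal sketch of this kind of literal substitution follows. `apply_overrides_sketch` is a hypothetical stand-in, not the real function: the inline `--key=value` join replaces the actual serializer, and replacing longer keys first is an added assumption so that `QUEUE` does not clobber part of `HYPERQUEUE`.

```python
from typing import Any


def apply_overrides_sketch(nf_config: str, overrides: dict[str, Any]) -> str:
    """Replace each override key found in nf_config with its string value."""
    # Assumption: longest keys first, so "QUEUE" cannot match inside "HYPERQUEUE".
    for key in sorted(overrides, key=len, reverse=True):
        value = overrides[key]
        if key == "CLUSTER_OPTIONS" and isinstance(value, dict):
            # Simplified stand-in for the real cluster-option serializer.
            value = " ".join(f"--{k}={v}" for k, v in value.items())
        elif isinstance(value, bool):
            value = str(value).lower()  # True -> "true", False -> "false"
        nf_config = nf_config.replace(key, str(value))
    return nf_config


template = (
    'queue = "QUEUE"\n'
    "cpus = SIM_CPUS\n"
    "hq = HYPERQUEUE\n"
    'clusterOptions = "CLUSTER_OPTIONS"'
)
rendered = apply_overrides_sketch(
    template,
    {
        "QUEUE": "normal",
        "SIM_CPUS": 2,
        "HYPERQUEUE": True,
        "CLUSTER_OPTIONS": {"account": "lab"},
    },
)
```

The template text and option values above are made up for illustration.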
- runscripts.workflow._confirm_overwrite(image)[source]
Prompt the user to confirm building and pushing container image to an existing AWS ECR repository or GCP Artifact Registry manifest.
Returns True if the user confirms, False otherwise.
- runscripts.workflow._ecr_image_exists(repo_name, image_tag, region)[source]
Return True if the given tag already exists in an ECR repository.
- runscripts.workflow._format_recursive(value, formatter, path, on_missing)[source]
Recursively apply string formatting to nested configuration values.
This helper walks over value, which may be a plain value, string, or nested mapping, and formats any strings using the provided formatter (typically a _FormatDict used with str.format_map).
The current location within the nested structure is tracked via path, which is extended as the recursion descends into dictionaries. When a placeholder in a string cannot be resolved by formatter, a KeyError is raised, the missing placeholder name is reported to the on_missing callback together with the corresponding path, and the function returns the internal _MISSING_SENTINEL so that the missing entry can be skipped by the caller.
Non-string, non-mapping values are returned unchanged.
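The recursion described above can be sketched as follows. This is an illustration under assumptions: a plain dict stands in for the formatter, `_MISSING` stands in for the internal sentinel, and the dotted path format is invented for the example.

```python
from typing import Any, Callable

_MISSING = object()  # hypothetical stand-in for the internal sentinel


def format_recursive_sketch(
    value: Any,
    context: dict[str, Any],
    path: str,
    on_missing: Callable[[str, str], None],
) -> Any:
    """Format strings in a nested structure, dropping unresolved entries."""
    if isinstance(value, str):
        try:
            return value.format_map(context)
        except KeyError as err:
            # Report the placeholder name and where it was found.
            on_missing(err.args[0], path)
            return _MISSING
    if isinstance(value, dict):
        result = {}
        for key, item in value.items():
            formatted = format_recursive_sketch(
                item, context, f"{path}.{key}", on_missing
            )
            if formatted is not _MISSING:
                result[key] = formatted
        return result
    return value  # non-string, non-mapping values pass through unchanged


missing: list[tuple[str, str]] = []
section = {
    "partition": "{queue}",
    "account": "{acct}",
    "nodes": 1,
    "nested": {"log": "{outdir}/x.log"},
}
formatted_section = format_recursive_sketch(
    section,
    {"queue": "normal", "outdir": "/out"},
    "build_image",
    lambda name, where: missing.append((name, where)),
)
```

In this example the `account` entry is dropped because `acct` is not in the context, and the callback records which placeholder was missing and at which path.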
- runscripts.workflow._format_template_section(section, context, cluster_name, section_name)[source]
Format a configuration or template section using the provided context.
The section value (which may be a string, mapping, or nested structure) is traversed and any str.format-style placeholders are resolved using keys from context. If a placeholder cannot be resolved, the corresponding entry is skipped from the resulting structure and a warning is emitted indicating the missing placeholder and its location within cluster_name.section_name.
- Parameters:
section (Any) – The configuration or template section to format. May be None, in which case None is returned.
context (dict[str, Any]) – Mapping of placeholder names to their replacement values.
cluster_name (str) – Name of the cluster, used only for constructing human-readable warning messages.
section_name (str) – Name of the section being formatted, used in warning messages and as the root path when reporting missing placeholders.
- Returns:
The formatted section with all resolvable placeholders substituted and entries with missing placeholders omitted, or None if section is None.
- runscripts.workflow._gcloud_image_exists(full_image_uri)[source]
Return True if the given image already exists in GCP Artifact Registry.
- runscripts.workflow._load_cluster_env_values(cluster_name, env_var_map)[source]
Load environment variable values for a cluster configuration.
This helper resolves placeholders used in a cluster’s configuration by reading their corresponding environment variables. For each entry in env_var_map, it looks up the environment variable name and, if the value is set and non-empty, includes it in the returned mapping. When an environment variable is missing or empty, a warning is emitted and any options that depend on the associated placeholder will be skipped.
- Parameters:
- Returns:
A mapping from placeholder names to resolved environment variable values for the given cluster.
- Return type:
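A minimal sketch of this lookup follows. It assumes that env_var_map maps placeholder names to environment variable names, which the description implies but does not state outright. The `environ` parameter and the warning text are additions for illustration only.

```python
import os
import warnings
from typing import Mapping, Optional


def load_env_values_sketch(
    cluster_name: str,
    env_var_map: Mapping[str, str],
    environ: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    """Return placeholder -> value for every set, non-empty variable."""
    # `environ` is a hypothetical test hook; it defaults to the real environment.
    environ = os.environ if environ is None else environ
    resolved = {}
    for placeholder, env_name in env_var_map.items():
        value = environ.get(env_name)
        if value:  # set and non-empty
            resolved[placeholder] = value
        else:
            warnings.warn(
                f"[{cluster_name}] {env_name} is unset or empty; options "
                f"using {{{placeholder}}} will be skipped"
            )
    return resolved


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    env_values = load_env_values_sketch(
        "demo",
        {"account": "DEMO_ACCOUNT", "scratch": "DEMO_SCRATCH"},
        environ={"DEMO_ACCOUNT": "lab", "DEMO_SCRATCH": ""},
    )
```

The cluster name and variable names above are made up; only `account` resolves, and one warning is recorded for the empty `DEMO_SCRATCH`.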
- runscripts.workflow._merge_configs(base_config, overlay_config)[source]
Merge overlay_config into base_config, with overlay taking priority. Mutates base_config in place.
- runscripts.workflow._render_slurm_directives(options)[source]
Convert a mapping of SLURM options into #SBATCH directive lines.
Each key in options is treated as a SLURM option name (without the leading --). Values that are None or the empty string are skipped entirely. Boolean values are interpreted as flags: if the value is True, a directive of the form #SBATCH --<key> is emitted; if False, the option is omitted. All other values are rendered as #SBATCH --<key>=<value>.
The returned string contains one directive per line, separated by newlines.
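The rules above can be sketched in a few lines. This is an illustrative reimplementation written from the description, not the actual source, and the option names in the example are chosen arbitrarily.

```python
from typing import Any


def render_slurm_directives_sketch(options: dict[str, Any]) -> str:
    """Turn an options mapping into newline-separated #SBATCH lines."""
    lines = []
    for key, value in options.items():
        if value is None or value == "":
            continue  # skipped entirely
        if isinstance(value, bool):
            if value:
                lines.append(f"#SBATCH --{key}")  # bare flag
            continue  # False: option omitted
        lines.append(f"#SBATCH --{key}={value}")
    return "\n".join(lines)


directives = render_slurm_directives_sketch(
    {
        "partition": "normal",
        "exclusive": True,
        "requeue": False,
        "mem": "4G",
        "account": None,
        "comment": "",
    }
)
```

With this input, only `partition`, `exclusive`, and `mem` produce directives.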
- runscripts.workflow._serialize_cluster_options(options)[source]
Serialize cluster options into a command-line string.
Each dictionary item is converted into a --key=value flag. Keys that do not already start with "--" are automatically prefixed. Options with a value of None or the empty string are skipped. Values containing whitespace are wrapped in double quotes so they are treated as a single argument by the shell.
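A minimal sketch of these serialization rules follows. Joining the flags with a single space is an assumption, since the description does not name the separator, and the function name is hypothetical.

```python
from typing import Any


def serialize_cluster_options_sketch(options: dict[str, Any]) -> str:
    """Render options as --key=value flags, quoting values with whitespace."""
    parts = []
    for key, value in options.items():
        if value is None or value == "":
            continue  # skipped
        flag = key if key.startswith("--") else f"--{key}"
        text = str(value)
        if any(ch.isspace() for ch in text):
            text = f'"{text}"'  # keep the value a single shell argument
        parts.append(f"{flag}={text}")
    return " ".join(parts)  # assumed separator


serialized = serialize_cluster_options_sketch(
    {
        "account": "lab",
        "--qos": "high",
        "comment": "nightly run",
        "constraint": None,
    }
)
```

The option names and values are illustrative; `constraint` is dropped and `comment` is quoted.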
- runscripts.workflow.build_cluster_container_image(cluster_config, experiment_id, local_outdir, thread_executor)[source]
Build container image on HPC cluster using SLURM batch job.
- Parameters:
cluster_config (ClusterConfig)
experiment_id (str)
local_outdir (str)
thread_executor (ThreadPoolExecutor)
- Return type:
None
- runscripts.workflow.compute_file_hash(path, chunk_size=8192)[source]
Compute SHA256 hash of a file.
Works with both local files and cloud URIs (via fsspec when available).
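For the local-file case, chunked hashing can be sketched with the standard library alone. The hex-string return value is an assumption, and the fsspec path for cloud URIs is not covered here.

```python
import hashlib


def file_sha256_sketch(path: str, chunk_size: int = 8192) -> str:
    """Hash a local file in fixed-size chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        # Read until an empty bytes object signals end of file.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Reading in chunks gives the same digest as hashing the whole file at once, regardless of the chunk size chosen.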
- runscripts.workflow.copy_to_filesystem(source, dest, filesystem=None)[source]
Robustly copy the contents of a local source file to a destination path.
- runscripts.workflow.generate_colony(seeds)[source]
Create strings to import and compose Nextflow processes for colony sims.
- Parameters:
seeds (int)
- runscripts.workflow.generate_lineage(seed, n_init_sims, generations, single_daughters, analysis_config, different_seeds_per_variant=False)[source]
Create strings to import and compose Nextflow processes for lineage sims: cells that divide for a number of generations but do not interact. Also contains import statements and workflow jobs for analysis scripts.
- Parameters:
seed (int) – First seed for first sim
n_init_sims (int) – Number of sims to initialize with different seeds
generations (int) – Number of generations to run for each seed
single_daughters (bool) – If True, only simulate one daughter cell each gen
different_seeds_per_variant (bool) – If True, each variant i is given seeds [seed + i*n_init_sims, seed + (i+1)*n_init_sims) so that different variants simulate statistically independent cells. If False (default), all variants share the same seed range [seed, seed + n_init_sims).
analysis_config (dict[str, dict[str, dict]]) –
Dictionary with any of the following keys:
{
    'variant': analyses to run on output of all cells combined,
    'cohort': analyses to run on output grouped by variant,
    'multigen': analyses to run on output grouped by variant & seed,
    'single': analyses to run on output for each individual cell,
    'parca': analyses to run on parameter calculator output
}
Each key corresponds to a mapping from analysis name (as defined in ``ecol/analysis/__init__.py``) to keyword arguments.
- Returns:
2-element tuple containing
sim_imports: All include statements for Nextflow sim processes
sim_workflow: Fully composed workflow for entire lineage
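The seed ranges described for different_seeds_per_variant can be worked through with a small helper. `variant_seed_range_sketch` is a hypothetical function written only to illustrate the arithmetic; it is not part of the module.

```python
def variant_seed_range_sketch(
    seed: int,
    n_init_sims: int,
    variant_index: int,
    different_seeds_per_variant: bool = False,
) -> range:
    """Half-open seed range [start, start + n_init_sims) for one variant."""
    offset = variant_index * n_init_sims if different_seeds_per_variant else 0
    start = seed + offset
    return range(start, start + n_init_sims)


# Variant 2 with seed=10 and n_init_sims=3.
independent_seeds = list(variant_seed_range_sketch(10, 3, 2, True))
shared_seeds = list(variant_seed_range_sketch(10, 3, 2, False))
```

With per-variant seeds enabled, variant 2 gets seeds 16 through 18; otherwise every variant uses 10 through 12.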
- runscripts.workflow.get_cluster_config(config, outdir, experiment_id)[source]
Resolve cluster settings using CLUSTER_PRESETS description.
- Parameters:
- Return type:
ClusterConfig | None
- runscripts.workflow.hyperqueue_snippets(outdir)[source]
Return init and exit shell snippets for HyperQueue if enabled.
- runscripts.workflow.load_config_with_inheritance(config_path)[source]
Load a config file and recursively resolve all inheritance chains.
Priority order: Current config > First inherited > … > Last inherited.
If config A inherits from [B, D] and B inherits from [C], the priority is A > B > C > D.
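The priority rule can be sketched with in-memory dictionaries instead of files. The key name `inherit_from`, the shallow merge, and both function names are assumptions made for this illustration; the real loader reads from config_path.

```python
from typing import Any

INHERIT_KEY = "inherit_from"  # hypothetical key name for this sketch


def priority_order_sketch(name: str, configs: dict[str, dict]) -> list[str]:
    """Depth-first order: a config, then each parent's full chain in turn."""
    order = [name]
    for parent in configs[name].get(INHERIT_KEY, []):
        order.extend(priority_order_sketch(parent, configs))
    return order


def resolve_sketch(name: str, configs: dict[str, dict]) -> dict[str, Any]:
    """Apply configs from lowest to highest priority (shallow merge)."""
    merged: dict[str, Any] = {}
    for source in reversed(priority_order_sketch(name, configs)):
        for key, value in configs[source].items():
            if key != INHERIT_KEY:
                merged[key] = value
    return merged


configs = {
    "A": {INHERIT_KEY: ["B", "D"], "x": "A"},
    "B": {INHERIT_KEY: ["C"], "x": "B", "y": "B"},
    "C": {"y": "C", "z": "C"},
    "D": {"z": "D", "w": "D"},
}
order = priority_order_sketch("A", configs)
resolved = resolve_sketch("A", configs)
```

Each key in `resolved` takes its value from the highest-priority config that defines it, so `z` comes from C rather than D.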
- runscripts.workflow.merge_dicts(a, b)[source]
Recursively merges dictionary b into dictionary a. This mutates dictionary a.
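A minimal sketch of an in-place recursive merge follows. It assumes that values from b win on conflicts and that only dict-into-dict pairs recurse; the source states neither detail explicitly.

```python
from typing import Any


def merge_dicts_sketch(a: dict[str, Any], b: dict[str, Any]) -> None:
    """Merge b into a in place, recursing into nested dictionaries."""
    for key, value in b.items():
        if isinstance(value, dict) and isinstance(a.get(key), dict):
            merge_dicts_sketch(a[key], value)
        else:
            a[key] = value  # assumed: b overrides a on conflict


target = {"sim": {"cpus": 1, "mem": 4}, "id": "x"}
merge_dicts_sketch(target, {"sim": {"cpus": 2}, "new": True})
```

After the call, `target["sim"]` keeps `mem` from the original and takes `cpus` from the second dictionary.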
- runscripts.workflow.parse_uri(uri)[source]
Parse URI and return appropriate filesystem and path.
For cloud/remote URIs (when fsspec is available), returns fsspec filesystem. For local paths, returns None and absolute path.
- runscripts.workflow.run_ecr_script(image, build, region='us-gov-west-1')[source]
Run the ECR build script to either build/push or just resolve the URI.
- runscripts.workflow.stream_log(output_log, sleep_time=1, stop_event=None)[source]
Periodically stream appended content from output_log to stdout.
- runscripts.workflow.strip_resource_keys(config)[source]
Create a copy of the config with resource-only keys removed.
This allows changing resource allocations (memory, CPUs, time limits) without invalidating Nextflow’s cache, since these keys should not affect simulation output.
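The idea can be illustrated with a small sketch. The key names in `RESOURCE_KEYS_SKETCH` are invented for the example, since the actual set of resource-only keys is not listed here, and only top-level keys are handled.

```python
import copy
from typing import Any

# Hypothetical resource-only keys; the real list is not documented here.
RESOURCE_KEYS_SKETCH = frozenset({"sim_mem", "sim_cpus", "time_limit"})


def strip_resource_keys_sketch(config: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of config without resource-only keys."""
    stripped = copy.deepcopy(config)
    for key in RESOURCE_KEYS_SKETCH:
        stripped.pop(key, None)
    return stripped


small = {"generations": 4, "sim_mem": 4, "variants": {"n": 2}}
large = {"generations": 4, "sim_mem": 16, "variants": {"n": 2}}
same_after_strip = (
    strip_resource_keys_sketch(small) == strip_resource_keys_sketch(large)
)
```

Two configs that differ only in a resource key compare equal after stripping, which is the property a cache key would rely on.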