runscripts.workflow

runscripts.workflow.CLUSTER_PRESETS: dict[str, dict[str, Any]]

Default config values for different SLURM clusters.

Each key is a cluster name and each value is a dictionary with the following structure:

{
    # Environment variables to load for string substitution in any of
    # the options in the following sections. Note that the variables
    # "outdir" (experiment output directory from emitter_arg --> out_dir)
    # and "experiment_id" are always available for substitution.
    "env_vars": { ... },
    # SLURM options for building container image
    "build_image": { ... },
    # Strings to substitute in Nextflow config file (only QUEUE and
    # CLUSTER_OPTIONS are required, others use defaults from
    # configs/default.json)
    "nf_config_overrides": {
        # SLURM partition to run HyperQueue workers and non-HyperQueue jobs on
        "QUEUE": str,
        # Number of cores to allocate each HyperQueue worker
        "HQ_CORES": int,
        # Number of CPUs to allocate per simulation (max 2)
        "SIM_CPUS": int,
        # Amount of memory to allocate per simulation in GB
        "SIM_MEM": int,
        # Whether to use HyperQueue for simulation job scheduling
        "HYPERQUEUE": bool,
        # Additional cluster options to pass to SLURM
        "CLUSTER_OPTIONS": dict[str, str],
    },
    # SLURM options for Nextflow job
    "nextflow": { ... },
    # Shell commands to set up the environment in the Nextflow job script
    "nf_setup": str
}
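For concreteness, a preset entry following this structure might look like the sketch below. The cluster name "example_hpc" and every value are illustrative placeholders, not an actual preset from CLUSTER_PRESETS:

```python
# Illustrative CLUSTER_PRESETS-style entry; the cluster name and all
# values here are made up for demonstration only.
EXAMPLE_PRESET = {
    "example_hpc": {
        # "{scratch}" placeholders resolve from the $SCRATCH environment
        # variable; "outdir" and "experiment_id" need no entry here.
        "env_vars": {"scratch": "SCRATCH"},
        "build_image": {"partition": "build", "time": "01:00:00"},
        "nf_config_overrides": {
            "QUEUE": "normal",
            "HQ_CORES": 16,
            "SIM_CPUS": 2,
            "SIM_MEM": 4,
            "HYPERQUEUE": True,
            "CLUSTER_OPTIONS": {"account": "my_lab"},
        },
        "nextflow": {"partition": "normal", "mem": "8GB"},
        "nf_setup": "module load java",
    }
}
```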
class runscripts.workflow.ClusterConfig(name, config_dict, build_options, nextflow_options, nf_config_overrides, nf_setup=None)[source]

Bases: object

Resolved configuration for running on a SLURM HPC cluster.

Parameters:
  • name (str)

  • config_dict (dict)

  • build_options (dict[str, Any])

  • nextflow_options (dict[str, Any])

  • nf_config_overrides (dict[str, Any])

  • nf_setup (str | None)
static _coerce_positive_int(value, field_name)[source]
Parameters:
  • value (Any)

  • field_name (str)

Return type:

int

apply_config_overrides(nf_config)[source]
Parameters:

nf_config (str)

Return type:

str

property build_image: bool
build_options: dict[str, Any]
config_dict: dict
property jenkins: bool
name: str
nextflow_options: dict[str, Any]
nf_config_overrides: dict[str, Any]
nf_setup: str | None = None
require_container_image()[source]
Return type:

str

runscripts.workflow.LIST_KEYS_TO_MERGE = ('save_times', 'add_processes', 'exclude_processes', 'processes', 'engine_process_reports', 'initial_state_overrides')

Special configuration keys whose list values are concatenated when they appear in multiple sources (e.g. the default JSON and a user-specified JSON) instead of being directly overridden.
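The concatenate-instead-of-override rule can be sketched as follows. This is a simplified stand-in for the real merge logic, showing only how a single key's value would be combined:

```python
LIST_KEYS_TO_MERGE = (
    "save_times", "add_processes", "exclude_processes",
    "processes", "engine_process_reports", "initial_state_overrides",
)

def merge_value(key, base_value, overlay_value):
    """Concatenate list values for the special keys above; for any
    other key, the overlay value simply replaces the base value."""
    if key in LIST_KEYS_TO_MERGE and isinstance(base_value, list):
        return base_value + overlay_value
    return overlay_value
```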

class runscripts.workflow._FormatDict[source]

Bases: dict

runscripts.workflow._apply_nf_config_overrides(nf_config, overrides)[source]

Apply configuration overrides to a Nextflow config string.

The keys in overrides are treated as literal placeholders in nf_config and replaced with their stringified values. The special key "CLUSTER_OPTIONS" may contain a dictionary of options, which is serialized into a command-line style string via _serialize_cluster_options. Boolean values are converted to their lowercase string representation ("true" or "false") before substitution to match JSON-style booleans.

Parameters:
  • nf_config (str)

  • overrides (dict[str, Any])
Return type:

str
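The substitution rules above can be sketched like this. It is a simplified stand-in, not the actual implementation; in particular, the real helper delegates the "CLUSTER_OPTIONS" dictionary to _serialize_cluster_options rather than inlining it:

```python
def apply_nf_config_overrides(nf_config, overrides):
    """Sketch: replace literal placeholder strings in a Nextflow config
    with stringified override values."""
    for key, value in overrides.items():
        if key == "CLUSTER_OPTIONS" and isinstance(value, dict):
            # Stand-in for _serialize_cluster_options
            value = " ".join(f"--{k}={v}" for k, v in value.items())
        elif isinstance(value, bool):
            # JSON-style booleans: True -> "true", False -> "false"
            value = str(value).lower()
        nf_config = nf_config.replace(key, str(value))
    return nf_config
```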

runscripts.workflow._confirm_overwrite(image)[source]

Prompt the user to confirm building and pushing a container image to an existing AWS ECR repository or GCP Artifact Registry manifest.

Returns True if the user confirms, False otherwise.

Parameters:

image (str)

Return type:

bool

runscripts.workflow._ecr_image_exists(repo_name, image_tag, region)[source]

Return True if the given tag already exists in an ECR repository.

Parameters:
  • repo_name (str)

  • image_tag (str)

  • region (str)

Return type:

bool

runscripts.workflow._format_recursive(value, formatter, path, on_missing)[source]

Recursively apply string formatting to nested configuration values.

This helper walks over value, which may be a plain value, string, or nested mapping, and formats any strings using the provided formatter (typically a _FormatDict used with str.format_map).

The current location within the nested structure is tracked via path, which is extended as the recursion descends into dictionaries. When a placeholder in a string cannot be resolved by formatter, a KeyError is raised, the missing placeholder name is reported to the on_missing callback together with the corresponding path, and the function returns the internal _MISSING_SENTINEL so that the missing entry can be skipped by the caller.

Non-string, non-mapping values are returned unchanged.

Parameters:
  • value (Any) – Plain value, string, or nested mapping to format

  • formatter – Mapping used with str.format_map, typically a _FormatDict

  • path (str) – Current location within the nested structure

  • on_missing – Callback invoked with the missing placeholder name and its path
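The walk described above can be sketched as follows (a simplified stand-in for the real helper, using a plain dict in place of _FormatDict):

```python
_MISSING_SENTINEL = object()

def format_recursive(value, formatter, path, on_missing):
    """Sketch of the recursive formatting walk: format strings, recurse
    into mappings, drop entries whose placeholders cannot be resolved."""
    if isinstance(value, str):
        try:
            return value.format_map(formatter)
        except KeyError as err:
            # Report the unresolved placeholder and its location
            on_missing(err.args[0], path)
            return _MISSING_SENTINEL
    if isinstance(value, dict):
        result = {}
        for key, sub in value.items():
            formatted = format_recursive(sub, formatter, f"{path}.{key}", on_missing)
            if formatted is not _MISSING_SENTINEL:
                result[key] = formatted  # skip missing entries
        return result
    return value  # non-string, non-mapping values pass through unchanged
```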
runscripts.workflow._format_template_section(section, context, cluster_name, section_name)[source]

Format a configuration or template section using the provided context.

The section value (which may be a string, mapping, or nested structure) is traversed and any str.format-style placeholders are resolved using keys from context. If a placeholder cannot be resolved, the corresponding entry is skipped from the resulting structure and a warning is emitted indicating the missing placeholder and its location within cluster_name.section_name.

Parameters:
  • section (Any) – The configuration or template section to format. May be None, in which case None is returned.

  • context (dict[str, Any]) – Mapping of placeholder names to their replacement values.

  • cluster_name (str) – Name of the cluster, used only for constructing human-readable warning messages.

  • section_name (str) – Name of the section being formatted, used in warning messages and as the root path when reporting missing placeholders.

Returns:

The formatted section with all resolvable placeholders substituted and entries with missing placeholders omitted, or None if section is None.

runscripts.workflow._gcloud_image_exists(full_image_uri)[source]

Return True if the given image already exists in GCP Artifact Registry.

Parameters:

full_image_uri (str)

Return type:

bool

runscripts.workflow._load_cluster_env_values(cluster_name, env_var_map)[source]

Load environment variable values for a cluster configuration.

This helper resolves placeholders used in a cluster’s configuration by reading their corresponding environment variables. For each entry in env_var_map, it looks up the environment variable name and, if the value is set and non-empty, includes it in the returned mapping. When an environment variable is missing or empty, a warning is emitted and any options that depend on the associated placeholder will be skipped.

Parameters:
  • cluster_name (str) – Name of the cluster whose configuration is being prepared.

  • env_var_map (dict[str, str]) – Mapping from placeholder names to environment variable names.

Returns:

A mapping from placeholder names to resolved environment variable values for the given cluster.

Return type:

dict[str, str]
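The lookup-and-warn behavior can be sketched as follows; the exact warning text is illustrative, not the real message:

```python
import os
import warnings

def load_cluster_env_values(cluster_name, env_var_map):
    """Sketch: resolve placeholder values from environment variables,
    warning (and omitting the entry) when a variable is unset or empty."""
    values = {}
    for placeholder, env_name in env_var_map.items():
        value = os.environ.get(env_name, "")
        if value:
            values[placeholder] = value
        else:
            warnings.warn(
                f"[{cluster_name}] environment variable {env_name} is unset "
                f"or empty; options using {{{placeholder}}} will be skipped."
            )
    return values
```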

runscripts.workflow._merge_configs(base_config, overlay_config)[source]

Merge overlay_config into base_config, with overlay taking priority. Mutates base_config in place.

Parameters:
  • base_config (dict) – Configuration to update (lower priority)

  • overlay_config (dict) – Configuration to merge in (higher priority)

runscripts.workflow._render_slurm_directives(options)[source]

Convert a mapping of SLURM options into #SBATCH directive lines.

Each key in options is treated as a SLURM option name (without the leading --). Values that are None or the empty string are skipped entirely. Boolean values are interpreted as flags: if the value is True, a directive of the form #SBATCH --<key> is emitted; if False, the option is omitted. All other values are rendered as #SBATCH --<key>=<value>.

The returned string contains one directive per line, separated by newlines.

Parameters:

options (dict[str, Any])

Return type:

str
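The rendering rules above can be sketched as:

```python
def render_slurm_directives(options):
    """Sketch of the described rules: skip empty values, emit bare
    flags for True, omit False, otherwise emit --key=value."""
    lines = []
    for key, value in options.items():
        if value is None or value == "":
            continue  # skip empty options entirely
        if isinstance(value, bool):
            if value:
                lines.append(f"#SBATCH --{key}")  # True -> bare flag
            continue  # False omits the option
        lines.append(f"#SBATCH --{key}={value}")
    return "\n".join(lines)
```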

runscripts.workflow._serialize_cluster_options(options)[source]

Serialize cluster options into a command-line string.

Each dictionary item is converted into a --key=value flag. Keys that do not already start with "--" are automatically prefixed. Options with a value of None or the empty string are skipped. Values containing whitespace are wrapped in double quotes so they are treated as a single argument by the shell.

Parameters:

options (dict[str, Any])

Return type:

str
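The serialization rules above can be sketched as:

```python
def serialize_cluster_options(options):
    """Sketch of the described serialization: dict -> CLI-style string."""
    flags = []
    for key, value in options.items():
        if value is None or value == "":
            continue  # skip empty options
        flag = key if key.startswith("--") else f"--{key}"
        text = str(value)
        if any(ch.isspace() for ch in text):
            text = f'"{text}"'  # quote values containing whitespace
        flags.append(f"{flag}={text}")
    return " ".join(flags)
```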

runscripts.workflow.build_cluster_container_image(cluster_config, experiment_id, local_outdir, thread_executor)[source]

Build container image on HPC cluster using SLURM batch job.

Parameters:
  • cluster_config (ClusterConfig)

  • experiment_id (str)

  • local_outdir (str)

  • thread_executor (ThreadPoolExecutor)

Return type:

None

runscripts.workflow.build_image_cmd(image_name, apptainer=False)[source]
Return type:

list[str]

runscripts.workflow.compute_file_hash(path, chunk_size=8192)[source]

Compute SHA256 hash of a file.

Works with both local files and cloud URIs (via fsspec when available).

Parameters:
  • path (str) – Local path or cloud URI to the file

  • chunk_size (int) – Size of chunks to read at a time

Returns:

Hexadecimal SHA256 hash of the file contents

Return type:

str
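For the local-file case, chunked SHA256 hashing looks like the sketch below; the real helper additionally opens cloud URIs through fsspec when it is available:

```python
import hashlib

def compute_file_hash(path, chunk_size=8192):
    """Chunked SHA256 of a local file, so large files never need to be
    held in memory at once."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()
```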

runscripts.workflow.copy_to_filesystem(source, dest, filesystem=None)[source]

Robustly copy the contents of a local source file to a destination path.

Parameters:
  • source (str) – Path to source file on local filesystem

  • dest (str) – Path to destination file on filesystem

  • filesystem (AbstractFileSystem | None) – LocalFileSystem or fsspec filesystem
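A minimal sketch of the two paths this implies, assuming the fsspec branch uploads via AbstractFileSystem.put (the real helper may add retries or other robustness not shown here):

```python
import shutil

def copy_to_filesystem(source, dest, filesystem=None):
    """Sketch: plain local copy when no filesystem is supplied;
    otherwise upload through the given fsspec filesystem."""
    if filesystem is None:
        shutil.copyfile(source, dest)
    else:
        filesystem.put(source, dest)  # fsspec AbstractFileSystem.put
```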

runscripts.workflow.generate_code(config)[source]
runscripts.workflow.generate_colony(seeds)[source]

Create strings to import and compose Nextflow processes for colony sims.

Parameters:

seeds (int)

runscripts.workflow.generate_lineage(seed, n_init_sims, generations, single_daughters, analysis_config, different_seeds_per_variant=False)[source]

Create strings to import and compose Nextflow processes for lineage sims: cells that divide for a number of generations but do not interact. Also contains import statements and workflow jobs for analysis scripts.

Parameters:
  • seed (int) – First seed for first sim

  • n_init_sims (int) – Number of sims to initialize with different seeds

  • generations (int) – Number of generations to run for each seed

  • single_daughters (bool) – If True, only simulate one daughter cell each gen

  • different_seeds_per_variant (bool) – If True, each variant i is given seeds [seed + i*n_init_sims, seed + (i+1)*n_init_sims) so that different variants simulate statistically independent cells. If False (default), all variants share the same seed range [seed, seed + n_init_sims).

  • analysis_config (dict[str, dict[str, dict]]) –

    Dictionary with any of the following keys:

    {
        'variant': analyses to run on output of all cells combined,
        'cohort': analyses to run on output grouped by variant,
        'multigen': analyses to run on output grouped by variant & seed,
        'single': analyses to run on output for each individual cell,
        'parca': analyses to run on parameter calculator output
    }
    
    Each key corresponds to a mapping from analysis name (as defined
    in ``ecol/analysis/__init__.py``) to keyword arguments.
    

Returns:

2-element tuple containing

  • sim_imports: All include statements for Nextflow sim processes

  • sim_workflow: Fully composed workflow for entire lineage

runscripts.workflow.get_cluster_config(config, outdir, experiment_id)[source]

Resolve cluster settings using CLUSTER_PRESETS description.

Parameters:
  • config (dict)

  • outdir (str)

  • experiment_id (str)
Return type:

ClusterConfig | None

runscripts.workflow.hyperqueue_snippets(outdir)[source]

Return init and exit shell snippets for HyperQueue if enabled.

Parameters:

outdir (str)

Return type:

tuple[str, str]

runscripts.workflow.load_config_with_inheritance(config_path)[source]

Load a config file and recursively resolve all inheritance chains.

Priority order: current config > first inherited > … > last inherited. For example, if config A inherits from [B, D] and B inherits from [C], the priority is A > B > C > D.

Parameters:

config_path (str) – Path to the configuration file

Returns:

Fully resolved configuration dictionary

Return type:

dict
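The resolution order can be sketched as follows. This assumes parent configs are listed under a hypothetical "inherit_from" key (the real key name is not stated here and may differ), and uses a simple recursive deep merge:

```python
import json

def load_config_with_inheritance(config_path):
    """Sketch: load a JSON config and resolve its inheritance chain,
    assuming a hypothetical "inherit_from" key listing parent paths."""
    with open(config_path) as f:
        config = json.load(f)
    parents = config.pop("inherit_from", [])
    resolved = {}
    # Later-listed parents have lower priority, so merge them first.
    for parent in reversed(parents):
        deep_merge(resolved, load_config_with_inheritance(parent))
    deep_merge(resolved, config)  # the config itself wins last
    return resolved

def deep_merge(base, overlay):
    """Recursively merge overlay into base, overlay taking priority."""
    for key, value in overlay.items():
        if isinstance(value, dict) and isinstance(base.get(key), dict):
            deep_merge(base[key], value)
        else:
            base[key] = value
```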

runscripts.workflow.main()[source]
runscripts.workflow.merge_dicts(a, b)[source]

Recursively merges dictionary b into dictionary a. This mutates dictionary a.

runscripts.workflow.parse_uri(uri)[source]

Parse URI and return appropriate filesystem and path.

For cloud/remote URIs (when fsspec is available), returns fsspec filesystem. For local paths, returns None and absolute path.

Parameters:

uri (str)

Return type:

tuple[AbstractFileSystem | None, str]

runscripts.workflow.run_ecr_script(image, build, region='us-gov-west-1')[source]

Run the ECR build script to either build/push or just resolve the URI.

Parameters:
  • image (str) – Image specification, either full URI or repo:tag format.

  • build (bool) – If True, build and push the image. If False, just resolve the URI.

  • region (str) – AWS region for ECR (e.g., ‘us-gov-west-1’ for GovCloud).

Returns:

Full ECR image URI.

Return type:

str

runscripts.workflow.stream_log(output_log, sleep_time=1, stop_event=None)[source]

Periodically stream appended content from output_log to stdout.

Parameters:
  • output_log (str)

  • sleep_time (int)

  • stop_event (Event | None)

runscripts.workflow.strip_resource_keys(config)[source]

Create a copy of the config with resource-only keys removed.

This allows changing resource allocations (memory, CPUs, time limits) without invalidating Nextflow’s cache, since these keys should not affect simulation output.

Parameters:

config (dict) – Full configuration dictionary

Returns:

Config dictionary with resource-only keys stripped

Return type:

dict
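A sketch of this stripping, using a hypothetical stand-in key set (the real module defines its own list of resource-only keys, which is not shown here):

```python
# Hypothetical stand-in: the real resource-only key list may differ.
RESOURCE_KEYS = frozenset({"SIM_CPUS", "SIM_MEM", "HQ_CORES"})

def strip_resource_keys(config):
    """Sketch: return a copy of config with resource-only keys dropped,
    recursing into nested dictionaries."""
    stripped = {}
    for key, value in config.items():
        if key in RESOURCE_KEYS:
            continue  # resource allocations should not affect the cache key
        stripped[key] = (
            strip_resource_keys(value) if isinstance(value, dict) else value
        )
    return stripped
```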

runscripts.workflow.submit_cluster_nextflow_job(cluster_config, experiment_id, local_outdir, outdir, config_path, workflow_path, report_path, workdir, resume, hyperqueue, filesystem, thread_executor=None)[source]

Submit Nextflow workflow as SLURM batch job on HPC cluster.

Parameters:
  • cluster_config (ClusterConfig)

  • experiment_id (str)

  • local_outdir (str)

  • outdir (str)

  • config_path (str)

  • workflow_path (str)

  • report_path (str)

  • workdir (str)

  • resume (bool)

  • hyperqueue (bool)

  • filesystem (AbstractFileSystem | None)

  • thread_executor (ThreadPoolExecutor | None)

Return type:

None