runscripts.workflow

runscripts.workflow.LIST_KEYS_TO_MERGE = ('save_times', 'add_processes', 'exclude_processes', 'processes', 'engine_process_reports', 'initial_state_overrides')

Special configuration keys whose values are lists that are concatenated together when the key appears in multiple sources (e.g. a default JSON and a user-specified JSON) instead of being directly overridden.
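A minimal sketch of what this merge policy means, using hypothetical config contents (the values shown are placeholders, not real defaults):

```python
# Keys listed in LIST_KEYS_TO_MERGE are concatenated across config
# sources; every other key is simply overridden by the later source.
LIST_KEYS_TO_MERGE = ("save_times", "add_processes")

default_config = {"save_times": [0, 100], "seed": 0}
user_config = {"save_times": [200], "seed": 42}

merged = dict(default_config)
for key, value in user_config.items():
    if key in LIST_KEYS_TO_MERGE and key in merged:
        merged[key] = merged[key] + value  # concatenate list values
    else:
        merged[key] = value  # plain override

print(merged)  # {'save_times': [0, 100, 200], 'seed': 42}
```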

runscripts.workflow.build_image_cmd(image_name, apptainer=False)[source]
Return type:

list[str]

runscripts.workflow.check_job_state(job_id, timeout_seconds=3600, sleep_interval=30)[source]

Check the state of a SLURM job until it completes or fails.

Parameters:
  • job_id – The SLURM job ID to check

  • timeout_seconds – Maximum time to wait in seconds (default: 1 hour)

  • sleep_interval – Time to sleep between checks in seconds (default: 30 seconds)

Returns:

True if the job completed successfully, False otherwise
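A hedged sketch of the polling pattern such a function typically uses: query the job state, sleep between checks, and stop on a terminal state or timeout. ``query_state`` is a stand-in for a real ``sacct``/``squeue`` call, and the state sets are assumptions, not the function's actual internals:

```python
import time

# Terminal SLURM states (assumed subset for illustration).
TERMINAL_OK = {"COMPLETED"}
TERMINAL_FAIL = {"FAILED", "CANCELLED", "TIMEOUT", "OUT_OF_MEMORY"}

def poll_until_done(query_state, timeout_seconds=3600, sleep_interval=30):
    """Poll `query_state()` until a terminal state or timeout is reached."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        state = query_state()
        if state in TERMINAL_OK:
            return True
        if state in TERMINAL_FAIL:
            return False
        time.sleep(sleep_interval)
    return False  # timed out waiting for a terminal state

# Usage with a fake state sequence standing in for real SLURM queries:
states = iter(["PENDING", "RUNNING", "COMPLETED"])
print(poll_until_done(lambda: next(states), sleep_interval=0))  # True
```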

runscripts.workflow.copy_to_filesystem(source, dest, filesystem)[source]

Robustly copy the contents of a local source file to a destination path on a PyArrow filesystem.

Parameters:
  • source (str) – Path to source file on local filesystem

  • dest (str) – Path to destination file on PyArrow filesystem. If Cloud Storage bucket, DO NOT include gs:// or gcs://.

  • filesystem (FileSystem) – PyArrow filesystem instantiated from URI of dest
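One way a "robust" copy onto a PyArrow filesystem can look, sketched under the assumption that ``filesystem`` exposes PyArrow's ``open_output_stream(path)`` method; the stand-in class below mimics only that method so the sketch runs without pyarrow installed:

```python
import shutil

def copy_to_filesystem_sketch(source, dest, filesystem, chunk_size=1 << 20):
    """Stream a local file to `dest` in chunks instead of loading it whole."""
    with open(source, "rb") as src, filesystem.open_output_stream(dest) as out:
        shutil.copyfileobj(src, out, chunk_size)

class LocalStandIn:
    """Stand-in for the single PyArrow FileSystem method used above."""
    def open_output_stream(self, path):
        return open(path, "wb")
```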

runscripts.workflow.forward_sbatch_output(batch_script, output_log, timeout_seconds=3600)[source]

Submit a SLURM job that is configured to pipe its output to a log file, then monitor the log file with ``tail -f`` and print the output to stdout. This function exits when the job completes.

Parameters:
  • batch_script (str)

  • output_log (str)

  • timeout_seconds (int)

runscripts.workflow.generate_code(config)[source]
runscripts.workflow.generate_colony(seeds)[source]

Create strings to import and compose Nextflow processes for colony sims.

Parameters:

seeds (int)

runscripts.workflow.generate_lineage(seed, n_init_sims, generations, single_daughters, analysis_config)[source]

Create strings to import and compose Nextflow processes for lineage sims: cells that divide for a number of generations but do not interact. The generated strings also contain import statements and workflow jobs for analysis scripts.

Parameters:
  • seed (int) – First seed for first sim

  • n_init_sims (int) – Number of sims to initialize with different seeds

  • generations (int) – Number of generations to run for each seed

  • single_daughters (bool) – If True, only simulate one daughter cell each gen

  • analysis_config (dict[str, dict[str, dict]]) –

    Dictionary with any of the following keys:

    {
        'variant': analyses to run on output of all cells combined,
        'cohort': analyses to run on output grouped by variant,
        'multigen': analyses to run on output grouped by variant & seed,
        'single': analyses to run on output for each individual cell,
        'parca': analyses to run on parameter calculator output
    }
    
    Each key corresponds to a mapping from analysis name (as defined
    in ``ecol/analysis/__init__.py``) to keyword arguments.
    

Returns:

2-element tuple containing

  • sim_imports: All include statements for Nextflow sim processes

  • sim_workflow: Fully composed workflow for entire lineage
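An illustrative ``analysis_config`` matching the schema above; the analysis names and keyword arguments here are placeholders, not real entries from the analysis registry:

```python
# Hypothetical analysis_config: top-level keys select the grouping level,
# inner keys name analyses, inner values are keyword arguments.
analysis_config = {
    "multigen": {"mass_fraction": {"out_format": "png"}},
    "single": {"ribosome_counts": {}},
}
```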

runscripts.workflow.main()[source]
runscripts.workflow.merge_dicts(a, b)[source]

Recursively merge dictionary ``b`` into dictionary ``a``. This mutates dictionary ``a`` in place.
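A minimal sketch of these semantics, assuming the common recursive-merge behavior (nested dicts merged key by key, all other values from ``b`` replacing those in ``a``); this is an illustration, not the module's actual implementation:

```python
def merge_dicts_sketch(a, b):
    """Recursively merge `b` into `a`, mutating `a` in place."""
    for key, value in b.items():
        if isinstance(value, dict) and isinstance(a.get(key), dict):
            merge_dicts_sketch(a[key], value)  # descend into nested dicts
        else:
            a[key] = value  # non-dict values from b win

a = {"x": {"y": 1, "z": 2}, "w": 0}
merge_dicts_sketch(a, {"x": {"y": 10}, "v": 5})
print(a)  # {'x': {'y': 10, 'z': 2}, 'w': 0, 'v': 5}
```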