Source code for ecoli.experiments.ecoli_engine_process

"""
Composite for simulations with EngineProcess cells in an environment.

.. note::
    This composite requires a config with the spatial environment
    enabled.
"""

from datetime import datetime, timezone
import os
import re
import gc
import warnings

import numpy as np
from vivarium.core.composer import Composer, Composite
from vivarium.core.emitter import SharedRamEmitter
from vivarium.core.engine import Engine
from vivarium.core.serialize import serialize_value
from vivarium.library.dict_utils import deep_merge
from vivarium.library.topology import get_in

from ecoli.experiments.ecoli_master_sim import (
    EcoliSim,
    SimConfig,
    get_git_revision_hash,
    get_git_status,
    report_profiling,
    tuplify_topology,
)
from ecoli.library.logging_tools import write_json
from ecoli.library.sim_data import RAND_MAX
from ecoli.library.schema import not_a_process
from ecoli.library.json_state import get_state_from_file
from ecoli.processes.engine_process import EngineProcess
from ecoli.processes.environment.field_timeline import FieldTimeline
from ecoli.processes.environment.lysis import Lysis
from ecoli.processes.division_detector import DivisionDetector
from ecoli.composites.environment.lattice import Lattice
from ecoli.composites.ecoli_configs import CONFIG_DIR_PATH


[docs] class EcoliInnerWrapper(Composite): """Helper class to ensure calling initial_state() on the composite generated by EcoliInnerSim returns the cached initial state and removes the internal reference to this state."""
[docs] def initial_state(self, config): initial_state = self.cached_state self.cached_state = None return initial_state
[docs] class EcoliInnerSim(Composer): """Inner composer to be used with :py:class:`~ecoli.processes.engine_process.EngineProcess.""" defaults = { "agent_id": "0", "seed": 0, "division_threshold": None, "division_variable": None, "initial_state": None, "chromosome_path": None, }
[docs] def generate(self, config=None): """ Generates a complete E. coli simulation composite to be used as the inner simulation in an EngineProcess. This requires caching the initial state generated in the course of calling :py:meth:`~ecoli.experiments.ecoli_master_sim.EcoliSim.build_ecoli` and wrapping the returned composite using :py:class:`~.EcoliInnerWrapper` to ensure that the cached initial state is returned when the ``initial_state`` method is called on the composite. Additionally, this requires a special :py:class:`~ecoli.processes.division_detector.DivisionDetector` Step to set the store located at ``("division_trigger",)`` to ``True`` when it is time to divide. This is necessary because the actual logic of division is included in the :py:meth:`~ecoli.processes.engine_process.EngineProcess.next_update` method of EngineProcess, making that part of the normal :py:class:`~ecoli.processes.cell_division.Division` process obsolete but not the part that signals the time for division (whether that be via a D period or mass threshold mechanism). The :py:class:`~ecoli.processes.division_detector.DivisionDetector` process takes over that role while relinquishing control over the actual division to EngineProcess. """ if config is None: config = self.config ecoli_sim = EcoliSim( { **self.config, **config, # Division is handled by EngineProcess but we want to have # Ecoli composite create Division process so we can swap it # with DivisionDetector "divide": True, "spatial_environment": False, } ) ecoli_sim.build_ecoli() composite = ecoli_sim.ecoli # Composite embedded under ('agents', agent_id) when 'divide' is True composite = { key: val["agents"][config["agent_id"]] for key, val in composite.items() if key not in ["state", "_schema"] } # Cache initial state so wrapping with EcoliInnerWrapper # causes initial_state() method to return cached state composite["cached_state"] = ecoli_sim.generated_initial_state["agents"][ config["agent_id"] ] composite = EcoliInnerWrapper(composite) # DivisionDetector tells EngineProcess when to divide division_step = composite["steps"]["division"] div_detector_config = { "division_threshold": config["division_threshold"], "dry_mass_inc_dict": division_step.dry_mass_inc_dict, "division_mass_multiplier": division_step.division_mass_multiplier, } composite["steps"]["division"] = DivisionDetector(div_detector_config) composite["topology"]["division"] = { "division_variable": config["division_variable"], "division_trigger": ("division_trigger",), "full_chromosomes": config["chromosome_path"], "media_id": ("environment", "media_id"), "division_threshold": ("division_threshold",), } return composite
# Not used
[docs] def generate_processes(self, config): pass
[docs] def generate_topology(self, config): pass
[docs] class EcoliEngineProcess(Composer): """ Outer composer for use with :py:class:`~ecoli.processes.engine_process.EngineProcess. In addition to creating an EngineProcess containing an entire vEcoli simulation, the composite generated by this composer optionally includes the :py:class:`~ecoli.processes.environment.lysis.Lysis` Step when given a non-empty ``lysis_config``. """ defaults = { "agent_id": "0", "seed": 0, "tunnel_out_schemas": {}, "stub_schemas": {}, "parallel": False, "divide": False, "tunnels_in": tuple(), "emit_paths": tuple(), "start_time": 0, "experiment_id": "", "inner_emitter": "null", "inner_composer_config": {}, "lysis_config": {}, "inner_same_timestep": True, "division_threshold": True, }
[docs] def generate_processes(self, config): processes = {} if config["lysis_config"]: config["tunnels_in"] += (("bulk",),) config["tunnels_in"] += (("burst",),) config["lysis_config"]["agent_id"] = config["agent_id"] processes = {"lysis": Lysis(config["lysis_config"])} inner_composer_config = config.pop("inner_composer_config") cell_process_config = { "agent_id": config["agent_id"], "outer_composer": EcoliEngineProcess, "outer_composer_config": config, "inner_composer": EcoliInnerSim, "inner_composer_config": inner_composer_config, "tunnels_in": dict( {f'{"-".join(path)}_tunnel': path for path in config["tunnels_in"]} ), "emit_paths": config["emit_paths"], "tunnel_out_schemas": config["tunnel_out_schemas"], "stub_schemas": config["stub_schemas"], "seed": (config["seed"] + 1) % RAND_MAX, "inner_emitter": config["inner_emitter"], "divide": config["divide"], # Inner sim will set store at ('division_trigger',) # equal to True when time to divide "division_threshold": config["division_threshold"], "division_variable": ("division_trigger",), "_parallel": config["parallel"], "start_time": config["start_time"], "experiment_id": config["experiment_id"], "inner_same_timestep": config["inner_same_timestep"], } cell_process = EngineProcess(cell_process_config) processes.update({"cell_process": cell_process}) return processes
[docs] def generate_topology(self, config): topology = { "cell_process": { "agents": ("..",), "fields_tunnel": ("..", "..", "fields"), "dimensions_tunnel": ("..", "..", "dimensions"), }, } for path in config["tunnels_in"]: topology["cell_process"][f'{"-".join(path)}_tunnel'] = path if config["lysis_config"]: topology["lysis"] = { "trigger": ("burst",), "internal": ("bulk",), "agents": ("..",), "fields": ("..", "..", "fields"), "location": ("boundary", "location"), "dimensions": ("..", "..", "dimensions"), } return topology
[docs] def colony_save_states(engine, config): """ Runs the simulation while saving the states of the colony at specific timesteps to JSONs in the ``data`` folder. The saved files have names of the format ``{prefix}_seed_{seed}_colony_t{save time}.json`` if a prefix is specified via the ``colony_save_prefix`` configuration option and ``seed_{seed}_colony_t{save time}.json`` if not. """ for time in config["save_times"]: if time > config["total_time"]: raise ValueError( f'Config contains save_time ({time}) > total ' f'time ({config["total_time"]})' ) for i in range(len(config["save_times"])): if i == 0: time_to_next_save = config["save_times"][i] else: time_to_next_save = config["save_times"][i] - config["save_times"][i - 1] # Run engine to next save point engine.update(time_to_next_save) time_elapsed = config["save_times"][i] # Save the full state of the super-simulation state_to_save = engine.state.get_value(condition=not_a_process) # Get internal state from the EngineProcess sub-simulation for agent_id in state_to_save["agents"]: engine.state.get_path( ("agents", agent_id, "cell_process") ).value.send_command("get_inner_state") for agent_id in state_to_save["agents"]: cell_state = engine.state.get_path( ("agents", agent_id, "cell_process") ).value.get_command_result() # Can't save, but will be restored when loading state del cell_state["environment"]["exchange_data"] # Shared processes are re-initialized on load del cell_state["process"] # Save bulk and unique dtypes cell_state["bulk_dtypes"] = str(cell_state["bulk"].dtype) cell_state["unique_dtypes"] = {} for name, mols in cell_state["unique"].items(): cell_state["unique_dtypes"][name] = str(mols.dtype) state_to_save["agents"][agent_id] = cell_state state_to_save = serialize_value(state_to_save) if config.get("colony_save_prefix", None): write_json( "data/" + str(config["colony_save_prefix"]) + "_seed_" + str(config["seed"]) + "_colony_t" + str(time_elapsed) + ".json", state_to_save, ) else: write_json( "data/seed_" + str(config["seed"]) + "_colony_t" + str(time_elapsed) + ".json", state_to_save, ) # Cleanup namespace (significant with high cell counts) del state_to_save, cell_state print("Finished saving the state at t = " + str(time_elapsed)) gc.collect() # Finish running the simulation time_remaining = config["total_time"] - config["save_times"][-1] if time_remaining: engine.update(time_remaining)
[docs] def run_simulation(config): """ Main method for running colony simulations in accordance with dictionary of configuration options. """ tunnel_out_schemas = {} stub_schemas = {} if config["spatial_environment"]: spatial_config = config["spatial_environment_config"] # Generate environment composite. environment_composer = Lattice(spatial_config) environment_composite = environment_composer.generate() del environment_composer # Add field timeline process if timeline given if len(spatial_config["field_timeline"].get("timeline")) > 0: field_timeline = FieldTimeline(spatial_config["field_timeline"]) environment_composite.merge( processes={"field_timeline": field_timeline}, topology={ "field_timeline": tuplify_topology( spatial_config["field_timeline_topology"] ) }, ) del field_timeline diffusion_schema = environment_composite.processes[ "reaction_diffusion" ].get_schema() multibody_schema = environment_composite.processes["multibody"].get_schema() tunnel_out_schemas["fields_tunnel"] = diffusion_schema["fields"] tunnel_out_schemas["dimensions_tunnel"] = diffusion_schema["dimensions"] stub_schemas["diffusion"] = { ("boundary",): diffusion_schema["agents"]["*"]["boundary"], ("environment",): diffusion_schema["agents"]["*"]["environment"], } stub_schemas["multibody"] = { ("boundary",): multibody_schema["agents"]["*"]["boundary"], } del multibody_schema, diffusion_schema experiment_id = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S_%f%z") emitter_config = {"type": config["emitter"]} for key, value in config["emitter_arg"]: emitter_config[key] = value base_config = { "agent_id": config["agent_id"], "tunnel_out_schemas": tunnel_out_schemas, "stub_schemas": stub_schemas, "parallel": config["parallel"], "divide": config["divide"], "tunnels_in": ( ("environment",), ("boundary",), ), "emit_paths": tuple(tuple(path) for path in config["engine_process_reports"]), "seed": config["seed"], "experiment_id": experiment_id, "start_time": config.get("start_time", 0), "inner_composer_config": config.to_dict(), "lysis_config": config.get("lysis_config", {}), "inner_same_timestep": config.get("inner_same_timestep", True), } composite = {} if "initial_colony_file" in config.keys(): initial_state = get_state_from_file( path="data/" + config["initial_colony_file"] + ".json" ) agent_states = initial_state["agents"] for agent_id, agent_state in agent_states.items(): # Assume that initial colony file ends in string # '_t{save time}'. Use the save time and each # agent ID to give each agent a new random seed # to prevent unique ID clashes time_str = re.fullmatch( r".*_t([0-9]+)$", config["initial_colony_file"] ).group(1) seed = ( base_config["seed"] + int(float(time_str)) + int(agent_id, base=2) ) % RAND_MAX agent_path = ("agents", agent_id) division_threshold = agent_state.pop("division_threshold", None) agent_config = { "inner_composer_config": { "agent_id": agent_id, "seed": seed, "initial_state": agent_state, }, "agent_id": agent_id, "seed": seed, "inner_emitter": { **emitter_config, "embed_path": agent_path, }, "division_threshold": division_threshold, } agent_composer = EcoliEngineProcess(base_config) agent_composite = agent_composer.generate(agent_config, path=agent_path) if not composite: composite = agent_composite composite.processes["agents"][agent_id] = agent_composite.processes[ "agents" ][agent_id] composite.topology["agents"][agent_id] = agent_composite.topology["agents"][ agent_id ] initial_state = composite.initial_state() # Clean up namespace for garbage collector del ( agent_id, agent_state, agent_states, agent_path, agent_composer, agent_composite, base_config, agent_config, ) else: agent_config = {} if "initial_state_file" in config.keys(): agent_path = ("agents", config["agent_id"]) agent_config = { "inner_emitter": { **emitter_config, "embed_path": agent_path, }, } composer = EcoliEngineProcess(base_config) composite = composer.generate(agent_config, path=agent_path) initial_state = composite.initial_state() del agent_path, composer, agent_config, base_config if config["spatial_environment"]: # Merge a lattice composite for the spatial environment. initial_environment = environment_composite.initial_state() composite.merge(environment_composite) initial_state = deep_merge(initial_state, initial_environment) del environment_composite, initial_environment metadata = config.to_dict() metadata.pop("initial_state", None) metadata["git_hash"] = get_git_revision_hash() metadata["git_status"] = get_git_status() # Since unique numpy updater is an class method, internal # deepcopying in vivarium-core causes this warning to appear warnings.filterwarnings( "ignore", message="Incompatible schema " "assignment at .+ Trying to assign the value <bound method " "UniqueNumpyUpdater\.updater .+ to key updater, which already " "has the value <bound method UniqueNumpyUpdater\.updater", ) engine = Engine( processes=composite.processes, topology=composite.topology, initial_state=initial_state, experiment_id=experiment_id, emitter=emitter_config, progress_bar=config["progress_bar"], metadata=metadata, profile=config["profile"], initial_global_time=config.get("start_time", 0.0), ) # Unnecessary reference to initial_state engine.initial_state = None # Tidy up namespace and free memory del composite, initial_state, experiment_id, emitter_config gc.collect() # Save states while running if needed if config["save"]: colony_save_states(engine, config) else: engine.update(config["total_time"]) engine.end() if config["profile"]: report_profiling(engine.stats) return engine
def test_run_simulation(): # Clear the emitter's data in case it has been filled by another test. SharedRamEmitter.saved_data.clear() config = SimConfig() spatial_config_path = os.path.join(CONFIG_DIR_PATH, "spatial.json") config.update_from_json(spatial_config_path) config.update_from_dict( { "total_time": 30, "divide": True, "emitter": "shared_ram", "parallel": False, "engine_process_reports": [ ("listeners", "mass"), ], } ) engine = run_simulation(config) data = engine.emitter.get_data() assert min(data.keys()) == 0 assert max(data.keys()) == 30 assert np.all(np.array(data[0]["fields"]["GLC"]) == 1) assert np.any(np.array(data[29]["fields"]["GLC"]) != 1) mass_path = ("agents", "0", "listeners", "mass", "cell_mass") assert get_in(data[29], mass_path) > get_in(data[0], mass_path) if __name__ == "__main__": config = SimConfig() config.update_from_cli() run_simulation(config)