"""
Composite for simulations with EngineProcess cells in an environment.
.. note::
This composite requires a config with the spatial environment
enabled.
"""
from datetime import datetime, timezone
import os
import re
import gc
import warnings
import numpy as np
from vivarium.core.composer import Composer, Composite
from vivarium.core.emitter import SharedRamEmitter
from vivarium.core.engine import Engine
from vivarium.core.serialize import serialize_value
from vivarium.library.dict_utils import deep_merge
from vivarium.library.topology import get_in
from ecoli.experiments.ecoli_master_sim import (
EcoliSim,
SimConfig,
get_git_revision_hash,
get_git_status,
report_profiling,
tuplify_topology,
)
from ecoli.library.logging_tools import write_json
from ecoli.library.sim_data import RAND_MAX
from ecoli.library.schema import not_a_process
from ecoli.library.json_state import get_state_from_file
from ecoli.processes.engine_process import EngineProcess
from ecoli.processes.environment.field_timeline import FieldTimeline
from ecoli.processes.environment.lysis import Lysis
from ecoli.processes.division_detector import DivisionDetector
from ecoli.composites.environment.lattice import Lattice
from ecoli.composites.ecoli_configs import CONFIG_DIR_PATH
class EcoliInnerWrapper(Composite):
    """Helper class to ensure calling initial_state() on the composite
    generated by EcoliInnerSim returns the cached initial state and removes
    the internal reference to this state."""

    def initial_state(self, config=None):
        """Return the initial state cached by :py:meth:`EcoliInnerSim.generate`.

        Args:
            config: Unused. Now optional so the signature is compatible with
                the base ``Composite.initial_state``, which callers invoke
                with no arguments.

        Returns:
            The state dict stashed on ``self.cached_state``. The cached
            reference is cleared so the (large) state dict does not stay
            alive on this composite after it has been handed out.
        """
        initial_state = self.cached_state
        # Drop the internal reference so the state can be garbage-collected.
        self.cached_state = None
        return initial_state
class EcoliInnerSim(Composer):
    """Inner composer to be used with
    :py:class:`~ecoli.processes.engine_process.EngineProcess`."""

    defaults = {
        "agent_id": "0",
        "seed": 0,
        "division_threshold": None,
        "division_variable": None,
        "initial_state": None,
        "chromosome_path": None,
    }

    def generate(self, config=None):
        """
        Generates a complete E. coli simulation composite to be used
        as the inner simulation in an EngineProcess.

        This requires caching the initial state generated in the course
        of calling :py:meth:`~ecoli.experiments.ecoli_master_sim.EcoliSim.build_ecoli`
        and wrapping the returned composite using :py:class:`~.EcoliInnerWrapper`
        to ensure that the cached initial state is returned when the
        ``initial_state`` method is called on the composite.

        Additionally, this requires a special
        :py:class:`~ecoli.processes.division_detector.DivisionDetector`
        Step to set the store located at ``("division_trigger",)`` to
        ``True`` when it is time to divide. This is necessary because
        the actual logic of division is included in the
        :py:meth:`~ecoli.processes.engine_process.EngineProcess.next_update`
        method of EngineProcess, making that part of the normal
        :py:class:`~ecoli.processes.cell_division.Division` process
        obsolete but not the part that signals the time for division
        (whether that be via a D period or mass threshold mechanism).
        The :py:class:`~ecoli.processes.division_detector.DivisionDetector`
        process takes over that role while relinquishing control over
        the actual division to EngineProcess.
        """
        # Merge the supplied overrides on top of the composer's stored
        # config once, so that partial override dicts still resolve
        # defaults such as "agent_id". Previously the merge happened only
        # for the EcoliSim constructor while the raw override dict was
        # indexed directly below, which raised KeyError for partial dicts.
        config = {**self.config, **(config or {})}
        ecoli_sim = EcoliSim(
            {
                **config,
                # Division is handled by EngineProcess but we want to have
                # Ecoli composite create Division process so we can swap it
                # with DivisionDetector
                "divide": True,
                "spatial_environment": False,
            }
        )
        ecoli_sim.build_ecoli()
        composite = ecoli_sim.ecoli
        # Composite embedded under ('agents', agent_id) when 'divide' is True
        composite = {
            key: val["agents"][config["agent_id"]]
            for key, val in composite.items()
            if key not in ["state", "_schema"]
        }
        # Cache initial state so wrapping with EcoliInnerWrapper
        # causes initial_state() method to return cached state
        composite["cached_state"] = ecoli_sim.generated_initial_state["agents"][
            config["agent_id"]
        ]
        composite = EcoliInnerWrapper(composite)
        # DivisionDetector tells EngineProcess when to divide; reuse the
        # mass-threshold parameters of the Division step it replaces.
        division_step = composite["steps"]["division"]
        div_detector_config = {
            "division_threshold": config["division_threshold"],
            "dry_mass_inc_dict": division_step.dry_mass_inc_dict,
            "division_mass_multiplier": division_step.division_mass_multiplier,
        }
        composite["steps"]["division"] = DivisionDetector(div_detector_config)
        composite["topology"]["division"] = {
            "division_variable": config["division_variable"],
            "division_trigger": ("division_trigger",),
            "full_chromosomes": config["chromosome_path"],
            "media_id": ("environment", "media_id"),
            "division_threshold": ("division_threshold",),
        }
        return composite

    # Not used: generate() above is overridden wholesale, so the standard
    # Composer hooks below are intentionally no-ops.
    def generate_processes(self, config):
        pass

    def generate_topology(self, config):
        pass
class EcoliEngineProcess(Composer):
    """
    Outer composer for use with
    :py:class:`~ecoli.processes.engine_process.EngineProcess`.

    In addition to creating an EngineProcess containing an entire
    vEcoli simulation, the composite generated by this composer optionally
    includes the :py:class:`~ecoli.processes.environment.lysis.Lysis`
    Step when given a non-empty ``lysis_config``.
    """

    defaults = {
        "agent_id": "0",
        "seed": 0,
        "tunnel_out_schemas": {},
        "stub_schemas": {},
        "parallel": False,
        "divide": False,
        "tunnels_in": tuple(),
        "emit_paths": tuple(),
        "start_time": 0,
        "experiment_id": "",
        "inner_emitter": "null",
        "inner_composer_config": {},
        "lysis_config": {},
        "inner_same_timestep": True,
        "division_threshold": True,
    }

    def generate_processes(self, config):
        """Build the ``cell_process`` EngineProcess (plus optional Lysis).

        .. note::
            This method deliberately mutates ``config`` in place: the
            extra lysis tunnels appended to ``tunnels_in`` and the popped
            ``inner_composer_config`` must be visible to
            :py:meth:`generate_topology`, which receives the same dict.
        """
        processes = {}
        if config["lysis_config"]:
            # Lysis needs tunnels into the inner simulation's bulk counts
            # and burst trigger.
            config["tunnels_in"] += (("bulk",),)
            config["tunnels_in"] += (("burst",),)
            config["lysis_config"]["agent_id"] = config["agent_id"]
            processes = {"lysis": Lysis(config["lysis_config"])}
        # Popped so the (large) inner config is not duplicated inside
        # outer_composer_config below.
        inner_composer_config = config.pop("inner_composer_config")
        cell_process_config = {
            "agent_id": config["agent_id"],
            "outer_composer": EcoliEngineProcess,
            "outer_composer_config": config,
            "inner_composer": EcoliInnerSim,
            "inner_composer_config": inner_composer_config,
            # Tunnel stores are named after their joined paths.
            "tunnels_in": {
                f'{"-".join(path)}_tunnel': path for path in config["tunnels_in"]
            },
            "emit_paths": config["emit_paths"],
            "tunnel_out_schemas": config["tunnel_out_schemas"],
            "stub_schemas": config["stub_schemas"],
            "seed": (config["seed"] + 1) % RAND_MAX,
            "inner_emitter": config["inner_emitter"],
            "divide": config["divide"],
            # Inner sim will set store at ('division_trigger',)
            # equal to True when time to divide
            "division_threshold": config["division_threshold"],
            "division_variable": ("division_trigger",),
            "_parallel": config["parallel"],
            "start_time": config["start_time"],
            "experiment_id": config["experiment_id"],
            "inner_same_timestep": config["inner_same_timestep"],
        }
        cell_process = EngineProcess(cell_process_config)
        processes.update({"cell_process": cell_process})
        return processes

    def generate_topology(self, config):
        """Wire the cell process (and optional Lysis) into the outer sim.

        Relies on ``generate_processes`` having already extended
        ``config["tunnels_in"]`` when lysis is enabled.
        """
        topology = {
            "cell_process": {
                "agents": ("..",),
                "fields_tunnel": ("..", "..", "fields"),
                "dimensions_tunnel": ("..", "..", "dimensions"),
            },
        }
        for path in config["tunnels_in"]:
            topology["cell_process"][f'{"-".join(path)}_tunnel'] = path
        if config["lysis_config"]:
            topology["lysis"] = {
                "trigger": ("burst",),
                "internal": ("bulk",),
                "agents": ("..",),
                "fields": ("..", "..", "fields"),
                "location": ("boundary", "location"),
                "dimensions": ("..", "..", "dimensions"),
            }
        return topology
def colony_save_states(engine, config):
    """
    Runs the simulation while saving the states of the colony at specific
    timesteps to JSONs in the ``data`` folder. The saved files have names
    of the format ``{prefix}_seed_{seed}_colony_t{save time}.json``
    if a prefix is specified via the ``colony_save_prefix`` configuration
    option and ``seed_{seed}_colony_t{save time}.json`` if not.

    Args:
        engine: Running :py:class:`vivarium.core.engine.Engine` instance.
        config: Simulation config providing ``save_times``, ``total_time``,
            ``seed``, and optionally ``colony_save_prefix``.

    Raises:
        ValueError: If any requested save time exceeds ``total_time``.
    """
    for time in config["save_times"]:
        if time > config["total_time"]:
            raise ValueError(
                f'Config contains save_time ({time}) > total '
                f'time ({config["total_time"]})'
            )
    # Track the previous save point so each engine.update() only runs the
    # delta (replaces the index-based loop over save_times).
    prev_save_time = 0
    for time_elapsed in config["save_times"]:
        # Run engine to next save point
        engine.update(time_elapsed - prev_save_time)
        prev_save_time = time_elapsed
        # Save the full state of the super-simulation
        state_to_save = engine.state.get_value(condition=not_a_process)
        # Ask every EngineProcess for its inner state first, then collect
        # the results, so parallel sub-simulations can respond concurrently.
        for agent_id in state_to_save["agents"]:
            engine.state.get_path(
                ("agents", agent_id, "cell_process")
            ).value.send_command("get_inner_state")
        for agent_id in state_to_save["agents"]:
            cell_state = engine.state.get_path(
                ("agents", agent_id, "cell_process")
            ).value.get_command_result()
            # Can't save, but will be restored when loading state
            del cell_state["environment"]["exchange_data"]
            # Shared processes are re-initialized on load
            del cell_state["process"]
            # Save bulk and unique dtypes
            cell_state["bulk_dtypes"] = str(cell_state["bulk"].dtype)
            cell_state["unique_dtypes"] = {
                name: str(mols.dtype)
                for name, mols in cell_state["unique"].items()
            }
            state_to_save["agents"][agent_id] = cell_state
            # Release the per-agent reference inside the loop; the old
            # `del cell_state` after the loop raised NameError whenever
            # the colony had no agents.
            del cell_state
        state_to_save = serialize_value(state_to_save)
        prefix = config.get("colony_save_prefix")
        if prefix:
            filename = f'data/{prefix}_seed_{config["seed"]}_colony_t{time_elapsed}.json'
        else:
            filename = f'data/seed_{config["seed"]}_colony_t{time_elapsed}.json'
        write_json(filename, state_to_save)
        # Cleanup namespace (significant with high cell counts)
        del state_to_save
        print("Finished saving the state at t = " + str(time_elapsed))
        gc.collect()
    # Finish running the simulation. Guard against an empty save_times
    # list, which previously raised IndexError on save_times[-1].
    last_save = config["save_times"][-1] if config["save_times"] else 0
    time_remaining = config["total_time"] - last_save
    if time_remaining:
        engine.update(time_remaining)
def run_simulation(config):
    """
    Main method for running colony simulations in
    accordance with dictionary of configuration options.

    Args:
        config: :py:class:`~ecoli.experiments.ecoli_master_sim.SimConfig`
            (or dict-like) with the full simulation configuration. The
            spatial environment must be enabled for the tunnels created
            here to be meaningful (see module docstring).

    Returns:
        The finished :py:class:`vivarium.core.engine.Engine`.
    """
    tunnel_out_schemas = {}
    stub_schemas = {}
    if config["spatial_environment"]:
        spatial_config = config["spatial_environment_config"]
        # Generate environment composite.
        environment_composer = Lattice(spatial_config)
        environment_composite = environment_composer.generate()
        del environment_composer
        # Add field timeline process if a non-empty timeline is given.
        # Truthiness check also tolerates a missing "timeline" key, which
        # previously crashed with len(None).
        if spatial_config["field_timeline"].get("timeline"):
            field_timeline = FieldTimeline(spatial_config["field_timeline"])
            environment_composite.merge(
                processes={"field_timeline": field_timeline},
                topology={
                    "field_timeline": tuplify_topology(
                        spatial_config["field_timeline_topology"]
                    )
                },
            )
            del field_timeline
        # Schemas for the stores EngineProcess tunnels out to (fields,
        # dimensions) and for the stubs standing in for environment procs.
        diffusion_schema = environment_composite.processes[
            "reaction_diffusion"
        ].get_schema()
        multibody_schema = environment_composite.processes["multibody"].get_schema()
        tunnel_out_schemas["fields_tunnel"] = diffusion_schema["fields"]
        tunnel_out_schemas["dimensions_tunnel"] = diffusion_schema["dimensions"]
        stub_schemas["diffusion"] = {
            ("boundary",): diffusion_schema["agents"]["*"]["boundary"],
            ("environment",): diffusion_schema["agents"]["*"]["environment"],
        }
        stub_schemas["multibody"] = {
            ("boundary",): multibody_schema["agents"]["*"]["boundary"],
        }
        del multibody_schema, diffusion_schema
    experiment_id = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S_%f%z")
    emitter_config = {"type": config["emitter"]}
    emitter_config.update(config["emitter_arg"])
    base_config = {
        "agent_id": config["agent_id"],
        "tunnel_out_schemas": tunnel_out_schemas,
        "stub_schemas": stub_schemas,
        "parallel": config["parallel"],
        "divide": config["divide"],
        "tunnels_in": (
            ("environment",),
            ("boundary",),
        ),
        "emit_paths": tuple(tuple(path) for path in config["engine_process_reports"]),
        "seed": config["seed"],
        "experiment_id": experiment_id,
        "start_time": config.get("start_time", 0),
        "inner_composer_config": config.to_dict(),
        "lysis_config": config.get("lysis_config", {}),
        "inner_same_timestep": config.get("inner_same_timestep", True),
    }
    composite = {}
    if "initial_colony_file" in config.keys():
        initial_state = get_state_from_file(
            path="data/" + config["initial_colony_file"] + ".json"
        )
        agent_states = initial_state["agents"]
        # Assume that initial colony file ends in string '_t{save time}'.
        # Use the save time and each agent ID to give each agent a new
        # random seed to prevent unique ID clashes. The match is invariant
        # across agents, so hoist it out of the loop.
        time_str = re.fullmatch(
            r".*_t([0-9]+)$", config["initial_colony_file"]
        ).group(1)
        for agent_id, agent_state in agent_states.items():
            seed = (
                base_config["seed"] + int(float(time_str)) + int(agent_id, base=2)
            ) % RAND_MAX
            agent_path = ("agents", agent_id)
            division_threshold = agent_state.pop("division_threshold", None)
            agent_config = {
                "inner_composer_config": {
                    "agent_id": agent_id,
                    "seed": seed,
                    "initial_state": agent_state,
                },
                "agent_id": agent_id,
                "seed": seed,
                "inner_emitter": {
                    **emitter_config,
                    "embed_path": agent_path,
                },
                "division_threshold": division_threshold,
            }
            agent_composer = EcoliEngineProcess(base_config)
            agent_composite = agent_composer.generate(agent_config, path=agent_path)
            if not composite:
                composite = agent_composite
            composite.processes["agents"][agent_id] = agent_composite.processes[
                "agents"
            ][agent_id]
            composite.topology["agents"][agent_id] = agent_composite.topology[
                "agents"
            ][agent_id]
        initial_state = composite.initial_state()
        # Clean up namespace for garbage collector
        del (
            agent_id,
            agent_state,
            agent_states,
            agent_path,
            agent_composer,
            agent_composite,
            base_config,
            agent_config,
        )
    else:
        agent_config = {}
        # Bind the agent path unconditionally: composer.generate() below
        # needs it even when no initial_state_file is configured
        # (previously a latent NameError in that case).
        agent_path = ("agents", config["agent_id"])
        if "initial_state_file" in config.keys():
            agent_config = {
                "inner_emitter": {
                    **emitter_config,
                    "embed_path": agent_path,
                },
            }
        composer = EcoliEngineProcess(base_config)
        composite = composer.generate(agent_config, path=agent_path)
        initial_state = composite.initial_state()
        del agent_path, composer, agent_config, base_config
    if config["spatial_environment"]:
        # Merge a lattice composite for the spatial environment.
        initial_environment = environment_composite.initial_state()
        composite.merge(environment_composite)
        initial_state = deep_merge(initial_state, initial_environment)
        del environment_composite, initial_environment
    metadata = config.to_dict()
    metadata.pop("initial_state", None)
    metadata["git_hash"] = get_git_revision_hash()
    metadata["git_status"] = get_git_status()
    # Since unique numpy updater is a class method, internal deepcopying
    # in vivarium-core causes this warning to appear. The message is a
    # regex, so raw strings are required for the "\." escapes (a plain
    # string made them invalid escape sequences).
    warnings.filterwarnings(
        "ignore",
        message="Incompatible schema "
        r"assignment at .+ Trying to assign the value <bound method "
        r"UniqueNumpyUpdater\.updater .+ to key updater, which already "
        r"has the value <bound method UniqueNumpyUpdater\.updater",
    )
    engine = Engine(
        processes=composite.processes,
        topology=composite.topology,
        initial_state=initial_state,
        experiment_id=experiment_id,
        emitter=emitter_config,
        progress_bar=config["progress_bar"],
        metadata=metadata,
        profile=config["profile"],
        initial_global_time=config.get("start_time", 0.0),
    )
    # Unnecessary reference to initial_state
    engine.initial_state = None
    # Tidy up namespace and free memory
    del composite, initial_state, experiment_id, emitter_config
    gc.collect()
    # Save states while running if needed
    if config["save"]:
        colony_save_states(engine, config)
    else:
        engine.update(config["total_time"])
    engine.end()
    if config["profile"]:
        report_profiling(engine.stats)
    return engine
def test_run_simulation():
    """Smoke test: run a short spatial colony sim and check emitted data.

    Verifies that data is emitted for the whole time span, that the GLC
    field starts uniform and is later perturbed, and that cell mass grows.
    """
    # Clear the emitter's data in case it has been filled by another test.
    SharedRamEmitter.saved_data.clear()
    sim_config = SimConfig()
    sim_config.update_from_json(os.path.join(CONFIG_DIR_PATH, "spatial.json"))
    sim_config.update_from_dict(
        {
            "total_time": 30,
            "divide": True,
            "emitter": "shared_ram",
            "parallel": False,
            "engine_process_reports": [
                ("listeners", "mass"),
            ],
        }
    )
    sim_engine = run_simulation(sim_config)
    data = sim_engine.emitter.get_data()
    emitted_times = data.keys()
    assert min(emitted_times) == 0
    assert max(emitted_times) == 30
    # GLC field is uniform at t=0 and no longer uniform near the end.
    assert np.all(np.array(data[0]["fields"]["GLC"]) == 1)
    assert np.any(np.array(data[29]["fields"]["GLC"]) != 1)
    # The cell should have gained mass over the run.
    mass_path = ("agents", "0", "listeners", "mass", "cell_mass")
    assert get_in(data[29], mass_path) > get_in(data[0], mass_path)
if __name__ == "__main__":
    # Build the configuration from command-line arguments, then run.
    cli_config = SimConfig()
    cli_config.update_from_cli()
    run_simulation(cli_config)