ecoli.processes.engine_process

EngineProcess

Tunnel Ports

Sometimes, a process inside the EngineProcess might need to be wired to a store outside the EngineProcess, or an outside process might need to be wired to an inside store. We handle this with _tunnels_.

Here is a state hierarchy showing how a tunnel connects an outside process (A) to an inside store (store1). We call this a “tunnel in” because the exterior process is tunneling into EngineProcess to see an internal store. Essentially, EngineProcess has a port for the tunnel in that connects to some store in the outer simulation here called dummy_store. The process A connects to that same store. When the next_update method of EngineProcess runs, the value of store1 in the inner simulation is read and copied through the tunnel in port to dummy_store, where it can be read by process A. Conversely, if process A modifies the value of dummy_store, the next_update method of EngineProcess ensures that store1 reflects that change the next time it is run. It is the user’s responsibility to define the topology of EngineProcess such that the tunnel in port points to the store where the outer simulation process (here, process A) expects the data to be (here, dummy_store).

         ________________
        /      |         \    EngineProcess
dummy_store  +---+    +-------------------------+
    :  :.....| A |    |                         |
    :        +---+    |        /\               |
    :                 |       /  \              |
    :                 |  +---+    store1        |
    :                 |  | B |       ^          |
    :                 |  +---+       |          |
    :                 |              |          |
    :.............tunnel_in <-- next_update     |
                      |                         |
                      +-------------------------+

Here is another example where a tunnel connects an inside process (B) to an outside store (store2). We call this a “tunnel out” because the interior process is tunneling outside of EngineProcess to see an external store. In this case, EngineProcess has a port for the tunnel out that connects to store2 in the outer simulation. When the next_update method of EngineProcess is called, the value of store2 is read and copied to a store in the inner simulation (here, dummy_store) where it is read by process B. Unlike tunnels in, all tunnels out are automatically created and wired because EngineProcess only needs to have the complete inner simulation topology to know exactly what inner process ports connect to stores in the outer simulation and what paths those outer simulation stores have.

      /\
     /  \        EngineProcess
    /    +-------------------------+
 store2  |                         |
   :     |        /\               |
   :     |       /  \              |
   :     |  +---+    dummy_store   |
...:     |  | B |.......:    ^     |
:        |  +---+            |     |
:        |                   |     |
:..tunnel_out <----- next_update   |
         |                         |
         +-------------------------+

In these diagrams, processes are boxes, stores are labeled nodes in the tree, solid lines show the state hierarchy, dotted lines show the topology wiring, and tunnel_out/tunnel_in are ports for EngineProcess.

These tunnels are the only way that the EngineProcess exchanges information with the outside simulation.

class ecoli.processes.engine_process.EngineProcess(parameters=None)[source]

Bases: Process

Process that wraps a Vivarium simulation by taking the following options in its parameter dictionary:

  • inner_composer: a composer for the inner simulation

  • inner_composer_config: a configuration dictionary for that composer

  • outer_composer: a composer that can be used upon division to create a new EngineProcess (including a new inner simulation using the inner composer) plus as any other processes that make up a single simulation

  • outer_composer_config: a configuration dictionary for that composer

  • tunnels_in: a mapping from port names to paths of stores in the inner simulation that processes in the outer simulation would like access to. EcoliEngineProcess has a custom generate_topology() method to ensure that each port name is wired to a user-specified store in the outer simulation. Then, every time the EngineProcess is run, the value from the outer simulation store is copied to the store located at the path in the inner simulation, the inner simulation is incremented, and the final value of the store in the inner simulation is copied to the store that the port points to in the outer simulation. This allows outer simulation processes to indirectly read and modify the value of stores in the inner simulation.

  • stub_schemas: a mapping from process names to a mapping from paths in the inner simulation to the schema to use for the store at that path. See SchemaStub for more details.

  • tunnel_out_schemas: a mapping from names of ports for tunnels out to schemas to use for the stores that those ports point to. Helpful for ensuring consistency in schemas specified for these stores.

These options allow EngineProcess to create a full inner simulation, increment it in sync with the process time step and the rest of the outer simulation, and read/modify values in both the inner and outer simulations over time.

calculate_timestep(states)[source]

The time step for EngineProcess is always set to be the shortest time step for an inner simulation process. This ensures that we never accidentally skip over a scheduled update for a Process/Step.

create_emitter()[source]

Since EngineProcesses are often parallelized with multiprocessing, we wait to create an emitter for each EngineProcess instance until multiprocessing is initiated to avoid unusual locking or other issues.

This is the first thing called by next_update() the first time it is run.

defaults: Dict[str, Any] = {'agent_id': '0', 'divide': False, 'division_threshold': None, 'division_variable': None, 'emit_paths': (), 'experiment_id': '', 'inner_composer': None, 'inner_composer_config': {}, 'inner_emitter': 'null', 'inner_same_timestep': False, 'outer_composer': None, 'outer_composer_config': {}, 'seed': 0, 'start_time': 0, 'stub_schemas': {}, 'tunnel_out_schemas': {}, 'tunnels_in': {}}
initial_state(config=None)[source]

To ensure that the stores pointed to by the tunnel in ports accurately reflect the values of the stores they tunnel to in the inner simulation, we define a custom initial_state() method for EngineProcess that populates those stores at the start of the simulation with their corresponding inner simulation store values.

next_update(timestep, states)[source]

Update the inner simulation store values for tunnels in and out then emit the inner simulation state. We emit at the start of next_update() because the inner simulation is in-sync with the outer simulation only after the stores in the internal state that tunnel out have been synchronized with their corresponding stores in the outer simulation.

Increment the inner simulation by the shortest time possible to guarantee at least one Process/Step ran. If division is enabled, check to see if the division variable has reached the threshold. If so, divide the cell state into two daughter states and create two new inner simulations using the inner composer configuration, the outer composer, and the outer composer configuration.

Finally, figure out what updates to apply to the outer simulation stores for tunnels in/out in order to mutate them the same way that their corresponding stores in the inner simulation were mutated (if at all) over the course of the increment in inner simulation time.

ports_schema()[source]
send_command(command, args=None, kwargs=None, run_pre_check=True)[source]

Override to handle special command ‘get_inner_state’ which lets engine process pull out a dictionary containing the entire inner simulation state.

Return type:

None

class ecoli.processes.engine_process.SchemaStub(parameters=None)[source]

Bases: Step

Stub process for providing schemas to an inner simulation.

When using ecoli.processes.engine_process.EngineProcess, there may be processes in the outer simulation whose schemas you are expecting to affect variables in the inner simulation. You can include this stub process in your inner simulation to provide those schemas from the outer simulation.

The process takes a single parameter, ports_schema, whose value is the process’s ports schema to be provided to the inner simulation.

We run these as Steps otherwise they could influence when other Steps run. For example, if a SchemaStub was a Process with timestep 1 while all other Processes had timestep 2, Steps like mass listener would run after every second instead of every 2 seconds, which may not be desired.

Parameters:

parameters (dict | None)

defaults: dict[str, Any] = {'ports_schema': {}}
next_update(timestep, states)[source]
ports_schema()[source]
class ecoli.processes.engine_process._InnerComposer(config=None)[source]

Bases: Composer

Base class for composer classes.

Composers generate composites.

All composer classes must inherit from this class.

Parameters:

config (dict | None) – Dictionary of configuration options that can override the class defaults.

generate_processes(config)[source]
generate_topology(config)[source]
class ecoli.processes.engine_process._OuterComposer(config=None)[source]

Bases: Composer

Base class for composer classes.

Composers generate composites.

All composer classes must inherit from this class.

Parameters:

config (dict | None) – Dictionary of configuration options that can override the class defaults.

generate_processes(config)[source]
generate_topology(config)[source]
class ecoli.processes.engine_process._ProcA(parameters=None)[source]

Bases: Process

Parameters:

parameters (dict | None)

next_update(timestep, states)[source]

Each timestep, port_a += port_c.

ports_schema()[source]
class ecoli.processes.engine_process._ProcB(parameters=None)[source]

Bases: Process

Parameters:

parameters (dict | None)

next_update(timestep, states)[source]

Each timestep, port_b += 1.

ports_schema()[source]
class ecoli.processes.engine_process._ProcC(parameters=None)[source]

Bases: Process

Parameters:

parameters (dict | None)

next_update(timestep, states)[source]

Each timestep, port_c += port_b.

ports_schema()[source]
ecoli.processes.engine_process._get_path_net_depth(path)[source]

Resolve a tuple path to figure out its depth, subtracting one for every “..” encountered.

Parameters:

path (tuple[str])

Return type:

int

ecoli.processes.engine_process._inverse_update(initial_state, final_state, store, updater_registry_reverse)[source]

Given a dictionary containing the current values contained inside a potentially nested store and a dictionary containing the final values inside that same store, calculate an update that can be passed such that calling the updater of said store with the calculated update causes the values in the store to change from their current values to the provided final values.

Parameters:
  • initial_state (Any) – Current values (potentially nested) in store

  • final_state (Any) – Final values (potentially nested) in store that we desire

  • store (Store) – Store (potentially nested) that we are trying to mutate

  • updater_registry_reverse (dict[Callable, str]) – A mapping from updater functions to the string names they are registered as in updater_registry

Returns:

Update dictionary that when used to update store by calling its (or its sub-stores) updaters, causes its values to change from initial_state to final_state

ecoli.processes.engine_process.cap_tunneling_paths(topology, outer=())[source]

For ports in the inner simulation that point to stores outside, this function caps those stores to point to a dummy store that is populated with the value of the actual store in the outer simulation in the next_update() method of EngineProcess.

Parameters:
  • topology (dict[str, Any]) – Topology of inner simulation in EngineProcess. Mutated in place such that ports pointing to stores located outside the inner simulation are capped and instead point to top-level stores called “port_name_tunnel”

  • outer (tuple[str, ...]) – Current port path inside topology (for recursively travelling topology with this function)

Returns:

Mapping from paths to ports that were capped as described above to the names of the new top-level stores they point to.

Return type:

dict[tuple[str, …], str]