Source code for ecoli.processes.environment.lysis

"""
=====
Lysis
=====
"""

import os
import random
import numpy as np
from scipy import constants

from vivarium.core.process import Step, Process
from vivarium.core.composer import Composer
from vivarium.core.engine import Engine
from vivarium.library.units import units
from ecoli.processes.environment.multibody_physics import PI
from ecoli.processes.environment.local_field import LocalField
from ecoli.library.lattice_utils import (
    get_bin_site,
    get_bin_volume,
    count_to_concentration,
)
from ecoli.library.schema import bulk_name_to_idx, counts, numpy_schema


AVOGADRO = constants.N_A / units.mol


[docs] class Lysis(Step): name = "lysis" defaults = { "secreted_molecules": [], "bin_volume": 1e-6 * units.L, } def __init__(self, parameters=None): super().__init__(parameters) self.agent_id = self.parameters["agent_id"] self.bin_volume = self.parameters["bin_volume"] # Helper indices for Numpy indexing self.secreted_mol_idx = None
[docs] def ports_schema(self): fields_schema = { mol_id: { "_default": np.ones(1), } for mol_id in self.parameters["secreted_molecules"] } return { "trigger": {"_default": False}, "agents": {}, "internal": numpy_schema("bulk"), "fields": { **fields_schema, "_output": True, }, "location": { "_default": [0.5, 0.5] * units.um, }, "dimensions": { "bounds": { "_default": [1, 1] * units.um, }, "n_bins": { "_default": [1, 1], }, "depth": { "_default": 1 * units.um, }, }, }
[docs] def next_update(self, timestep, states): if self.secreted_mol_idx is None: self.secreted_mol_idx = { mol_name: bulk_name_to_idx(mol_name, states["internal"]["id"]) for mol_name in self.parameters["secreted_molecules"] } if states["trigger"]: location = states["location"] n_bins = states["dimensions"]["n_bins"] bounds = states["dimensions"]["bounds"] depth = states["dimensions"]["depth"] # get bin volume bin_site = get_bin_site(location, n_bins, bounds) bin_volume = get_bin_volume(n_bins, bounds, depth) # apply internal states to fields internal = states["internal"] delta_fields = {} for mol_id, mol_idx in self.secreted_mol_idx.items(): value = counts(internal, mol_idx) # delta concentration exchange = value concentration = count_to_concentration(exchange, bin_volume).to( units.mM ) delta_field = np.zeros((n_bins[0], n_bins[1]), dtype=np.float64) delta_field[bin_site[0], bin_site[1]] += concentration.to( units.mM ).magnitude delta_fields[mol_id] = {"_value": delta_field, "_updater": "accumulate"} # remove agent and apply delta to field return {"agents": {"_delete": [self.agent_id]}, "fields": delta_fields} return {}
[docs] def mass_from_count(count, mw): mol = count / AVOGADRO return mw * mol
[docs] class ToyTransportBurst(Process): """ Toy process for testing Lysis. Uptakes a molecule from a field, and triggers lysis. """ defaults = { "uptake_rate": {"GLC": 1}, "molecular_weights": {"GLC": 1 * units.fg / units.mol}, "burst_mass": 2000 * units.fg, } def __init__(self, parameters=None): super().__init__(parameters) self.molecules = list(self.parameters["uptake_rate"].keys()) # Helper indices for Numpy arrays self.molecule_idx = None
[docs] def ports_schema(self): return { "external": { key: { "_default": 0.0 * units.mM, "_emit": True, } for key in self.molecules }, "exchanges": { key: { "_default": 0.0, "_emit": True, } for key in self.molecules }, "internal": numpy_schema("bulk"), "mass": { "_default": 0.0 * units.fg, }, "length": { "_default": 0.0 * units.um, }, "burst_trigger": { "_default": False, "_updater": "set", "_emit": True, }, }
[docs] def next_update(self, timestep, states): if self.molecule_idx is None: self.molecule_idx = bulk_name_to_idx( self.molecules, states["internal"]["id"] ) added = np.zeros(len(self.molecules)) exchanged = {} added_mass = 0.0 * units.fg for i, (mol_id, e_state) in enumerate(states["external"].items()): exchange_concs = e_state * self.parameters["uptake_rate"][mol_id] # NOTE: This is not correct. We are just hacking this # together for testing. exchange_counts = exchange_concs.magnitude added[i] = exchange_counts exchanged[mol_id] = -1 * exchange_counts added_mass += mass_from_count( exchange_counts, self.parameters["molecular_weights"][mol_id] ) if states["mass"] + added_mass >= self.parameters["burst_mass"]: return {"burst_trigger": True} # extend length relative to mass added_length = added_mass * states["length"] / states["mass"] return { "internal": [(self.molecule_idx, added.astype(int))], "exchanges": exchanged, "mass": added_mass, "length": added_length, }
[docs] class LysisAgent(Composer): """ Agent that uptakes a molecule from a lattice environment, bursts upon reaching a set mass, and spills the molecules back into the environment """ defaults = { "lysis": {"secreted_molecules": ["GLC"]}, "transport_burst": { "uptake_rate": { "GLC": 2, }, "molecular_weights": { "GLC": 1e22 * units.fg / units.mol, }, "burst_mass": 2000 * units.fg, }, "local_field": {}, "boundary_path": ("boundary",), "fields_path": ("..", "..", "fields"), "dimensions_path": ( "..", "..", "dimensions", ), "agents_path": ( "..", "..", "agents", ), }
[docs] def generate_processes(self, config): return {"transport_burst": ToyTransportBurst(config["transport_burst"])}
[docs] def generate_steps(self, config): assert config["agent_id"] lysis_config = {"agent_id": config["agent_id"], **config["lysis"]} return { "local_field": LocalField(config["local_field"]), "lysis": Lysis(lysis_config), }
[docs] def generate_flow(self, config): return { "local_field": [], "lysis": [], }
[docs] def generate_topology(self, config): boundary_path = config["boundary_path"] fields_path = config["fields_path"] dimensions_path = config["dimensions_path"] agents_path = config["agents_path"] return { "transport_burst": { "internal": ("internal",), "exchanges": boundary_path + ("exchanges",), "external": boundary_path + ("external",), "mass": boundary_path + ("mass",), "length": boundary_path + ("length",), "burst_trigger": boundary_path + ("burst",), }, "local_field": { "exchanges": boundary_path + ("exchanges",), "location": boundary_path + ("location",), "fields": fields_path, "dimensions": dimensions_path, }, "lysis": { "trigger": boundary_path + ("burst",), "internal": ("internal",), "agents": agents_path, "fields": fields_path, "location": boundary_path + ("location",), "dimensions": dimensions_path, }, }
def test_lysis( n_cells=1, molecule_name="beta-lactam", total_time=60, emit_step=1, bounds=[25, 25] * units.um, n_bins=[5, 5], uptake_rate_max=25, return_data=False, ): from ecoli.composites.environment.lattice import Lattice lattice_composer = Lattice( { "reaction_diffusion": { "molecules": [molecule_name], "bounds": bounds, "n_bins": n_bins, "gradient": { "type": "uniform", "molecules": { molecule_name: 10.0 * units.mM, }, }, }, "multibody": { "bounds": bounds, }, } ) # initialize the composite with a lattice full_composite = lattice_composer.generate() # configure the agent composer agent_composer = LysisAgent( { "lysis": {"secreted_molecules": [molecule_name]}, "transport_burst": { "molecular_weights": { molecule_name: 1e22 * units.fg / units.mol, }, "burst_mass": 2000 * units.fg, }, } ) # make individual agents, with unique uptake rates agent_ids = [str(idx) for idx in range(n_cells)] for agent_id in agent_ids: uptake_rate = random.randrange(uptake_rate_max) agent_composite = agent_composer.generate( { "agent_id": agent_id, "transport_burst": { "uptake_rate": { molecule_name: uptake_rate, } }, } ) agent_path = ("agents", agent_id) full_composite.merge(composite=agent_composite, path=agent_path) # get initial state initial_state = full_composite.initial_state() initial_state["agents"] = {} for agent_id in agent_ids: agent_angle = random.uniform(0, 2 * PI) initial_state["agents"][agent_id] = { "boundary": {"angle": agent_angle}, "internal": np.array( [ ("beta-lactam", 0), ("hydrolyzed-beta-lactam", 0), ("EG10040-MONOMER[p]", 0), ], dtype=[("id", "U40"), ("count", int)], ), } # run the simulation and return the data sim = Engine( processes=full_composite.processes, steps=full_composite.steps, topology=full_composite.topology, flow=full_composite.flow, initial_state=initial_state, emit_step=emit_step, ) sim.update(total_time) data = sim.emitter.get_data_unitless() if return_data: return data
[docs] def main(): from ecoli.analysis.colony.snapshots import ( plot_snapshots, format_snapshot_data, make_video, ) bounds = [15, 15] * units.um molecule_name = "beta-lactam" data = test_lysis( n_cells=8, molecule_name=molecule_name, total_time=1000, emit_step=10, bounds=bounds, n_bins=[11, 11], return_data=True, ) # format the data for plot_snapshots agents, fields = format_snapshot_data(data) out_dir = os.path.join("out", "experiments", "lysis") os.makedirs(out_dir, exist_ok=True) plot_snapshots( bounds, agents=agents, fields=fields, n_snapshots=5, out_dir=out_dir, filename="lysis_snapshots", ) # make snapshot video make_video( data, bounds, plot_type="fields", out_dir=out_dir, filename="lysis_video", )
# uv run ecoli/processes/environment/lysis.py if __name__ == "__main__": main()