Source code for ecoli.processes.environment.multibody_physics

"""
==========================
Multibody physics process
==========================
"""

import os

import random
import math

import numpy as np

import matplotlib.pyplot as plt
import matplotlib.patches as patches

# vivarium imports
from vivarium.library.units import units, remove_units
from vivarium.core.process import Process
from vivarium.core.composition import (
    process_in_experiment,
    simulate_experiment,
    PROCESS_OUT_DIR,
)

# vivarium-cell imports
from ecoli.processes.environment.derive_globals import volume_from_length
from ecoli.library.pymunk_multibody import PymunkMultibody
from ecoli.analysis.colony.snapshots import (
    plot_snapshots,
    format_snapshot_data,
)


NAME = "multibody"

DEFAULT_BOUNDS = [40, 40] * units.um
DEFAULT_LENGTH_UNIT = units.um
DEFAULT_MASS_UNIT = units.fg
DEFAULT_VELOCITY_UNIT = units.um / units.s
DEFAULT_VOLUME_UNIT = DEFAULT_LENGTH_UNIT**3

# constants
PI = math.pi


[docs] def random_body_position(body): # pick a random point along the boundary width, length = body.dimensions if random.randint(0, 1) == 0: # force along ends if random.randint(0, 1) == 0: # force on the left end location = (random.uniform(0, width), 0) else: # force on the right end location = (random.uniform(0, width), length) else: # force along length if random.randint(0, 1) == 0: # force on the bottom end location = (0, random.uniform(0, length)) else: # force on the top end location = (width, random.uniform(0, length)) return location
[docs] def daughter_locations(value, state): parent_length = state["length"] parent_angle = state["angle"] pos_ratios = [-0.25, 0.25] daughter_locations = [] for daughter in range(2): dx = parent_length * pos_ratios[daughter] * math.cos(parent_angle) dy = parent_length * pos_ratios[daughter] * math.sin(parent_angle) location = [value[0] + dx, value[1] + dy] daughter_locations.append(location) return daughter_locations
[docs] class Multibody(Process): """Simulates collisions and forces between agent bodies with a multi-body physics engine. :term:`Ports`: * ``agents``: The store containing all agent sub-compartments. Each agent in this store has values for location, angle, length, width, mass, thrust, and torque. Arguments: initial_parameters(dict): Accepts the following configuration keys: jitter_force: force applied to random positions along agent bodies to mimic thermal fluctuations. Produces Brownian motion. agent_shape (:py:class:`str`): agents can take the shapes ``rectangle``, ``segment``, or ``circle``. bounds (:py:class:`list`): size of the environment in micrometers, with ``[x, y]``. mother_machine (:py:class:`bool`): if set to ``True``, mother machine barriers are introduced. animate (:py:class:`bool`): interactive matplotlib option to animate multibody. To run with animation turned on set True, and use the TKAgg matplotlib backend: .. code-block:: console $ MPLBACKEND=TKAgg uv run vivarium/processes/snapshots.py Notes: * rotational diffusion in liquid medium with viscosity = 1 mPa.s: :math:`Dr = 3.5 \\pm0.3 rad^{2}/s` (Saragosti, et al. 2012. Modeling E. coli tumbles by rotational diffusion.) * translational diffusion in liquid medium with viscosity = 1 mPa.s: :math:`Dt = 100 um^{2}/s` (Saragosti, et al. 2012. Modeling E. coli tumbles by rotational diffusion.) """ name = NAME defaults = { "jitter_force": 1e-4, # pN "agent_shape": "segment", "bounds": DEFAULT_BOUNDS, "length_unit": DEFAULT_LENGTH_UNIT, "mass_unit": DEFAULT_MASS_UNIT, "velocity_unit": DEFAULT_VELOCITY_UNIT, "boundary_key": "boundary", "mother_machine": False, "animate": False, "seed": 0, } def __init__(self, parameters=None): super().__init__(parameters) # multibody parameters jitter_force = self.parameters["jitter_force"] self.agent_shape = self.parameters["agent_shape"] self.bounds = self.parameters["bounds"] self.mother_machine = self.parameters["mother_machine"] # units self.length_unit = self.parameters["length_unit"] self.mass_unit = self.parameters["mass_unit"] self.velocity_unit = self.parameters["velocity_unit"] # make the multibody object if self.mother_machine: assert isinstance(self.mother_machine, dict), ( "mother_machine must be a dictionary with keys " "spacer_thickness, channel_height, channel_space" ) multibody_config = { "agent_shape": self.agent_shape, "jitter_force": jitter_force, "bounds": remove_units(self.bounds), "barriers": self.mother_machine, "physics_dt": self.parameters["timestep"] / 10, "seed": self.parameters["seed"], } self.physics = PymunkMultibody(multibody_config) # interactive plot for visualization self.animate = self.parameters["animate"] if self.animate: plt.ion() self.ax = plt.gca() self.ax.set_aspect("equal")
[docs] def ports_schema(self): glob_schema = { "*": { self.parameters["boundary_key"]: { "location": { "_emit": True, "_default": [0.5 * bound for bound in self.bounds], "_updater": "set", "_divider": { "divider": daughter_locations, "topology": { "length": ( "..", "length", ), "angle": ( "..", "angle", ), }, }, }, "length": {"_emit": True, "_default": 2.0 * units.um}, "width": {"_emit": True, "_default": 1.0 * units.um}, "angle": { "_emit": True, "_default": 0.0, "_updater": "set", }, "mass": { "_emit": True, "_default": 1339 * units.fg, }, "thrust": { "_default": 0.0, "_updater": "set", }, "torque": { "_default": 0.0, "_updater": "set", }, } } } schema = {"agents": glob_schema} return schema
[docs] def next_update(self, timestep, states): agents = states["agents"] # animate before update agents = remove_units(agents) if self.animate: self.animate_frame(agents) # update multibody with new agents self.physics.update_bodies(agents) # run simulation self.physics.run(timestep) # get new agent positions agent_positions = self.physics.get_body_positions() update = {"agents": agent_positions} # for mother machine configurations, remove agents above the channel height if self.mother_machine: channel_height = self.mother_machine["channel_height"] delete_agents = [] for agent_id, position in agent_positions.items(): location = position["boundary"]["location"] y_loc = location[1] if y_loc > channel_height: # cell has moved past the channels delete_agents.append(agent_id) if delete_agents: update["agents"] = { agent_id: position for agent_id, position in agent_positions.items() if agent_id not in delete_agents } update["agents"]["_delete"] = [agent_id for agent_id in delete_agents] for agent in update["agents"].values(): agent["boundary"]["location"] *= units.um return update
## matplotlib interactive plot
[docs] def animate_frame(self, agents): plt.cla() for agent_id, data in agents.items(): # location, orientation, length data = data["boundary"] x_center = data["location"][0] y_center = data["location"][1] angle = data["angle"] / PI * 180 + 90 # rotate 90 degrees to match field length = data["length"] width = data["width"] # get bottom left position x_offset = width / 2 y_offset = length / 2 theta_rad = math.radians(angle) dx = x_offset * math.cos(theta_rad) - y_offset * math.sin(theta_rad) dy = x_offset * math.sin(theta_rad) + y_offset * math.cos(theta_rad) x = x_center - dx y = y_center - dy if self.agent_shape == "rectangle" or self.agent_shape == "segment": # Create a rectangle rect = patches.Rectangle( (x, y), width, length, angle=angle, linewidth=1, edgecolor="b" ) self.ax.add_patch(rect) elif self.agent_shape == "circle": # Create a circle circle = patches.Circle((x, y), width, linewidth=1, edgecolor="b") self.ax.add_patch(circle) bounds = remove_units(self.bounds) plt.xlim([0, bounds[0]]) plt.ylim([0, bounds[1]]) plt.draw() plt.pause(0.01)
# configs
[docs] def make_random_position(bounds): unitless_bounds = [bound.to(units.um).magnitude for bound in bounds] return [ np.random.uniform(0, unitless_bounds[0]), np.random.uniform(0, unitless_bounds[1]), ] * units.um
[docs] def single_agent_config(config): # cell dimensions width = 1.0 * units.um length = 2.0 * units.um volume = volume_from_length(length, width) bounds = config.get("bounds", DEFAULT_BOUNDS) location = config.get("location") if location: location = [loc * bounds[n] for n, loc in enumerate(location)] else: location = make_random_position(bounds) return { "boundary": { "location": location, "angle": np.random.uniform(0, 2 * PI), "volume": volume, "length": length, "width": width, "mass": 1339 * units.fg, "thrust": 0, "torque": 0, } }
[docs] def agent_body_config(config): agent_ids = config["agent_ids"] agent_config = {agent_id: single_agent_config(config) for agent_id in agent_ids} return {"agents": agent_config}
default_gd_config = {"bounds": DEFAULT_BOUNDS} default_gd_config.update( agent_body_config({"bounds": DEFAULT_BOUNDS, "agent_ids": ["1", "2"]}) )
[docs] class InvokeUpdate(object): def __init__(self, update): self.update = update
[docs] def get(self, timeout=0): return self.update
# tests and simulations def test_multibody(n_agents=1, time=10, return_data=False): agent_ids = [str(agent_id) for agent_id in range(n_agents)] multibody_config = { "agents": agent_body_config({"bounds": DEFAULT_BOUNDS, "agent_ids": agent_ids}) } multibody = Multibody(multibody_config) # initialize agent's boundary state initial_agents_state = multibody_config["agents"] initial_state = {"agents": initial_agents_state} experiment = process_in_experiment(multibody, initial_state=initial_state) # run experiment settings = {"timestep": 1, "total_time": time, "return_raw_data": True} data = simulate_experiment(experiment, settings) if return_data: return data def test_growth_division( config=default_gd_config, growth_rate=0.05, growth_rate_noise=0.001, division_volume=0.4**3 * units.fL, total_time=10, timestep=1, experiment_settings=None, return_data=False, ): if not experiment_settings: experiment_settings = {} initial_agents_state = config["agents"] # make the process multibody = Multibody(config) experiment = process_in_experiment(multibody, experiment_settings) experiment.state._update_subschema( ("agents",), { "boundary": { "mass": {"_updater": "set", "_divider": "split"}, "length": {"_updater": "set", "_divider": "split"}, "volume": {"_updater": "set", "_divider": "split"}, } }, ) experiment.state._apply_subschemas() # make initial agent state experiment.state.set_value({"agents": initial_agents_state}) agents_store = experiment.state.get_path(["agents"]) # emit initial state experiment._emit_store_data() # run simulation time = 0 while time < total_time: experiment.update(timestep) time += timestep agents_state = agents_store.get_value() invoked_update = [] for agent_id, state in agents_state.items(): state = state["boundary"] length = state["length"] width = state["width"] mass = state["mass"] # .magnitude # update growth_rate2 = ( growth_rate + np.random.normal(0.0, growth_rate_noise) ) * timestep new_mass = mass + mass * growth_rate2 new_length = length + length * growth_rate2 new_volume = volume_from_length(new_length, width) if new_volume > division_volume: daughter_ids = [str(agent_id) + "0", str(agent_id) + "1"] daughter_updates = [] for daughter_id in daughter_ids: daughter_updates.append( { "key": daughter_id, "processes": {}, "topology": {}, "initial_state": {}, } ) update = { "_divide": {"mother": agent_id, "daughters": daughter_updates} } else: update = { agent_id: { "boundary": { "volume": new_volume, "length": new_length, "mass": new_mass, } } } # * units.fg invoked_update.append((InvokeUpdate({"agents": update}), None)) # update experiment experiment._send_updates(invoked_update) experiment.end() if return_data: return experiment.emitter.get_data_unitless()
[docs] def run_growth_division( out_dir="out", animate=True, ): n_agents = 2 agent_ids = [str(agent_id) for agent_id in range(n_agents)] # configure the multibody process bounds = DEFAULT_BOUNDS multibody_config = { "animate": animate, # 'jitter_force': 1e0, "bounds": bounds, } body_config = {"bounds": bounds, "agent_ids": agent_ids} multibody_config.update(agent_body_config(body_config)) # experiment settings experiment_settings = {"progress_bar": False, "display_info": False} # run the test gd_data = test_growth_division( config=multibody_config, growth_rate=0.05, growth_rate_noise=0.001, division_volume=volume_from_length(4, 1) * units.fL, total_time=100, experiment_settings=experiment_settings, return_data=True, ) agents, fields = format_snapshot_data(gd_data) return plot_snapshots(bounds, agents=agents, fields=fields, out_dir=out_dir)
def test_daughter_locations(): locations = daughter_locations( [2, 2], {"length": 2 * math.sqrt(2), "angle": -math.pi / 4}, ) assert locations == [[1.5, 2.5], [2.5, 1.5]]
[docs] def main(): out_dir = os.path.join(PROCESS_OUT_DIR, NAME) if not os.path.exists(out_dir): os.makedirs(out_dir) run_growth_division(out_dir)
if __name__ == "__main__": main()