Source code for ecoli.processes.environment.diffusion_field

"""
===============
Diffusion Field
===============
"""

import sys
import os
import argparse
import copy

import numpy as np
from scipy import constants
from scipy.ndimage import convolve

from vivarium.core.process import Process, assoc_path
from vivarium.core.engine import Engine
from vivarium.core.composition import PROCESS_OUT_DIR
from vivarium.library.units import units, remove_units
from vivarium.library.topology import get_in
from vivarium.library.dict_utils import deep_merge

from ecoli.library.lattice_utils import (
    get_bin_site,
    get_bin_volume,
    make_gradient,
    apply_exchanges,
    ExchangeAgent,
    make_diffusion_schema,
)
from ecoli.analysis.colony.snapshots import plot_snapshots

NAME = "diffusion_field"

# laplacian kernel for diffusion
LAPLACIAN_2D = np.array([[0.0, 1.0, 0.0], [1.0, -4.0, 1.0], [0.0, 1.0, 0.0]])
AVOGADRO = constants.N_A


[docs] class DiffusionField(Process): """ Diffusion in 2-dimensional fields of molecules with agent exchange Agent uptake and secretion occurs at agent locations. Notes: * Diffusion constant of glucose in 0.5 and 1.5 percent agarose gel is around :math:`6 * 10^{-10} \\frac{m^2}{s}` (Weng et al. 2005. Transport of glucose and poly(ethylene glycol)s in agarose gels). * Conversion to micrometers: :math:`6 * 10^{-10} \\frac{m^2}{s}=600 \\frac{micrometers^2}{s}`. """ name = NAME defaults = { "time_step": 1, "molecules": ["glc"], "initial_state": {}, "n_bins": [10, 10], "bounds": [10 * units.um, 10 * units.um], "depth": 3000.0 * units.um, # um "diffusion": 5e-1 * units.um**2 / units.sec, "gradient": {}, "exchanges_path": ("boundary", "exchanges"), "external_path": ("boundary", "external"), "location_path": ("boundary", "location"), } def __init__(self, parameters=None): super().__init__(parameters) # initial state self.molecule_ids = self.parameters["molecules"] self.initial = self.parameters["initial_state"] # parameters self.n_bins = self.parameters["n_bins"] self.bounds = self.parameters["bounds"] depth = self.parameters["depth"] # diffusion diffusion = self.parameters["diffusion"] bins_x = self.n_bins[0] bins_y = self.n_bins[1] length_x = self.bounds[0] length_y = self.bounds[1] dx = length_x / bins_x dy = length_y / bins_y dx2 = dx * dy self.diffusion = diffusion / dx2 self.diffusion_dt = 0.01 * units.sec # self.diffusion_dt = 0.5 * dx ** 2 * dy ** 2 / (2 * self.diffusion * (dx ** 2 + dy ** 2)) # volume, to convert between counts and concentration self.bin_volume = get_bin_volume(self.n_bins, self.bounds, depth) # initialize gradient fields gradient = self.parameters["gradient"] if gradient: unitless_bounds = [bound.to(units.um).magnitude for bound in self.bounds] gradient_fields = make_gradient(gradient, self.n_bins, unitless_bounds) self.initial.update(gradient_fields) self.exchanges_path = tuple(self.parameters["exchanges_path"]) self.external_path = tuple(self.parameters["external_path"]) self.location_path = tuple(self.parameters["location_path"])
[docs] def initial_state(self, config=None): return { "fields": { field: self.initial.get(field, self.ones_field()) for field in self.molecule_ids }, }
[docs] def ports_schema(self): return make_diffusion_schema( self.exchanges_path, self.external_path, self.location_path, self.parameters["bounds"], self.parameters["n_bins"], self.parameters["depth"], self.molecule_ids, self.ones_field(), )
[docs] def next_update(self, timestep, states): fields = states["fields"] agents = states["agents"] # make new fields for the updated state new_fields = copy.deepcopy(fields) ################### # apply exchanges # ################### new_fields, agent_updates = apply_exchanges( agents, new_fields, self.exchanges_path, self.location_path, self.n_bins, self.bounds, self.bin_volume, ) # diffuse field new_fields = self.diffuse(new_fields, timestep) # get total delta from exchange, diffusion, reaction delta_fields = { mol_id: new_fields[mol_id] - field for mol_id, field in fields.items() } # get each agent's new local environment local_environments = self.get_local_environments(agents, new_fields) update = { "fields": delta_fields, "agents": agent_updates, } deep_merge(update["agents"], local_environments) return update
[docs] def get_bin_site(self, location): return get_bin_site(location, self.n_bins, self.bounds)
[docs] def get_single_local_environments(self, location, fields): bin_site = self.get_bin_site(location) local_environment = {} for mol_id, field in fields.items(): local_environment[mol_id] = { "_value": field[bin_site] * units.mM, "_updater": "set", } return local_environment
[docs] def get_local_environments(self, agents, fields): local_environments = {} if agents: for agent_id, specs in agents.items(): assoc_path( local_environments, (agent_id,) + self.external_path, self.get_single_local_environments( get_in(specs, self.location_path), fields, ), ) return local_environments
[docs] def ones_field(self): return np.ones((self.n_bins[0], self.n_bins[1]), dtype=np.float64)
# diffusion functions
[docs] def diffusion_delta(self, field, timestep): """calculate concentration changes cause by diffusion""" field_new = field.copy() t = 0.0 dt = min(timestep, self.diffusion_dt.to(units.sec).magnitude) diffusion = self.diffusion.to(1 / units.sec).magnitude while t < timestep: field_new += ( diffusion * dt * convolve(field_new, LAPLACIAN_2D, mode="reflect") ) t += dt return field_new - field, field_new
[docs] def diffuse(self, fields, timestep): new_fields = {} for mol_id, field in fields.items(): # run diffusion if molecule field is not uniform if len(set(field.flatten())) != 1: _, new_field = self.diffusion_delta(field, timestep) else: new_field = field new_fields[mol_id] = new_field return new_fields
# testing
[docs] def get_random_field_config(): n_bins = (20, 20) return { "molecules": ["glc"], "initial_state": {"glc": 1.0 * np.random.rand(n_bins[0], n_bins[1])}, "n_bins": n_bins, "bounds": (20, 20) * units.um, "depth": 1e-2 * units.um, "diffusion": 1e-2 * units.um**2 / units.sec, # slow diffusion }
[docs] def get_gaussian_config(): return { "molecules": ["glc"], "n_bins": (20, 20), "bounds": (20, 20) * units.um, "depth": 100 * units.um, "gradient": { "type": "gaussian", "molecules": {"glc": {"center": [0.5, 0.5], "deviation": 1}}, }, }
[docs] def get_exponential_config(): return { "molecules": ["glc"], "n_bins": (20, 20), "bounds": (20, 20) * units.um, "depth": 100 * units.um, "gradient": { "type": "exponential", "molecules": { "glc": {"center": [1.0, 1.0], "base": 1 + 1e-3, "scale": 10.0} }, }, }
def test_all(): run_diffusion_field( config=get_random_field_config(), total_time=60, filename="random" ) run_diffusion_field( config=get_gaussian_config(), total_time=60, filename="gaussian" ) run_diffusion_field( config=get_exponential_config(), total_time=60, filename="exponential" )
[docs] def plot_fields(data, config, out_dir="out", filename="fields"): unitless_config = remove_units(config) fields = {time: time_data["fields"] for time, time_data in data.items()} plot_snapshots( unitless_config["bounds"], fields=fields, out_dir=out_dir, filename=filename )
[docs] def run_diffusion_field(config=None, total_time=100, filename="snapshots"): config = config or {} diff_process = DiffusionField(config) # make the toy exchange agent agent_id = "0" agent_params = { "mol_ids": ["glc"], "default_exchange": 100, "max_move": 1.0, } agent_process = ExchangeAgent(agent_params) # get initial fields initial_fields = diff_process.initial_state() # put them together in a simulation sim = Engine( processes={ "diff": diff_process, "agents": {agent_id: {"exchange": agent_process}}, }, topology={ "diff": {port: (port,) for port in diff_process.ports_schema().keys()}, "agents": {agent_id: {"exchange": {"boundary": ("boundary",)}}}, }, initial_state=initial_fields, ) sim.update(total_time) # plot data = sim.emitter.get_data_unitless() # add empty angle back in for the plot (this is undesirable) for t in data.keys(): data[t]["agents"][agent_id]["boundary"]["angle"] = 0.0 data[t]["agents"][agent_id]["boundary"]["length"] = 1.0 data[t]["agents"][agent_id]["boundary"]["width"] = 1.0 out_dir = os.path.join(PROCESS_OUT_DIR, "environment", NAME) if not os.path.exists(out_dir): os.makedirs(out_dir) plot_snapshots( n_snapshots=6, bounds=get_in(data, (max(data), "dimensions", "bounds")), agents={time: d["agents"] for time, d in data.items()}, fields={time: d["fields"] for time, d in data.items()}, out_dir=out_dir, filename=filename, )
# python ecoli/processes/environment/diffusion_field.py if __name__ == "__main__": # test_all() parser = argparse.ArgumentParser(description="diffusion_field") parser.add_argument("--random", "-r", action="store_true", default=False) parser.add_argument("--gaussian", "-g", action="store_true", default=False) parser.add_argument("--exponential", "-e", action="store_true", default=False) args = parser.parse_args() no_args = len(sys.argv) == 1 if no_args: test_all() if args.random: run_diffusion_field( config=get_random_field_config(), total_time=60, filename="random" ) if args.gaussian: run_diffusion_field( config=get_gaussian_config(), total_time=60, filename="gaussian" ) if args.exponential: run_diffusion_field( config=get_exponential_config(), total_time=60, filename="exponential" )