Source code for ecoli.processes.spatiality.diffusion_network

"""
=================
Diffusion Network
=================

``DiffusionNetwork`` models Brownian diffusion using a network of nodes
and edges. Each node acts as a cellular compartment and each edge connects
those compartments, indicating where diffusion can occur.

This :term:`process class` models diffusion based off of Fick's law.
The following equation is used:

* Diffusion: :math:`\\frac{dc}{dt} = \\frac{DA}{V} * \\frac{dc}{dx}`

 * :math:`D`: Diffusion constant
 * :math:`A`: Cross-sectional area of edge
 * :math:`V`: Volume of node

This diffusion equation is solved using implicit Euler to derive a
matrix, M, that updates the concentration, c(t), on each timestep.

* Concentration update: :math:`c_{t+1} = M^{-1} * c_{t}`

This process takes in molecular weights in order to solve for
molecule hydrodynamic radii, which is it turn used to solve for
diffusion constants. These calculations assume that all
molecules are spherical proteins. For more information on how
this is done, see ``calculate_rp_from_mw`` and
``compute_diffusion_constants_from_rp``.  However, if a
molecule radius is known, it can be passed in as ``radii``.
Similarly, if the diffusion constant is known, it can be passed
in as a property of an edge as ``diffusion_constants``.

.. note::
 This model treats all molecule classes as concentrations and is
 a deterministic solution. This model should only be used when there
 is a sufficient number of molecules such that they can be treated
 deterministically.

"""

import os
import numpy as np
import matplotlib.pyplot as plt
import scipy.constants

from iteround import saferound

from vivarium.core.process import Process
from vivarium.core.composition import (
    simulate_process,
    PROCESS_OUT_DIR,
)

NAME = "diffusion_network"


[docs] class DiffusionNetwork(Process): """Models diffusion between a network of connected nodes. :term:`Ports`: * **nodes**: Expects a :term:`store` which is a dict of node names (the keys of the dict) to a dict, which has the key value pairs for ``length``, ``volume``, and ``molecules``. Args: parameters: A dictionary of configuration options. The following configuration options may be provided: * **nodes** (:py:class:`list`): A list of node names. * **edges** (:py:class:`dict`): Maps edge names (the keys of the dict) to a dict (the values of the dict), which must include the key-value pairs of ``nodes`` to a list of nodes each edge connects, and ``cross_sectional_area`` to the area of that edge. Additionally, known diffusion constants can be included as ``diffusion_constants`` in units of um^2/s, and edge-specific scaling of the diffusion constants can be included as, ``diffusion_scaling_constant``. * **mw** (:py:class:`dict`): Maps from names of molecules (the keys of the dict) to their molecular weights in units of fg (the values of the dict). * **mesh_size** (:py:class:`float`): Mesh size in units of nm. * **time_step** (:py:class:`float`): The time step used in units of s. * **radii** (:py:class:`dict`): Maps from molecule names of molecules (the keys of the dict) to their known hydrodynamic radii in units of nm (the values of the dict). This is an optional parameter. * **temp** (:py:class:`float`): Temperature of experiment in units of K. This is an optional parameter. """ name = NAME defaults = { "nodes": [], "edges": {}, "mw": {}, # in fg "mesh_size": 50, # in nm "time_step": 0.1, # in s "radii": {}, # nm "temp": 310.15, # in K } def __init__(self, parameters=None): super().__init__(parameters) self.nodes = np.asarray(self.parameters["nodes"]) self.edges = self.parameters["edges"] self.mw = self.parameters["mw"] self.molecule_ids = self.parameters["mw"].keys() self.mesh_size = self.parameters["mesh_size"] self.radii = self.parameters["radii"] self.temp = self.parameters["temp"] self.remainder = np.zeros((len(self.nodes), len(self.molecule_ids))) # get molecule radii by molecular weights self.rp = calculate_rp_from_mw(self.molecule_ids, self.mw) for mol_id, r in self.radii.items(): self.rp[np.where(np.asarray(list(self.molecule_ids)) == mol_id)[0][0]] = r # get diffusion constants per molecule self.diffusion_constants = compute_diffusion_constants_from_rp( self.molecule_ids, self.rp, self.mesh_size, self.edges, self.temp )
[docs] def ports_schema(self): """ Dynamically constructs ports -- one port for each node in ``DiffusionNetwork.parameters['nodes']``. Each node has variables ``volume``, ``length``, and a ``molecules`` dict with molecule names mapped to their counts. """ schema = { node_id: { "volume": { "_default": 1.0, }, "length": { "_default": 1.0, }, "molecules": { "*": { "_default": 0, "_emit": True, "_updater": "accumulate", } }, } for node_id in self.parameters["nodes"] } return schema
[docs] def next_update(self, timestep, state): M = np.asarray([np.identity(len(self.nodes)) for mol in self.molecule_ids]) # construct M matrix based off of graph, all edges assumed bidirectional for edge_id, edge in self.edges.items(): node_index_1 = np.where(self.nodes == edge["nodes"][0])[0][0] node_index_2 = np.where(self.nodes == edge["nodes"][1])[0][0] cross_sectional_area = edge["cross_sectional_area"] vol_1 = state[edge["nodes"][0]]["volume"] vol_2 = state[edge["nodes"][1]]["volume"] dx = ( state[edge["nodes"][0]]["length"] / 2 + state[edge["nodes"][1]]["length"] / 2 ) diffusion_constants = array_from(self.diffusion_constants[edge_id]) alpha = diffusion_constants * (cross_sectional_area / dx) * timestep M[:, node_index_1, node_index_1] += alpha / vol_1 M[:, node_index_2, node_index_2] += alpha / vol_2 M[:, node_index_1, node_index_2] -= alpha / vol_1 M[:, node_index_2, node_index_1] -= alpha / vol_2 # Calculates final concentration after one timestep c_initial = np.asarray( [ np.multiply(array_from(state[node]["molecules"]), array_from(self.mw)) / state[node]["volume"] for node in state ] ) c_final = np.asarray( [np.matmul(np.linalg.inv(a), c_initial[:, i]) for i, a in enumerate(M)] ).T # Calculates final counts volumes = np.asarray([state[node]["volume"] for node in state]) count_initial = np.asarray( [array_from(state[node]["molecules"]) for node in state] ) count_final_unrounded = ( np.asarray( [ np.divide(node * volumes[i], array_from(self.mw)) for i, node in enumerate(c_final) ] ) + self.remainder ) count_final = np.asarray( [saferound(col, 0) for col in count_final_unrounded.T] ).T # Keeps track of remainder after rounding counts to integers self.remainder = count_final_unrounded - count_final delta = np.subtract(count_final, count_initial) # Ensures conservation of molecules assert np.array_equal( np.ndarray.sum(count_initial, axis=0), np.ndarray.sum(count_final, axis=0) ), "Molecule count is not conserved" update = { node_id: { "molecules": array_to( self.molecule_ids, delta[np.where(self.nodes == node_id)[0][0]].astype(int), ), } for node_id in self.nodes } return update
[docs] def calculate_request(self, timestep, states): return {}
# TODO(vivarium): change this to multiple tests and add asserts def test_diffusion_network_process(out_dir=None): # initialize the process by passing parameters n = int(1e6) molecule_ids = [str(np.round(i, 1)) for i in np.arange(0.1, 19.6, 0.1)] parameters = { "nodes": ["cytosol_front", "nucleoid", "cytosol_rear"], "edges": { "1": { "nodes": ["cytosol_front", "nucleoid"], "cross_sectional_area": np.pi * 0.3**2, "mesh": True, }, "2": { "nodes": ["nucleoid", "cytosol_rear"], "cross_sectional_area": np.pi * 0.3**2, "mesh": True, }, "3": { "nodes": ["cytosol_front", "cytosol_rear"], "cross_sectional_area": np.pi * 0.3**2, }, }, "mw": {str(np.round(i, 1)): np.round(i, 1) for i in np.arange(0.1, 19.6, 0.1)}, "mesh_size": 50, "radii": { str(np.round(i, 1)): np.round(i, 1) for i in np.arange(0.1, 19.6, 0.1) }, } diffusion_network_process = DiffusionNetwork(parameters) # run the simulation sim_settings = { "total_time": 10, "initial_state": { "cytosol_front": { "length": 0.5, "volume": 0.25, "molecules": {mol_id: n for mol_id in molecule_ids}, }, "nucleoid": { "length": 1.0, "volume": 0.5, "molecules": {mol_id: 0 for mol_id in molecule_ids}, }, "cytosol_rear": { "length": 0.5, "volume": 0.25, "molecules": {mol_id: 0 for mol_id in molecule_ids}, }, }, } output = simulate_process(diffusion_network_process, sim_settings) # rp = diffusion_network_process.rp # diffusion_constants = diffusion_network_process.diffusion_constants if out_dir: # plot the simulation output plot_output(output, sim_settings["initial_state"], out_dir) # Plots the diffusion constants by molecule sizes for edges with and without mesh
[docs] def plot_diff_range(diffusion_constants, rp, out_dir="out"): plt.figure() plt.plot(np.multiply(rp, 2), array_from(diffusion_constants["1"]), color="#d8b365") plt.plot(np.multiply(rp, 2), array_from(diffusion_constants["3"]), color="#5ab4ac") plt.yscale("log") plt.xlabel(r"Molecule size ($nm$)") plt.ylabel(r"Diffusion constant ($\mu m^2/s$)") plt.title("Diffusion constants of molecules") plt.legend(["with 50 nm mesh", "without mesh"]) out_file = out_dir + "/diffusion_constants.png" plt.tight_layout() plt.savefig(out_file, dpi=300)
# Plots the normalized concentrations for the largest and smallest molecules
[docs] def plot_output(output, nodes, out_dir="out"): plt.figure() colors = ["#d8b365", "#5ab4ac", "#018571"] large_total = ( array_from(output["cytosol_front"]["molecules"])[-1][0] + array_from(output["nucleoid"]["molecules"])[-1][0] + array_from(output["cytosol_rear"]["molecules"])[-1][0] ) small_total = ( array_from(output["cytosol_front"]["molecules"])[0][0] + array_from(output["nucleoid"]["molecules"])[0][0] + array_from(output["cytosol_rear"]["molecules"])[0][0] ) plt.plot( output["time"], np.divide( np.divide( array_from(output["cytosol_front"]["molecules"])[-1], nodes["cytosol_front"]["volume"], ), large_total, ), color=colors[0], ) plt.plot( output["time"], np.divide( np.divide( array_from(output["nucleoid"]["molecules"])[-1], nodes["nucleoid"]["volume"], ), large_total, ), color=colors[1], ) plt.plot( output["time"], np.divide( np.divide( array_from(output["cytosol_rear"]["molecules"])[-1], nodes["cytosol_rear"]["volume"], ), large_total, ), color=colors[2], ) plt.plot( output["time"], np.divide( np.divide( array_from(output["cytosol_front"]["molecules"])[0], nodes["cytosol_front"]["volume"], ), small_total, ), color=colors[0], linestyle="dashed", ) plt.plot( output["time"], np.divide( np.divide( array_from(output["nucleoid"]["molecules"])[0], nodes["nucleoid"]["volume"], ), small_total, ), color=colors[1], linestyle="dashed", ) plt.plot( output["time"], np.divide( np.divide( array_from(output["cytosol_rear"]["molecules"])[0], nodes["cytosol_rear"]["volume"], ), small_total, ), color=colors[2], linestyle="dashed", ) plt.xlabel("time (s)") plt.ylabel("Molecule counts") plt.title("Diffusion over compartments") plt.legend( [ "Cytosol front: large molecule", "Nucleoid: large molecule", "Cytosol rear: large molecule", "Cytosol front: small molecule", "Nucleoid: small molecule", "Cytosol rear: small molecule", ] ) out_file = out_dir + "/diffusion_large_small.png" plt.savefig(out_file)
# This function is modified from spatial_tool.py from WCM
[docs] def calculate_rp_from_mw(molecule_ids, mw): """ This function compute the hydrodynamic radius of a macromolecules from its molecular weight. It is important to note that the hydrodynamic diameter is mainly used for computation of diffusion constant, and can be different from the observed diameter under microscopes or the radius of gyration, especially for loose polymers such as RNAs. This function is not E coli specific. References: Bioinformatics (2012). doi:10.1093/bioinformatics/bts537 Args: molecule_ids: List of molecule ids. mw: molecular weight of the macromolecules, units: fg. Returns: the hydrodynamic radius (in unit of nm) of the macromolecules using the following formula: * ``rp = 0.0515*MW^(0.392) nm`` (Hong & Lei 2008) (protein) These parameters are also possible for other macromolecule types, however all molecules are currently assumed to be proteins. * ``rp = 0.0566*MW^(0.38) nm`` (Werner 2011) (RNA) * ``rp = 0.024*MW^(0.57) nm`` (Robertson et al 2006) (linear DNA) * ``rp = 0.0125*MW^(0.59) nm`` (Robertson et al 2006) (circular DNA) * ``rp = 0.0145*MW^(0.57) nm`` (Robertson et al 2006) (supercoiled DNA) """ dic_rp = { "protein": (0.0515, 0.392), "RNA": (0.0566, 0.38), "linear_DNA": (0.024, 0.57), "circular_DNA": (0.0125, 0.59), "supercoiled_DNA": (0.0145, 0.57), } r_p0, rp_power = dic_rp["protein"] fg_to_kDa = 602217364.34 mw_subset = {key: value for key, value in mw.items() if key in molecule_ids} r_p = np.multiply( r_p0, np.power(np.multiply(array_from(mw_subset), fg_to_kDa), rp_power) ) return r_p
# This function is modified from spatial_tool.py from WCM
[docs] def compute_diffusion_constants_from_rp(molecule_ids, rp, mesh_size, edges, temp): """ .. WARNING:: The default values of the parameters are E coli specific. This function computes the hypothesized diffusion constant of macromolecules within the nucleoid and the cytoplasm region. In literature, there is no known differentiation between the diffusion constant of a molecule in the nucleoid and in the cytoplasm up to the best of our knowledge in 2020. However, there is a good reason why we can assume that previously reported diffusion constant are in fact the diffusion constant of a protein in the nucleoid region: 1. The image traces of a protein within a bacteria usually cross the nucleoid regions. 2. The nucleoid region, compared to the cytoplasm, should be the main limiting factor restricting the magnitude of diffusion constant. 3. The same theory of diffusion constant has been implemented to mammalian cells, and the term 'rh', the average hydrodynamic radius of the biggest crowders, are different in mammalian cytoplasm, and it seems to reflect the hydrodynamic radius of the actin filament (note: the hydrodynamic radius of actin filament should be computed based on the average length of actin fiber, and is not equal to the radius of the actin filament itself.) (ref: Nano Lett. 2011, 11, 2157-2163). As for E coli, the 'rh' term = 40nm, which may correspond to the 80nm DNA fiber. On the other hand, for the diffusion constant of E coli in the true cytoplasm, we will expect the value of 'rh' term to be approximately 10 nm, which correspond to the radius of active ribosomes. Using these terms for scaling a baseline diffusion constant (calculated from Enstein-Stokes equation), a cytosol-specific diffusion calculation can be obtained). Ref: Kalwarczyk, T., Tabaka, M. & Holyst, R. Bioinformatics (2012). doi:10.1093/bioinformatics/bts537 This function computes the hypothesized diffusion constant of macromolecules within the nucleoid region by scaling the hypothesized cytoplasm diffusion constant. There is evidence that as molecules grow in size, they become more excluded from E. coli's nucleoid because the DNA polymers form a meshgrid with an estimated mesh size of 50 nm (ref: Xiang et al., bioRxiv (2020)). The equation implemented here assumes that molecules move through the network by encountering openings between the DNA meshgrid greater than the hydrodynamic radius. Ref: Brian Amsden Macromolecules (1999). doi:10.1021/ma980922a .. code-block:: text D_0 = K_B*T/(6*pi*eta_0*rp) ln(D_0/D_cyto) = ln(eta/eta_0) = (xi^2/Rh^2 + xi^2/rp^2)^(-a/2) D_0 = the diffusion constant of a macromolecule in pure solvent eta_0 = the viscosity of pure solvent, in this case, water eta = the size-dependent viscosity experienced by the macromolecule. xi = average distance between the surface of proteins rh = average hydrodynamic radius of the biggest crowders a = some constant of the order of 1 rp = hydrodynamic radius of probed molecule In this formula, since we allow the changes in temperature, we also consider the viscosity changes of water under different temperature: .. code-block:: text eta_0 = A*10^(B/(T-C)) A = 2.414*10^(-5) Pa*sec B = 247.8 K C = 140 K Ref: Dortmund Data Bank Args: molecule_ids: List of molecule ids. rp: List of radii corresponding to each molecule id. unit: nm mesh_size: Size of meshgrid openings. unit: nm edges: Dictionary of edges. temp: The temperature of interest. unit: K. Returns: the diffusion constant of the macromolecule in units ``um**2/sec`` """ if temp is None: temp = 310.15 parameters = (0.51, 0.53, 10) # unpack constants required for the calculation xi, a, rh_cyto = parameters # unit: nm, 1, nm ro = mesh_size / 2 rh = rh_cyto diffusion_constants = {} K_B = scipy.constants.Boltzmann # Boltzmann constant, unit: J/K # calculate viscosity of water based on temperature a_visc = 2.414 * 10 ** (-5) # unit: Pa*sec b_visc = 247.8 # unit: K c_visc = 140 # unit: K eta_0 = a_visc * 10 ** (b_visc / (temp - c_visc)) # unit: Pa*sec # conversions m2_to_um2 = 1e12 nm_to_m = 1e-9 # compute dc (diffusion constant) dc_0 = np.multiply( np.divide(K_B * temp, np.multiply(6 * np.pi * eta_0 * nm_to_m, rp)), m2_to_um2 ) dc_cyto = np.multiply( dc_0, np.exp( -np.power(np.add(xi**2 / rh**2, np.divide(xi**2, np.square(rp))), (-a / 2)) ), ) # compute impact from mesh when nucleoid is a node dc_nuc = np.multiply( dc_cyto, np.exp(np.multiply((-np.pi / 4), np.square(np.divide(rp, ro)))) ) for edge_id, edge in edges.items(): scaling_factor = 1 mesh = False if "mesh" in edge: mesh = edge["mesh"] if "diffusion_scaling_constant" in edge: scaling_factor = edge["diffusion_scaling_constant"] if mesh: diffusion_constants[edge_id] = array_to( molecule_ids, dc_nuc * scaling_factor ) else: diffusion_constants[edge_id] = array_to( molecule_ids, dc_cyto * scaling_factor ) if "diffusion_constants" in edge: for mol_id, dc in edge["diffusion_constants"].items(): diffusion_constants[edge_id][mol_id] = dc return diffusion_constants
# Helper function
[docs] def array_from(d): return np.array(list(d.values()))
# Helper function
[docs] def array_to(keys, array): return {key: array[index] for index, key in enumerate(keys)}
if __name__ == "__main__": # make an output directory to save plots out_dir = os.path.join(PROCESS_OUT_DIR, NAME) if not os.path.exists(out_dir): os.makedirs(out_dir) test_diffusion_network_process(out_dir)