Source code for ecoli.processes.chemotaxis.flagella_motor

"""
========================
Flagella Motor Processes
========================

``FlagellaMotor`` :term:`process` generates thrust and torque based on number of
flagella and their individual motor states.

References:
 CheY phosphorylation model from:
    `Kollmann, M., Lovdok, L., Bartholome, K., Timmer, J., & Sourjik, V. (2005).
    Design principles of a bacterial signalling network. Nature.`
 Veto model of motor activity from:
    `Mears, P. J., Koirala, S., Rao, C. V., Golding, I., & Chemla, Y. R. (2014).
    Escherichia coli swimming is robust against variations in flagellar number. Elife.`
 Rotational state of an individual flagellum from:
    `Sneddon, M. W., Pontius, W., & Emonet, T. (2012). Stochastic coordination of multiple
    actuators reduces latency and improves chemotactic response in bacteria.`

"""

import os
import copy
import random
import math

import numpy as np

from vivarium.core.process import Process
from vivarium.core.composition import simulate_process, PROCESS_OUT_DIR
from vivarium.core.emitter import timeseries_from_data
from vivarium.plots.simulation_output import plot_simulation_output

from ecoli.library.schema import create_unique_indexes

# plots
from matplotlib import colors, pyplot as plt
from matplotlib.patches import Patch


NAME = "flagella_motor"


[docs] class FlagellaMotor(Process): """Flagellar motor activity :term:`Ports`: * **internal_counts** * **flagella** * **internal** * **boundary** * **membrane** """ name = NAME expected_pmf = ( -140 ) # PMF ~170mV at pH 7, ~140mV at pH 7.7 (Berg H, E. coli in motion, 2004, pg 113) expected_flagella = 4 expected_thrust = 0.5 # (pN) Hughes MP & Morgan H. (1999) Measurement of bacterial flagellar thrust. defaults = { "time_step": 0.01, "n_flagella": expected_flagella, # CheY phosphorylation parameters "k_y": 100.0, # 1/uM/s "k_z": 30.0, # / [CheZ], "gamma_y": 0.1, # rate constant "k_s": 0.45, # scaling coefficient "adapt_precision": 8, # scales CheY_P to cluster activity # rotational state of individual flagella # parameters from Sneddon, Pontius, and Emonet (2012) "omega": 1.3, # (1/s) characteristic motor switch time "g_0": 40, # (k_B*T) free energy barrier for CCW-->CW "g_1": 40, # (k_B*T) free energy barrier for CW-->CCW "K_D": 3.06, # binding constant of CheY-P to base of the motor # added parameters "ccw_to_cw_leak": 0.2, # 1/s rate of spontaneous switch to cw # motile force parameters "flagellum_thrust": expected_thrust / math.log(expected_flagella + 1), "tumble_jitter": 60.0, "tumble_scaling": 0.5 / expected_pmf, # scale to expected PMF "run_scaling": 1.0 / expected_pmf, # scale to expected PMF # initial state "initial_state": { "internal": { # response regulator proteins "CheY": 2.59, "CheY_P": 2.59, # (uM) mean concentration of CheY-P "CheZ": 1.0, # (uM) phosphatase "CheA": 1.0, # (uM) # sensor activity "chemoreceptor_activity": 1 / 3, # cell motile state "motile_state": 1, # 1 for tumble, 0 for run }, "membrane": { "PMF": expected_pmf, }, "boundary": { "thrust": 0, "torque": 0, }, }, # seed for unique ID generation "seed": 0, } def __init__(self, parameters=None): super().__init__(parameters) self.random_state = np.random.RandomState(seed=self.parameters["seed"])
[docs] def ports_schema(self): ports = [ "internal_counts", "flagella", "internal", "boundary", "membrane", ] schema = {port: {} for port in ports} # internal_counts of flagella (n_flagella) schema["internal_counts"]["flagella"] = { "_default": self.parameters["n_flagella"], "_emit": True, } # flagella schema["flagella"] = { "_divider": "split_dict", "*": {"_default": 1, "_updater": "set", "_emit": True}, } # internal state_emit = [ "chemoreceptor_activity", "motile_state", "CheA", "CheY_P", "CheY", ] state_set_updater = [ "motile_state", "CheA", "CheY_P", "CheY", ] for state, default in self.parameters["initial_state"]["internal"].items(): schema["internal"][state] = {"_default": default} if state in state_emit: schema["internal"][state].update({"_emit": True}) if state in state_set_updater: schema["internal"][state].update({"_updater": "set"}) # boundary (thrust and torque) for state, default in self.parameters["initial_state"]["boundary"].items(): schema["boundary"][state] = { "_default": default, "_emit": True, "_updater": "set", } # membrane for state in ["PMF", "protons_flux_accumulated"]: schema["membrane"][state] = { "_default": self.parameters["initial_state"]["membrane"].get(state, 0.0) } return schema
[docs] def next_update(self, timestep, states): # get flagella sub-compartments and current flagella protein counts flagella = states["flagella"] n_flagella = states["internal_counts"]["flagella"] # proton motive force PMF = states["membrane"]["PMF"] # internal states internal = states["internal"] P_on = internal["chemoreceptor_activity"] CheY_0 = internal["CheY"] CheY_P_0 = internal["CheY_P"] # parameters adapt_precision = self.parameters["adapt_precision"] k_y = self.parameters["k_y"] k_s = self.parameters["k_s"] k_z = self.parameters["k_z"] gamma_y = self.parameters["gamma_y"] ## Kinase activity # relative steady-state concentration of phosphorylated CheY. # CheZ combined in k_z as dephosphorylation rate of CheY-P new_CheY_P = ( adapt_precision * k_y * k_s * P_on / (k_y * k_s * P_on + k_z + gamma_y) ) dCheY_P = new_CheY_P - CheY_P_0 # TODO -- add an assert here instead CheY_P = max(new_CheY_P, 0.0) # keep value positive CheY = max(CheY_0 - dCheY_P, 0.0) # keep value positive ## Update flagella subcompartments # check number of flagella proteins, compare with sub-compartments add/delete accordingly flagella_update = {} new_flagella = int(n_flagella) - len(flagella) if new_flagella < 0: flagella_update["_delete"] = [] remove = random.sample(list(flagella.keys()), abs(new_flagella)) for flagella_id in remove: flagella_update["_delete"].append((flagella_id,)) elif new_flagella > 0: flagella_update["_add"] = [] new_flagella_indexes = create_unique_indexes( new_flagella, self.random_state ) for index in range(new_flagella): flagella_id = new_flagella_indexes[index] flagella_update["_add"].append( {"key": str(flagella_id), "state": random.choice([-1, 1])} ) # update individual flagella states for flagella_id, motor_state in flagella.items(): new_motor_state = self.update_flagellum(motor_state, CheY_P, timestep) flagella_update.update({flagella_id: new_motor_state}) ## get cell motile state. # if any flagella is rotating CW, the cell tumbles. # flagella motor state: -1 for CCW, 1 for CW # motile state: -1 for run, 1 for tumble, 0 for no state if any(state == 1 for state in flagella_update.values()): motile_state = 1 [thrust, torque] = self.tumble(n_flagella, PMF) elif len(flagella_update) > 0: motile_state = -1 [thrust, torque] = self.run(n_flagella, PMF) else: motile_state = 0 thrust = 0 torque = 0 return { "flagella": flagella_update, "internal": {"motile_state": motile_state, "CheY_P": CheY_P, "CheY": CheY}, "boundary": {"thrust": thrust, "torque": torque}, }
[docs] def update_flagellum(self, motor_state, CheY_P, timestep): """ calculate the rotational state of a individual flagellum .. note:: TODO -- normal, semi, curly states from Sneddon """ g_0 = self.parameters["g_0"] # (k_B*T) free energy barrier for CCW-->CW g_1 = self.parameters["g_1"] # (k_B*T) free energy barrier for CW-->CCW K_D = self.parameters["K_D"] # binding constant of CheY-P to base of the motor omega = self.parameters["omega"] # (1/s) characteristic motor switch time # free energy barrier delta_g = g_0 / 4 - g_1 / 2 * (CheY_P / (CheY_P + K_D)) # switching frequency CW_to_CCW = omega * math.exp(delta_g) CCW_to_CW = omega * math.exp(-delta_g) CW_bias = CCW_to_CW / (CCW_to_CW + CW_to_CCW) CCW_bias = CW_to_CCW / (CW_to_CCW + CCW_to_CW) # don't let ccw_to_cw get under leak value if CW_bias < self.parameters["ccw_to_cw_leak"]: CW_bias = self.parameters["ccw_to_cw_leak"] # flagella motor state: -1 for CCW, 1 for CW if motor_state == -1: # switch probability as function of the time step switch_rate = -math.log(1 - CW_bias) prob_switch = 1 - math.exp(-switch_rate * timestep) if np.random.random(1)[0] <= prob_switch: new_motor_state = 1 else: new_motor_state = -1 elif motor_state == 1: # switch probability as function of the time step switch_rate = -math.log(1 - CCW_bias) prob_switch = 1 - math.exp(-switch_rate * timestep) if np.random.random(1)[0] <= prob_switch: new_motor_state = -1 else: new_motor_state = 1 return new_motor_state
[docs] def tumble(self, n_flagella, PMF): """ thrust scales with log(n_flagella) because only the thickness of the bundle is affected """ thrust = ( self.parameters["tumble_scaling"] * PMF * self.parameters["flagellum_thrust"] * math.log(n_flagella + 1) ) torque = random.normalvariate(0, self.parameters["tumble_jitter"]) return [thrust, torque]
[docs] def run(self, n_flagella, PMF): """ thrust scales with log(n_flagella) because only the thickness of the bundle is affected """ thrust = ( self.parameters["run_scaling"] * PMF * self.parameters["flagellum_thrust"] * math.log(n_flagella + 1) ) torque = 0.0 return [thrust, torque]
# test functions
[docs] def get_chemoreceptor_activity_timeline( total_time=2, time_step=0.01, rate=1.0, initial_value=1.0 / 3.0 ): val = copy.copy(initial_value) timeline = [(0, {("internal", "chemoreceptor_activity"): initial_value})] t = 0 while t < total_time: val += random.choice((-1, 1)) * rate * time_step if val < 0: val = 0 if val > 1: val = 1 timeline.append((t, {("internal", "chemoreceptor_activity"): val})) t += time_step return timeline
def test_variable_flagella(out_dir="out", return_data=False): total_time = 30 time_step = 0.01 initial_flagella = 2 # make timeline timeline = get_chemoreceptor_activity_timeline( total_time=total_time, time_step=time_step, rate=2.0, ) timeline_flagella = [ (10, {("internal_counts", "flagella"): initial_flagella + 1}), (20, {("internal_counts", "flagella"): initial_flagella + 2}), ] timeline.extend(timeline_flagella) # run simulation process_config = {"n_flagella": initial_flagella} process = FlagellaMotor(process_config) settings = { "return_raw_data": True, "timeline": {"timeline": timeline, "time_step": time_step}, } raw_data = simulate_process(process, settings) if return_data: return raw_data
[docs] def plot_activity( data, settings=None, out_dir="out", filename="motor_control", ): if not settings: settings = {} if settings.get("data_type") == "timeseries": timeseries = data else: timeseries = timeseries_from_data(data) aspect_ratio = settings.get("aspect_ratio", 1) fontsize = settings.get("fontsize", 12) CheY_vec = timeseries["internal"]["CheY"] CheY_P_vec = timeseries["internal"]["CheY_P"] motile_state_vec = timeseries["internal"]["motile_state"] thrust_vec = timeseries["boundary"]["thrust"] flagella_activity = timeseries.get("flagella", {}) time_vec = timeseries["time"] # make flagella activity grid flagella_ids = list(flagella_activity.keys()) flagella_indexes = {} next_index = 0 activity_grid = np.zeros((len(flagella_ids), len(time_vec))) total_CW = np.zeros((len(time_vec))) for time_index, (time, time_data) in enumerate(data.items()): time_flagella = time_data.get("flagella", {}) for flagella_id, rotation_states in time_flagella.items(): # get flagella_index by order of appearance if flagella_id not in flagella_indexes: flagella_indexes[flagella_id] = next_index next_index += 1 flagella_index = flagella_indexes[flagella_id] modified_rotation_state = 0 CW_rotation_state = 0 if rotation_states == -1: modified_rotation_state = 1 elif rotation_states == 1: modified_rotation_state = 2 CW_rotation_state = 1 activity_grid[flagella_index, time_index] = modified_rotation_state total_CW += np.array(CW_rotation_state) # grid for cell state motile_state_grid = np.zeros((1, len(time_vec))) motile_state_grid[0, :] = motile_state_vec # set up colormaps # cell motile state cmap1 = colors.ListedColormap(["steelblue", "lightgray", "darkorange"]) bounds1 = [-1, -1 / 3, 1 / 3, 1] norm1 = colors.BoundaryNorm(bounds1, cmap1.N) motile_legend_elements = [ Patch(facecolor="steelblue", edgecolor="k", label="Run"), Patch(facecolor="darkorange", edgecolor="k", label="Tumble"), Patch(facecolor="lightgray", edgecolor="k", label="N/A"), ] # rotational state cmap2 = colors.ListedColormap(["lightgray", "steelblue", "darkorange"]) bounds2 = [0, 0.5, 1.5, 2] norm2 = colors.BoundaryNorm(bounds2, cmap2.N) rotational_legend_elements = [ Patch(facecolor="steelblue", edgecolor="k", label="CCW"), Patch(facecolor="darkorange", edgecolor="k", label="CW"), Patch(facecolor="lightgray", edgecolor="k", label="N/A"), ] # plot results cols = 1 rows = 4 width = 3 height = width / aspect_ratio plt.figure(figsize=(width, height)) plt.rc("font", size=fontsize) # define subplots ax1 = plt.subplot(rows, cols, 1) ax3 = plt.subplot(rows, cols, 2) ax4 = plt.subplot(rows, cols, 3) ax5 = plt.subplot(rows, cols, 4) # plot Che-P state ax1.plot(time_vec, CheY_vec, label="CheY") ax1.plot(time_vec, CheY_P_vec, label="CheY_P") ax1.legend(loc="center left", bbox_to_anchor=(1, 0.5)) ax1.set_xticks([]) ax1.spines["right"].set_visible(False) ax1.spines["top"].set_visible(False) ax1.set_xlim(time_vec[0], time_vec[-1]) ax1.set_ylabel("concentration \n (uM)") # plot flagella states in a grid if len(activity_grid) > 0: ax3.imshow( activity_grid, interpolation="nearest", aspect="auto", cmap=cmap2, norm=norm2, # extent=[-1,1,-1,1] extent=[time_vec[0], time_vec[-1], len(flagella_ids) + 0.5, 0.5], ) plt.locator_params(axis="y", nbins=len(flagella_ids)) ax3.set_yticks(list(range(1, len(flagella_ids) + 1))) ax3.set_xticks([]) ax3.set_ylabel("flagella #") ax3.tick_params(axis="both", labelsize=fontsize) # legend ax3.legend( title="activity", handles=rotational_legend_elements, loc="center left", bbox_to_anchor=(1, 0.5), ) else: # no flagella ax3.set_axis_off() # plot cell motile state ax4.imshow( motile_state_grid, interpolation="nearest", aspect="auto", cmap=cmap1, norm=norm1, extent=[time_vec[0], time_vec[-1], 0, 1], ) ax4.set_yticks([]) ax4.set_xticks([]) # legend ax4.legend( title="motile state", handles=motile_legend_elements, loc="center left", bbox_to_anchor=(1, 0.5), ) # plot motor thrust ax5.plot(time_vec, thrust_vec) ax5.set_xlim(time_vec[0], time_vec[-1]) ax5.spines["right"].set_visible(False) ax5.spines["top"].set_visible(False) ax5.set_ylabel("thrust (pN)") ax5.set_xlabel("time (sec)") # save figure fig_path = os.path.join(out_dir, filename) plt.subplots_adjust(wspace=0.7, hspace=0.3) plt.savefig(fig_path + ".png", bbox_inches="tight")
[docs] def plot_motor_PMF(output, out_dir="out", figname="motor_PMF"): # motile_state = output["motile_state"] thrust = output["thrust"] # motile_torque = output["motile_torque"] PMF = output["PMF"] # plot results cols = 1 rows = 1 plt.figure(figsize=(10 * cols, 2 * rows)) # define subplots ax1 = plt.subplot(rows, cols, 1) # plot motile_state ax1.plot(PMF, thrust) ax1.set_xlabel("PMF (mV)") ax1.set_ylabel("force") # save figure fig_path = os.path.join(out_dir, figname) plt.subplots_adjust(wspace=0.7, hspace=0.3) plt.savefig(fig_path + ".png", bbox_inches="tight")
[docs] def plot_signal_transduction( timeseries, plot_config, out_dir="out", filename="signal_transduction" ): if plot_config.get("ligand_id"): ligand_id = plot_config["ligand_id"] ligand = {ligand_id: timeseries["boundary"]["external"][ligand_id]} else: ligand = timeseries["boundary"]["external"] aspect_ratio = plot_config.get("aspect_ratio", 1) fontsize = plot_config.get("fontsize", 12) chemoreceptor_activity = timeseries["internal"]["chemoreceptor_activity"] CheY_P = timeseries["internal"]["CheY_P"] motile_state = timeseries["internal"]["motile_state"] time_vec = timeseries["time"] # grid for cell state motile_state_grid = np.zeros((1, len(time_vec))) motile_state_grid[0, :] = motile_state # set up colormaps # cell motile state cmap1 = colors.ListedColormap(["steelblue", "lightgray", "darkorange"]) bounds1 = [-1, -1 / 3, 1 / 3, 1] norm1 = colors.BoundaryNorm(bounds1, cmap1.N) motile_legend_elements = [ Patch(facecolor="steelblue", edgecolor="k", label="Run"), Patch(facecolor="darkorange", edgecolor="k", label="Tumble"), Patch(facecolor="lightgray", edgecolor="k", label="N/A"), ] # plot results cols = 1 rows = 4 width = 3 height = width / aspect_ratio plt.figure(figsize=(width, height)) plt.rc("font", size=12) ax1 = plt.subplot(rows, cols, 1) ax2 = plt.subplot(rows, cols, 2) ax3 = plt.subplot(rows, cols, 3) ax4 = plt.subplot(rows, cols, 4) for ligand_id, ligand_vec in ligand.items(): ax1.plot(time_vec, ligand_vec, "steelblue") ax2.plot(time_vec, chemoreceptor_activity, "steelblue") ax3.plot(time_vec, CheY_P, "steelblue") # plot cell motile state ax4.imshow( motile_state_grid, interpolation="nearest", aspect="auto", cmap=cmap1, norm=norm1, extent=[time_vec[0], time_vec[-1], 0, 1], ) ax1.set_xticklabels([]) ax1.spines["right"].set_visible(False) ax1.spines["top"].set_visible(False) ax1.tick_params(right=False, top=False) ax1.set_ylabel("external ligand \n (mM) ", fontsize=fontsize) ax2.set_xticklabels([]) ax2.spines["right"].set_visible(False) ax2.spines["top"].set_visible(False) ax2.tick_params(right=False, top=False) ax2.set_ylabel("cluster activity \n P(on)", fontsize=fontsize) ax3.set_xticklabels([]) ax3.spines["right"].set_visible(False) ax3.spines["top"].set_visible(False) ax3.tick_params(right=False, top=False) ax3.set_ylabel("CheY-P", fontsize=fontsize) ax4.spines["right"].set_visible(False) ax4.spines["top"].set_visible(False) ax4.tick_params(right=False, top=False) ax4.set_xlabel("time (s)", fontsize=fontsize) ax4.set_ylabel("motile state", fontsize=fontsize) # legend ax4.legend( title="motile state", handles=motile_legend_elements, loc="center left", bbox_to_anchor=(1, 0.5), ) fig_path = os.path.join(out_dir, filename) plt.subplots_adjust(wspace=0.7, hspace=0.3) plt.savefig(fig_path + ".png", bbox_inches="tight")
[docs] def main(): out_dir = os.path.join(PROCESS_OUT_DIR, NAME) if not os.path.exists(out_dir): os.makedirs(out_dir) data = test_variable_flagella(out_dir, return_data=True) # plot plot_settings = {} timeseries = timeseries_from_data(data) plot_simulation_output(timeseries, plot_settings, out_dir) plot_activity(data, plot_settings, out_dir)
if __name__ == "__main__": main()