import argparse
import ast
import os
from itertools import combinations
import matplotlib
import numpy as np
import pandas as pd
import seaborn as sns
from scipy.stats import spearmanr
from esda.moran import Moran
from libpysal.weights import DistanceBand
from splot.esda import plot_moran
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from ecoli.analysis.antibiotics_colony import COUNTS_PER_FL_TO_NANOMOLAR
from ecoli.analysis.antibiotics_colony.amp_plots import PERIPLASMIC_VOLUME_FRACTION
[docs]
def make_spatial_correlation_plot(glc_data, column, to_conc=False):
# Filter to just last snapshot
max_t = glc_data.Time.max()
data = glc_data[glc_data.Time == max_t][["Boundary", "Volume", column]]
if to_conc:
data[column] = data[column] * COUNTS_PER_FL_TO_NANOMOLAR / data["Volume"]
location = data["Boundary"].apply(lambda b: np.array(b["location"]))
data["X"] = location.apply(lambda loc: loc[0])
data["Y"] = location.apply(lambda loc: loc[1])
location = data[["X", "Y"]].values
weights = DistanceBand(location, 3, alpha=-2.0, binary=False)
moran = Moran(data[column], weights, permutations=9999)
fig, ax = plot_moran(moran)
return fig, ax, moran
[docs]
def make_threshold_sweep_plot(glc_data, column, to_conc=False):
# Filter to just last snapshot
max_t = glc_data.Time.max()
data = glc_data[glc_data.Time == max_t][["Boundary", "Volume", column]]
if to_conc:
data[column] = data[column] * COUNTS_PER_FL_TO_NANOMOLAR / data["Volume"]
location = data["Boundary"].apply(lambda b: np.array(b["location"]))
data["X"] = location.apply(lambda loc: loc[0])
data["Y"] = location.apply(lambda loc: loc[1])
location = data[["X", "Y"]].values
thresholds = np.linspace(0, 50, 50)
i_values = []
max_i = 0
max_d = 0
for d in thresholds:
weights = DistanceBand(location, d, alpha=-2.0, binary=False)
moran = Moran(data[column], weights, permutations=9999)
i_values.append(moran.I)
if moran.I > max_i:
max_i = moran.I
max_d = d
fig, ax = plt.subplots()
ax.plot(thresholds, i_values)
ax.set_ylim(0, 0.06)
ax.set_title(f"I vs. distance threshold (max I attained at d={max_d})")
return fig, ax
[docs]
def load_data(
glc_data,
verbose=False,
):
# Load glc data
if verbose:
print("Loading Glucose data...")
glc_data = pd.read_csv(glc_data, dtype={"Agent ID": str, "Seed": str}, index_col=0)
# Convert string to actual dictionary
glc_data["Boundary"] = glc_data["Boundary"].apply(ast.literal_eval)
# Validate data:
# - glc_data must be in Glucose condition
glc_data_condition = glc_data["Condition"].unique()[0]
assert "Glucose" in glc_data_condition, "glc_data was not from Glucose condition!"
# Clean data:
# Add additional columns for periplasmic volume,
# concentration of AmpC in the periplasm
glc_data["Periplasmic Volume"] = PERIPLASMIC_VOLUME_FRACTION * glc_data["Volume"]
glc_data["AmpC conc"] = (
glc_data["AmpC monomer"] / glc_data["Periplasmic Volume"]
) * COUNTS_PER_FL_TO_NANOMOLAR
return glc_data
[docs]
def cli():
parser = argparse.ArgumentParser(
"Generate analysis plots for ampicillin colony sims."
)
parser.add_argument(
"glc_data",
type=str,
help="Locally saved dataframe file for glucose (before addition of ampicillin.)",
)
parser.add_argument(
"--outdir",
"-d",
default="out/analysis/paper_figures/figure_s4",
help="Directory in which to output the generated figures.",
)
parser.add_argument("--svg", "-s", action="store_true", help="Save as svg.")
parser.add_argument("--verbose", "-v", action="store_true")
args = parser.parse_args()
return args
[docs]
def main():
plt.rcParams["font.family"] = "Arial"
plt.rcParams["svg.fonttype"] = "none"
options = cli()
glc_data = load_data(
options.glc_data,
options.verbose,
)
agent_ids = glc_data.loc[
glc_data.loc[:, "Time"] == glc_data.loc[:, "Time"], "Agent ID"
]
norm_agent_ids = {}
for agent_id in agent_ids:
if len(agent_id) > 1:
parent_id = agent_id[:-1]
exp = 8 - len(parent_id)
else:
parent_id = "0"
exp = 8
binary_agent = int(parent_id, 2) * 2**exp
norm_agent_ids[agent_id] = binary_agent
norm_agent_id_col = [
norm_agent_ids[agent_id] if agent_id in norm_agent_ids else "0"
for agent_id in glc_data.loc[:, "Agent ID"]
]
glc_data["Lineage"] = norm_agent_id_col
# Ensure output directory exists
os.makedirs(options.outdir, exist_ok=True)
ext = ".svg" if options.svg else ".png"
# Variable, whether to convert to concentration
spatial_vars = {
"OmpF monomer": True,
"MarR monomer": True,
"AmpC conc": False,
"TolC monomer": True,
"Lineage": False,
}
# Plot relatedness vs. distance
if options.verbose:
print("Plotting distance vs. relatedness:")
fig, _ = make_relatedness_vs_distance_plot(glc_data)
fig.set_size_inches(3, 3)
fig.savefig(os.path.join(options.outdir, f"fig_s4_relatedness_vs_distance{ext}"))
# Compute and plot spatial autocorrelations
moran_results = {}
for col, to_conc in spatial_vars.items():
if options.verbose:
print(f"Computing and plotting spatial autocorrelation for {col}:")
fig, _, moran = make_spatial_correlation_plot(
glc_data, column=col, to_conc=to_conc
)
moran_results[col] = moran
fig.set_size_inches(6, 4)
fig.tight_layout()
fig.savefig(os.path.join(options.outdir, f"{col} Moran plot{ext}"))
# Plot threshold param sweep
fig, _ = make_threshold_sweep_plot(glc_data, column="OmpF monomer", to_conc=True)
fig.set_size_inches(4, 4)
fig.savefig(os.path.join(options.outdir, f"threshold_sweep{ext}"))
seed = glc_data.loc[:, "Seed"].iloc[0]
with open(os.path.join(options.outdir, f"Moran tests_{seed}.txt"), "w") as f:
for col, moran in moran_results.items():
f.write(f"\n{col}\n=============\n")
f.write(f"I = {moran.I}\n")
f.write(
f"Simulated p-value from {moran.permutations} permutations: p={moran.p_sim}\n"
)
f.write(
f"After Bonferroni correction with m={len(spatial_vars)} null hypotheses: {len(spatial_vars) * moran.p_sim}"
)
if __name__ == "__main__":
main()