Source code for reconstruction.ecoli.knowledge_base_raw

KnowledgeBase for Ecoli
Whole-cell knowledge base for Ecoli. Contains all raw, un-fit data processed
directly from CSV flat files.


import io
import os
import json
from typing import List, Dict
import warnings

from reconstruction.spreadsheets import read_tsv
from import tsv
from wholecell.utils import units  # used by eval()

FLAT_DIR = os.path.join(os.path.dirname(__file__), "flat")
    # "transcription_units.tsv",  # special cased in the constructor
    os.path.join("mass_fractions", "glycogen_fractions.tsv"),
    os.path.join("mass_fractions", "ion_fractions.tsv"),
    os.path.join("mass_fractions", "LPS_fractions.tsv"),
    os.path.join("mass_fractions", "lipid_fractions.tsv"),
    os.path.join("mass_fractions", "murein_fractions.tsv"),
    os.path.join("mass_fractions", "soluble_fractions.tsv"),
    os.path.join("trna_data", "trna_ratio_to_16SrRNA_0p4.tsv"),
    os.path.join("trna_data", "trna_ratio_to_16SrRNA_0p7.tsv"),
    os.path.join("trna_data", "trna_ratio_to_16SrRNA_1p6.tsv"),
    os.path.join("trna_data", "trna_ratio_to_16SrRNA_1p07.tsv"),
    os.path.join("trna_data", "trna_ratio_to_16SrRNA_2p5.tsv"),
    os.path.join("trna_data", "trna_growth_rates.tsv"),
    os.path.join("rna_seq_data", "rnaseq_rsem_tpm_mean.tsv"),
    os.path.join("rna_seq_data", "rnaseq_rsem_tpm_std.tsv"),
    os.path.join("rna_seq_data", "rnaseq_seal_rpkm_mean.tsv"),
    os.path.join("rna_seq_data", "rnaseq_seal_rpkm_std.tsv"),
    os.path.join("rrna_options", "remove_rrff", "genes_removed.tsv"),
    os.path.join("rrna_options", "remove_rrff", "rnas_removed.tsv"),
    os.path.join("rrna_options", "remove_rrff", "transcription_units_modified.tsv"),
        "rrna_options", "remove_rrna_operons", "transcription_units_added.tsv"
        "rrna_options", "remove_rrna_operons", "transcription_units_removed.tsv"
    os.path.join("condition", "tf_condition.tsv"),
    os.path.join("condition", "condition_defs.tsv"),
    os.path.join("condition", "environment_molecules.tsv"),
    os.path.join("condition", "timelines_def.tsv"),
    os.path.join("condition", "media_recipes.tsv"),
    os.path.join("condition", "media", "5X_supplement_EZ.tsv"),
    os.path.join("condition", "media", "MIX0-55.tsv"),
    os.path.join("condition", "media", "MIX0-57.tsv"),
    os.path.join("condition", "media", "MIX0-58.tsv"),
    os.path.join("condition", "media", "MIX0-844.tsv"),
    os.path.join("base_codes", "amino_acids.tsv"),
    os.path.join("base_codes", "dntp.tsv"),
    os.path.join("base_codes", "nmp.tsv"),
    os.path.join("base_codes", "ntp.tsv"),
    os.path.join("adjustments", "amino_acid_pathways.tsv"),
    os.path.join("adjustments", "balanced_translation_efficiencies.tsv"),
    os.path.join("adjustments", "translation_efficiencies_adjustments.tsv"),
    os.path.join("adjustments", "rna_expression_adjustments.tsv"),
    os.path.join("adjustments", "rna_deg_rates_adjustments.tsv"),
    os.path.join("adjustments", "protein_deg_rates_adjustments.tsv"),
    os.path.join("adjustments", "relative_metabolite_concentrations_changes.tsv"),
SEQUENCE_FILE = "sequence.fasta"
    os.path.join("new_gene_data", "new_gene_baseline_expression_parameters.tsv"),

    "amino_acid_export_kms": "amino_acid_export_kms_removed",
    "amino_acid_uptake_rates": "amino_acid_uptake_rates_removed",
    "complexation_reactions": "complexation_reactions_removed",
    "equilibrium_reactions": "equilibrium_reactions_removed",
    "fold_changes": "fold_changes_removed",
    "fold_changes_nca": "fold_changes_removed",
    "metabolic_reactions": "metabolic_reactions_removed",
    "metabolite_concentrations": "metabolite_concentrations_removed",
    "ppgpp_regulation": "ppgpp_regulation_removed",
    "transcriptional_attenuation": "transcriptional_attenuation_removed",
    "trna_charging_reactions": "trna_charging_reactions_removed",
    "complexation_reactions": "complexation_reactions_modified",
    "metabolic_reactions": "metabolic_reactions_modified",

    "complexation_reactions": "complexation_reactions_added",
    "equilibrium_reactions": "equilibrium_reactions_added",
    "metabolic_reactions": "metabolic_reactions_added",
    "metabolites": "metabolites_added",
    "ppgpp_regulation": "ppgpp_regulation_added",
    "trna_charging_reactions": "trna_charging_reactions_added",

[docs] class DataStore(object): def __init__(self): pass
[docs] class KnowledgeBaseEcoli(object): """KnowledgeBaseEcoli""" def __init__( self, operons_on: bool, remove_rrna_operons: bool, remove_rrff: bool, stable_rrna: bool, new_genes_option: str = "off", ): self.operons_on = operons_on self.stable_rrna = stable_rrna self.new_genes_option = new_genes_option if not operons_on and remove_rrna_operons: warnings.warn( "Setting the 'remove_rrna_operons' option to 'True'" " has no effect on the simulations when the 'operon'" " option is set to 'off'." ) self.compartments: List[ dict ] = [] # mypy can't track setattr(self, attr_name, rows) self.transcription_units: List[dict] = [] # Make copies to prevent issues with sticky global variables when # running multiple operon workflows through Fireworks self.list_of_dict_filenames: List[str] = LIST_OF_DICT_FILENAMES.copy() self.list_of_parameter_filenames: List[str] = LIST_OF_PARAMETER_FILENAMES.copy() self.removed_data: Dict[str, str] = REMOVED_DATA.copy() self.modified_data: Dict[str, str] = MODIFIED_DATA.copy() self.added_data: Dict[str, str] = ADDED_DATA.copy() self.new_gene_added_data: Dict[str, str] = {} self.parameter_file_attribute_names: List[str] = [ os.path.splitext(os.path.basename(filename))[0] for filename in self.list_of_parameter_filenames ] if self.operons_on: self.list_of_dict_filenames.append("transcription_units.tsv") if remove_rrna_operons: # Use alternative file with all rRNA transcription units if # remove_rrna_operons option was used self.removed_data.update( { "transcription_units": "rrna_options.remove_rrna_operons.transcription_units_removed", } ) self.added_data.update( { "transcription_units": "rrna_options.remove_rrna_operons.transcription_units_added", } ) else: self.removed_data.update( { "transcription_units": "transcription_units_removed", } ) self.added_data.update( { "transcription_units": "transcription_units_added", } ) if remove_rrff: self.list_of_parameter_filenames.append( os.path.join( "rrna_options", "remove_rrff", "mass_parameters_modified.tsv" ) ) self.removed_data.update( { "genes": "rrna_options.remove_rrff.genes_removed", "rnas": "rrna_options.remove_rrff.rnas_removed", } ) self.modified_data.update( { "mass_parameters": "rrna_options.remove_rrff.mass_parameters_modified", } ) if self.operons_on: self.modified_data.update( { "transcription_units": "rrna_options.remove_rrff.transcription_units_modified", } ) if self.new_genes_option != "off": new_gene_subdir = new_genes_option new_gene_path = os.path.join("new_gene_data", new_gene_subdir) assert os.path.isdir( os.path.join(FLAT_DIR, new_gene_path) ), "This new_genes_data subdirectory is invalid." nested_attr = "new_gene_data." + new_gene_subdir + "." # These files do not need to be joined to existing files self.list_of_dict_filenames.append( os.path.join(new_gene_path, "insertion_location.tsv") ) self.list_of_dict_filenames.append( os.path.join(new_gene_path, "gene_sequences.tsv") ) # These files need to be joined to existing files new_gene_shared_files = [ "genes", "rnas", "proteins", "rna_half_lives", "protein_half_lives_measured", ] for f in new_gene_shared_files: file_path = os.path.join(new_gene_path, f + ".tsv") # If these files are empty, fill in with default values at a # later point assert os.path.isfile(os.path.join(FLAT_DIR, file_path)), ( f"File {f}.tsv must be present in the new_genes_data" f" subdirectory {new_gene_subdir}." ) self.list_of_dict_filenames.append(file_path) self.new_gene_added_data.update({f: nested_attr + f}) rnaseq_path = os.path.join(new_gene_path, "rnaseq_rsem_tpm_mean.tsv") if os.path.isfile(os.path.join(FLAT_DIR, rnaseq_path)): self.list_of_dict_filenames.append(rnaseq_path) self.new_gene_added_data.update( { "rna_seq_data.rnaseq_rsem_tpm_mean": nested_attr + "rnaseq_rsem_tpm_mean" } ) # Load raw data from TSV files for filename in self.list_of_dict_filenames: self._load_tsv(FLAT_DIR, os.path.join(FLAT_DIR, filename)) for filename in self.list_of_parameter_filenames: self._load_parameters(FLAT_DIR, os.path.join(FLAT_DIR, filename)) self.genome_sequence = self._load_sequence( os.path.join(FLAT_DIR, SEQUENCE_FILE) ) self._prune_data() self._join_data() self._modify_data() if self.new_genes_option != "off": self._check_new_gene_ids(nested_attr) insert_pos = self._update_gene_insertion_location(nested_attr) insertion_sequence = self._get_new_gene_sequence(nested_attr) insert_end = self._update_gene_locations(nested_attr, insert_pos) self.new_gene_added_data.update({"genes": nested_attr + "genes"}) self.genome_sequence = ( self.genome_sequence[:insert_pos] + insertion_sequence + self.genome_sequence[insert_pos:] ) assert ( self.genome_sequence[insert_pos : (insert_end + 1)] == insertion_sequence ) self.added_data = self.new_gene_added_data self._join_data()
[docs] def _load_tsv(self, dir_name, file_name): path = self for sub_path in file_name[len(dir_name) + 1 :].split(os.path.sep)[:-1]: if not hasattr(path, sub_path): setattr(path, sub_path, DataStore()) path = getattr(path, sub_path) attr_name = file_name.split(os.path.sep)[-1].split(".")[0] setattr(path, attr_name, []) rows = read_tsv(file_name) setattr(path, attr_name, rows)
[docs] def _load_sequence(self, file_path): from Bio import SeqIO with open(file_path, "r") as handle: for record in SeqIO.parse(handle, "fasta"): return record.seq
[docs] def _load_parameters(self, dir_name, file_name): path = self for sub_path in file_name[len(dir_name) + 1 :].split(os.path.sep)[:-1]: if not hasattr(path, sub_path): setattr(path, sub_path, DataStore()) path = getattr(path, sub_path) attr_name = file_name.split(os.path.sep)[-1].split(".")[0] param_dict = {} with, "rb") as csvfile: reader = tsv.dict_reader(csvfile) for row in reader: value = json.loads(row["value"]) if row["units"] != "": # `eval()` the units [risky!] then strip it to just a unit # since `a_list * a_float` (like `1.0 [1/s]`) fails, and # `a_list * an_int` repeats the list, which is also broken. unit = eval(row["units"]) # risky! unit = units.getUnit(unit) # strip value = value * unit param_dict[row["name"]] = value setattr(path, attr_name, param_dict)
[docs] def _prune_data(self): """ Remove rows that are specified to be removed. Data will only be removed if all data in a row in the file specifying rows to be removed matches the same columns in the raw data file. """ # Check each pair of files to be removed for data_attr, attr_to_remove in self.removed_data.items(): # Build the set of data to identify rows to be removed data_to_remove = getattr(self, attr_to_remove.split(".")[0]) for attr in attr_to_remove.split(".")[1:]: data_to_remove = getattr(data_to_remove, attr) removed_cols = list(data_to_remove[0].keys()) ids_to_remove = set() for row in data_to_remove: ids_to_remove.add(tuple([row[col] for col in removed_cols])) # Remove any matching rows data = getattr(self, data_attr) n_entries = len(data) removed_ids = set() for i, row in enumerate(data[::-1]): checked_id = tuple([row[col] for col in removed_cols]) if checked_id in ids_to_remove: data.pop(n_entries - i - 1) removed_ids.add(checked_id) # Print warnings for entries that were marked to be removed that # does not exist in the original data file. Fold changes are # excluded since the original entries are split between two files. if not data_attr.startswith("fold_changes"): for unremoved_id in ids_to_remove - removed_ids: print( f"Warning: Could not remove row {unremoved_id} " f"in flat file {data_attr} because the row does not " f"exist." )
[docs] def _join_data(self): """ Add rows that are specified in additional files. Data will only be added if all the loaded columns from both datasets match. """ # Join data for each file with data to be added for data_attr, attr_to_add in self.added_data.items(): # Get datasets to join data = getattr(self, data_attr.split(".")[0]) for attr in data_attr.split(".")[1:]: data = getattr(data, attr) added_data = getattr(self, attr_to_add.split(".")[0]) for attr in attr_to_add.split(".")[1:]: added_data = getattr(added_data, attr) if added_data: # Some new gene additional files may be empty # Check columns are the same for each dataset col_diff = set(data[0].keys()).symmetric_difference( added_data[0].keys() ) if col_diff: raise ValueError( f"Could not join datasets {data_attr} and {attr_to_add} " f"because columns do not match (different columns: {col_diff})." ) # Join datasets for row in added_data: data.append(row)
[docs] def _modify_data(self): """ Modify entires in rows that are specified to be modified. Rows must be identified by their entries in the first column (usually the ID column). """ # Check each pair of files to be modified for data_attr, modify_attr in self.modified_data.items(): # Build the set of data to identify rows to be modified data_to_modify = getattr(self, modify_attr.split(".")[0]) for attr in modify_attr.split(".")[1:]: data_to_modify = getattr(data_to_modify, attr) data = getattr(self, data_attr) # If modifying a parameter file, replace values in dictionary if data_attr in self.parameter_file_attribute_names: for key, value in data_to_modify.items(): if key not in data: raise ValueError( f"Could not modify data {data_attr}" f"with {modify_attr} because the name {key} does " f"not exist in {data_attr}." ) data[key] = value # If modifying a table file, replace rows else: id_col_name = list(data_to_modify[0].keys())[0] id_to_modified_cols = {} for row in data_to_modify: id_to_modified_cols[row[id_col_name]] = row # Modify any matching rows with identical IDs if list(data[0].keys())[0] != id_col_name: raise ValueError( f"Could not modify data {data_attr} with " f"{modify_attr} because the names of the first columns " f"do not match." ) modified_entry_ids = set() for i, row in enumerate(data): if row[id_col_name] in id_to_modified_cols: modified_cols = id_to_modified_cols[row[id_col_name]] for col_name in data[i]: if col_name in modified_cols: data[i][col_name] = modified_cols[col_name] modified_entry_ids.add(row[id_col_name]) # Check for entries in modification data that do not exist in # original data id_diff = set(id_to_modified_cols.keys()).symmetric_difference( modified_entry_ids ) if id_diff: raise ValueError( f"Could not modify data {data_attr} with " f"{modify_attr} because of one or more entries in " f"{modify_attr} that do not exist in {data_attr} " f"(nonexistent entries: {id_diff})." )
[docs] def _check_new_gene_ids(self, nested_attr): """ Check to ensure each new gene, RNA, and protein id starts with NG. """ nested_data = getattr(self, nested_attr[:-1].split(".")[0]) for attr in nested_attr[:-1].split(".")[1:]: nested_data = getattr(nested_data, attr) new_genes_data = getattr(nested_data, "genes") new_RNA_data = getattr(nested_data, "rnas") new_protein_data = getattr(nested_data, "proteins") for row in new_genes_data: assert row["id"].startswith("NG"), "ids of new genes must start with NG" for row in new_RNA_data: assert row["id"].startswith("NG"), "ids of new gene rnas must start with NG" for row in new_protein_data: assert row["id"].startswith( "NG" ), "ids of new gene proteins must start with NG" return
[docs] def _update_gene_insertion_location(self, nested_attr): """ Update insertion location of new genes to prevent conflicts. """ genes_data = getattr(self, "genes") tu_data = getattr(self, "transcription_units") dna_sites_data = getattr(self, "dna_sites") nested_data = getattr(self, nested_attr[:-1].split(".")[0]) for attr in nested_attr[:-1].split(".")[1:]: nested_data = getattr(nested_data, attr) insert_loc_data = getattr(nested_data, "insertion_location") assert ( len(insert_loc_data) == 1 ), "each noncontiguous insertion should be in its own directory" insert_pos = insert_loc_data[0]["insertion_pos"] if not tu_data: # Check if specified insertion location is in another gene data_to_check = genes_data.copy() else: # Check if specified insertion location is in a transcription unit data_to_check = tu_data.copy() # Add important DNA sites to the list of locations to check # TODO: Check for other DNA sites if we include any in the future sites_data_to_check = [ site for site in dna_sites_data if site["common_name"] == "oriC" or site["common_name"] == "TerC" ] data_to_check += sites_data_to_check conflicts = [ row for row in data_to_check if ((row["left_end_pos"] is not None) and (row["left_end_pos"] != "")) and ((row["right_end_pos"] is not None) and (row["left_end_pos"] != "")) and (row["left_end_pos"] < insert_pos) and (row["right_end_pos"] >= insert_pos) ] # Change insertion location to after conflicts if conflicts: shift = max([sub["right_end_pos"] for sub in conflicts]) - insert_pos + 1 insert_pos = insert_pos + shift return insert_pos
[docs] def _update_global_coordinates(self, data, insert_pos, insert_len): """ Updates the left and right end positions for all elements in data if their positions will be impacted by the new gene insertion. Args: data: Data attribute to update insert_pos: Location of new gene insertion insert_len: Length of new gene insertion """ for row in data: left = row["left_end_pos"] right = row["right_end_pos"] if ( (left is not None and left != "") and (right is not None and right != "") and left >= insert_pos ): row.update({"left_end_pos": left + insert_len}) row.update({"right_end_pos": right + insert_len})
[docs] def _update_gene_locations(self, nested_attr, insert_pos): """ Modify positions of original genes based upon the insertion location of new genes. Returns end position of the gene insertion. """ genes_data = getattr(self, "genes") tu_data = getattr(self, "transcription_units") dna_sites_data = getattr(self, "dna_sites") nested_data = getattr(self, nested_attr[:-1].split(".")[0]) for attr in nested_attr[:-1].split(".")[1:]: nested_data = getattr(nested_data, attr) new_genes_data = getattr(nested_data, "genes") new_genes_data = sorted(new_genes_data, key=lambda d: d["left_end_pos"]) for i in range(len(new_genes_data) - 1): assert ( new_genes_data[i + 1]["left_end_pos"] == new_genes_data[i]["right_end_pos"] + 1 ), "gaps in new gene insertions are not supported at this time" insert_end = new_genes_data[-1]["right_end_pos"] + insert_pos insert_len = insert_end - insert_pos + 1 # Update global positions of original genes self._update_global_coordinates(genes_data, insert_pos, insert_len) # Update global positions of transcription units if tu_data: self._update_global_coordinates(tu_data, insert_pos, insert_len) # Update DNA site positions # (including the origin and terminus of replication) self._update_global_coordinates(dna_sites_data, insert_pos, insert_len) # Change relative insertion positions to global in reference genome for row in new_genes_data: left = row["left_end_pos"] right = row["right_end_pos"] row.update({"left_end_pos": left + insert_pos}) row.update({"right_end_pos": right + insert_pos}) return insert_end
[docs] def _get_new_gene_sequence(self, nested_attr): """ Determine genome sequnce for insertion using the sequences and relative locations of the new genes. """ from Bio import Seq nested_data = getattr(self, nested_attr[:-1].split(".")[0]) for attr in nested_attr[:-1].split(".")[1:]: nested_data = getattr(nested_data, attr) new_genes_data = getattr(nested_data, "genes") seq_data = getattr(nested_data, "gene_sequences") insertion_seq = Seq.Seq("") new_genes_data = sorted(new_genes_data, key=lambda d: d["left_end_pos"]) assert ( new_genes_data[0]["left_end_pos"] == 0 ), "first gene in new sequence must start at relative coordinate 0" for gene in new_genes_data: if gene["direction"] == "+": seq_row = next( (row for row in seq_data if row["id"] == gene["id"]), None ) seq_string = seq_row["gene_seq"] seq_addition = Seq.Seq(seq_string) insertion_seq += seq_addition else: seq_row = next( (row for row in seq_data if row["id"] == gene["id"]), None ) seq_string = seq_row["gene_seq"] seq_addition = Seq.Seq(seq_string) insertion_seq += seq_addition.reverse_complement() assert len(seq_addition) == ( gene["right_end_pos"] - gene["left_end_pos"] + 1 ), ( "left and right end positions must agree with actual " "sequence length for " + gene["id"] ) return insertion_seq