# Source code for wholecell.tests.io.test_tsv

"""Unit test for the tsv module."""

from io import BytesIO
from itertools import zip_longest
import unittest

from wholecell.io import tsv

# Silence Sphinx autodoc warning: overwrite the module name that
# unittest.TestCase reports so it reads "unittest".
# NOTE(review): presumably autodoc cannot resolve the base-class reference
# under the class's original module name -- confirm against the docs build.
# This mutates the shared TestCase class for the whole process.
unittest.TestCase.__module__ = "unittest"


# Column names of the fixture table; the third one starts with U+20AC.
FIELD_NAMES = ["id", "ourLocation", "\u20ac:xyz", "mass (units.g)"]

# Same column names, except the third is prefixed with "_" (a private field).
FIELD_NAMES_WITH_PRIVATE_FIELD = ["id", "ourLocation", "_\u20ac:xyz", "mass (units.g)"]

# UTF-8 encoded data rows shared by both fixture tables below.
_DATA_ROWS = [
    b"G6660-MONOMER\t[x]\tLocation information from Lopez Campistrous 2005.\t98.6",
    b"2.71828\t[c]\tLocation from \xe2\x8a\x972011.\t12",
]

# Header row (FIELD_NAMES encoded as UTF-8) followed by the data rows.
INPUT_ROWS = [b"id\tourLocation\t\xe2\x82\xac:xyz\tmass (units.g)"] + _DATA_ROWS

# Same table, but the header row uses the private third column name.
INPUT_ROWS_WITH_PRIVATE_FIELD = [
    b"id\tourLocation\t_\xe2\x82\xac:xyz\tmass (units.g)"
] + _DATA_ROWS


def _remove_private_fields(fieldnames):
    """Return the field names that do not start with an underscore.

    Args:
        fieldnames: iterable of field-name strings.

    Returns:
        A new list holding, in their original order, the names for which
        ``name.startswith("_")`` is false.
    """
    return [fieldname for fieldname in fieldnames if not fieldname.startswith("_")]
def _expected_row(index, private_field=False):
    """Build the dict a dict-reader is expected to yield for one input row.

    The row is always taken from ``INPUT_ROWS`` and keyed by ``FIELD_NAMES``.

    Args:
        index: position of the row in ``INPUT_ROWS`` (0 is the header row).
        private_field: when true, keep only the entries whose key survives
            ``_remove_private_fields(FIELD_NAMES_WITH_PRIVATE_FIELD)``.

    Returns:
        A dict mapping field name to the row's decoded string value.
    """
    values = INPUT_ROWS[index].decode("utf-8").split("\t")

    # zip_longest pads with 404 so a length mismatch between the names and
    # the values shows up in the resulting dict instead of being truncated.
    row_dict = dict(zip_longest(FIELD_NAMES, values, fillvalue=404))

    if private_field:
        public_names = _remove_private_fields(FIELD_NAMES_WITH_PRIVATE_FIELD)
        row_dict = {field_name: row_dict[field_name] for field_name in public_names}

    return row_dict
class Test_Tsv(unittest.TestCase):
    """Tests for the reader/writer wrappers in ``wholecell.io.tsv``.

    All tests feed the wrappers in-memory byte streams built from the
    module-level ``INPUT_ROWS`` fixtures.
    """

    def test_reader(self):
        """tsv.reader yields each input line as a list of decoded strings."""
        byte_stream = BytesIO(b"\n".join(INPUT_ROWS))
        reader = tsv.reader(byte_stream)

        # Compare whole lists so a missing or surplus row is reported as an
        # ordinary assertion failure.
        expected_rows = [row.decode("utf-8").split("\t") for row in INPUT_ROWS]
        self.assertEqual(list(reader), expected_rows)

    def test_writer(self):
        """tsv.writer emits UTF-8 bytes, ending every row with CRLF."""
        byte_stream = BytesIO()
        writer = tsv.writer(byte_stream)

        for row in INPUT_ROWS:
            writer.writerow(row.decode("utf-8").split("\t"))

        data = byte_stream.getvalue()
        # The trailing b"" makes join() append a terminator after the last row.
        self.assertEqual(data, b"\r\n".join(INPUT_ROWS + [b""]))

    def test_dict_reader(self):
        """tsv.dict_reader takes field names from the first row."""
        byte_stream = BytesIO(b"\n".join(INPUT_ROWS))
        reader = tsv.dict_reader(byte_stream)

        self.assertEqual(next(reader), _expected_row(1))
        self.assertEqual(next(reader), _expected_row(2))

        with self.assertRaises(StopIteration):
            next(reader)

        self.assertEqual(reader.fieldnames, FIELD_NAMES)

        # fieldnames must also be assignable.
        reader.fieldnames = FIELD_NAMES[1:]
        self.assertEqual(reader.fieldnames, FIELD_NAMES[1:])

    def test_dict_reader_with_private_field(self):
        """tsv.dict_reader omits columns whose name starts with "_"."""
        byte_stream = BytesIO(b"\n".join(INPUT_ROWS_WITH_PRIVATE_FIELD))
        reader = tsv.dict_reader(byte_stream)

        self.assertEqual(next(reader), _expected_row(1, private_field=True))
        self.assertEqual(next(reader), _expected_row(2, private_field=True))

        with self.assertRaises(StopIteration):
            next(reader)

        # Only set equality is checked here, not the order of the names.
        self.assertEqual(
            set(reader.fieldnames),
            set(_remove_private_fields(FIELD_NAMES_WITH_PRIVATE_FIELD)),
        )

        reader.fieldnames = FIELD_NAMES_WITH_PRIVATE_FIELD[1:]
        self.assertEqual(
            set(reader.fieldnames),
            set(_remove_private_fields(FIELD_NAMES_WITH_PRIVATE_FIELD[1:])),
        )

    def test_dict_reader_with_initial_fieldnames(self):
        """With explicit fieldnames, the header line is read as a data row."""
        byte_stream = BytesIO(b"\n".join(INPUT_ROWS) + b"\n")
        reader = tsv.dict_reader(byte_stream, fieldnames=FIELD_NAMES)

        self.assertEqual(next(reader), _expected_row(0))

        remaining_rows = list(reader)
        self.assertEqual(len(remaining_rows), 2)

    def test_dict_writer(self):
        """tsv.dict_writer writes a header plus rows, quoting where needed."""
        field_names = FIELD_NAMES[2:]
        value2 = b"\xe2\x8a\x97xxx.".decode("utf-8")

        byte_stream = BytesIO()
        writer = tsv.dict_writer(byte_stream, field_names, lineterminator="\n")
        writer.writeheader()
        writer.writerow({field_names[0]: 94022, field_names[1]: value2})
        writer.writerow({field_names[0]: '["c"]', field_names[1]: value2.upper()})

        data = byte_stream.getvalue()
        lines = data.split(b"\n")

        # Header + two rows, each newline-terminated, leaves an empty tail.
        self.assertEqual(len(lines), 4)
        self.assertEqual(lines[0], b"\xe2\x82\xac:xyz\tmass (units.g)")
        self.assertEqual(lines[1], b"94022\t" + value2.encode("utf-8"))
        # A value containing double quotes is wrapped in quotes with the
        # embedded quotes doubled.
        self.assertEqual(lines[2], b'"[""c""]"\t' + value2.encode("utf-8").upper())
        self.assertEqual(lines[3], b"")