Source code for ecoli.library.test_parquet_emitter

import atexit
import datetime
import math
import os
import re
import shutil
import tempfile
import time
from concurrent.futures import Future, ThreadPoolExecutor
from queue import Queue
from unittest.mock import Mock, patch

import numpy as np
import polars as pl
import pytest

from ecoli.library.parquet_emitter import (
    json_to_parquet,
    np_dtype,
    flatten_dict,
    union_pl_dtypes,
    ParquetEmitter,
)


class TestHelperFunctions:
    def test_flatten_dict(self):
        # Simple dictionary
        assert flatten_dict({"a": 1, "b": 2}) == {"a": 1, "b": 2}
        # Nested dictionary
        assert flatten_dict({"a": {"b": 1, "c": 2}, "d": 3}) == {
            "a__b": 1,
            "a__c": 2,
            "d": 3,
        }
        # Deeply nested dictionary
        assert flatten_dict({"a": {"b": {"c": {"d": 1}}}, "e": 2}) == {
            "a__b__c__d": 1,
            "e": 2,
        }
        # Empty dictionary
        assert flatten_dict({}) == {}
        # Dictionary with mixed value types
        nested = flatten_dict({"a": [1, 2, 3], "b": {"c": np.array([4, 5, 6])}})
        assert nested["a"] == [1, 2, 3]
        np.testing.assert_array_equal(nested["b__c"], np.array([4, 5, 6]))

    def test_np_dtype(self):
        # Basic types
        np_type = np_dtype(1.0, "float_field")
        assert np_type == np.float64
        np_type = np_dtype(True, "bool_field")
        assert np_type == np.bool_
        np_type = np_dtype("text", "string_field")
        assert np_type == np.dtypes.StringDType
        # Integer with different encodings
        np_type = np_dtype(42, "int_field")
        assert np_type == np.int64
        np_type = np_dtype(42, "listeners__ribosome_data__mRNA_TU_index")
        assert np_type == np.uint16
        np_type = np_dtype(42, "listeners__monomer_counts")
        assert np_type == np.uint32
        # Arrays with various dimensions
        np_type = np_dtype(np.array([1, 2, 3]), "array1d_field")
        assert np_type == np.int64
        np_type = np_dtype(np.array([[1, 2], [3, 4]]), "array2d_field")
        assert np_type == np.int64
        # Empty arrays still have a dtype
        np_type = np_dtype(np.array([]), "empty_array_field")
        assert np_type == np.float64
        # Raise error for empty lists to fall back to Polars serialization
        with pytest.raises(ValueError, match="empty_list_field has unsupported"):
            np_type = np_dtype([[], [], None], "empty_list_field")
        # Raise error for None to fall back to Polars serialization
        with pytest.raises(ValueError, match="none_field has unsupported"):
            np_type = np_dtype(None, "none_field")
        # Invalid types
        with pytest.raises(ValueError, match="complex_field has unsupported type"):
            np_dtype(complex(1, 2), "complex_field")

    def test_union_pl_dtypes(self):
        # Basic types
        with pytest.raises(
            TypeError, match=re.escape("Incompatible inner types for field")
        ):
            union_pl_dtypes(pl.Int32, pl.Int64, "fail")
        with pytest.raises(
            TypeError, match=re.escape("Incompatible inner types for field")
        ):
            union_pl_dtypes(pl.Float32, pl.String, "fail")
        # Nested types
        with pytest.raises(
            TypeError, match=re.escape("Incompatible inner types for field")
        ):
            union_pl_dtypes(pl.List(pl.Int16), pl.List(pl.Int64), "nest")
        with pytest.raises(
            TypeError, match=re.escape("Incompatible inner types for field")
        ):
            union_pl_dtypes(pl.List(pl.UInt16), pl.List(pl.String), "nest_fail")
        with pytest.raises(
            TypeError, match=re.escape("Incompatible inner types for field")
        ):
            union_pl_dtypes(
                pl.List(pl.List(pl.UInt16)), pl.List(pl.String), "nest_fail"
            )
        with pytest.raises(
            TypeError, match=re.escape("Incompatible inner types for field")
        ):
            union_pl_dtypes(
                pl.List(pl.UInt16), pl.List(pl.Array(pl.String, (1,))), "nest_fail"
            )
        assert union_pl_dtypes(
            pl.List(pl.UInt16), pl.List(pl.Int64), "force_u32", pl.UInt32
        ) == pl.List(pl.UInt32)
        # Forced types: a bit scary but we assume the user knows what they are doing
        assert union_pl_dtypes(pl.Int16, pl.UInt8, "force_u16", pl.UInt16) == pl.UInt16
        assert union_pl_dtypes(pl.UInt16, pl.Int64, "force_u32", pl.UInt32) == pl.UInt32
        assert (
            union_pl_dtypes(pl.UInt16, pl.String, "force_u32", pl.UInt32) == pl.UInt32
        )
        assert union_pl_dtypes(
            pl.List(pl.UInt16), pl.List(pl.String), "force_u32", pl.UInt32
        ) == pl.List(pl.UInt32)
        assert union_pl_dtypes(
            pl.List(pl.UInt16), pl.List(pl.Int64), "force_u32", pl.UInt32
        ) == pl.List(pl.UInt32)
        assert union_pl_dtypes(
            pl.Array(pl.UInt16, (1, 1)),
            pl.List(pl.List(pl.Int64)),
            "force_u16",
            pl.UInt16,
        ) == pl.List(pl.List(pl.UInt16))
        # Null merge
        assert union_pl_dtypes(pl.Null, pl.Int64, "null_merge") == pl.Int64
        assert union_pl_dtypes(pl.Null, pl.Float64, "force_u16", pl.UInt16) == pl.UInt16
        assert union_pl_dtypes(
            pl.Null, pl.List(pl.Int64), "force_u16", pl.UInt16
        ) == pl.List(pl.UInt16)
        assert union_pl_dtypes(
            pl.List(pl.Null), pl.List(pl.List(pl.Float32)), "null_merge"
        ) == pl.List(pl.List(pl.Float32))
        assert union_pl_dtypes(
            pl.Array(pl.Null, (1, 1, 1)),
            pl.List(pl.Array(pl.Float32, (1, 1))),
            "null_merge",
        ) == pl.List(pl.List(pl.List(pl.Float32)))
        assert union_pl_dtypes(
            pl.List(pl.Null), pl.List(pl.String), "force_u16", pl.UInt16
        ) == pl.List(pl.UInt16)
        assert union_pl_dtypes(
            pl.List(pl.Null), pl.List(pl.List(pl.Int32)), "force_u32", pl.UInt32
        ) == pl.List(pl.List(pl.UInt32))
        assert union_pl_dtypes(
            pl.List(pl.Null), pl.List(pl.List(pl.List(pl.Int32))), "null_merge"
        ) == pl.List(pl.List(pl.List(pl.Int32)))
        assert union_pl_dtypes(
            pl.List(pl.Null),
            pl.List(pl.List(pl.List(pl.Int32))),
            "force_u32",
            pl.UInt32,
        ) == pl.List(pl.List(pl.List(pl.UInt32)))
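

# Added illustrative check (not part of the original suite): the double
# underscore keys produced by flatten_dict are the same column names that the
# ParquetEmitter tests below assert on (e.g. "nested__value").
def test_flatten_dict_column_naming_sketch():
    assert flatten_dict({"nested": {"value": 100}}) == {"nested__value": 100}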


def compare_nested(a: list, b: list) -> bool:
    """
    Compare two lists for equality, including special handling for NaN.
    """
    if isinstance(a, list) and isinstance(b, list):
        if len(a) != len(b):
            return False
        return all(compare_nested(a[i], b[i]) for i in range(len(a)))
    if a != b:
        try:
            return math.isnan(a) and math.isnan(b)
        except TypeError:
            return False
    return True
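

# Added illustrative check (not part of the original suite): compare_nested
# treats NaN as equal to NaN, unlike the built-in == operator, and recurses
# through nested lists.
def test_compare_nested_nan_handling():
    assert compare_nested([1.0, float("nan")], [1.0, float("nan")])
    assert not compare_nested([1.0, float("nan")], [1.0, 2.0])
    assert compare_nested([[1, [2.0, float("nan")]]], [[1, [2.0, float("nan")]]])
    # Length mismatches are never equal
    assert not compare_nested([1, 2], [1, 2, 3])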


class TestParquetEmitter:
    @pytest.fixture
    def temp_dir(self):
        """Create a temporary directory for testing."""
        tmp = tempfile.mkdtemp()
        yield tmp
        shutil.rmtree(tmp)

    def test_initialization(self, temp_dir):
        """Test ParquetEmitter initialization with different configs."""
        # Test with out_dir
        emitter = ParquetEmitter({"out_dir": temp_dir})
        emitter.experiment_id = "test_exp"
        emitter.partitioning_path = "path/to/output"
        assert emitter.out_uri == os.path.abspath(temp_dir)
        assert emitter.batch_size == 400
        # Test with out_uri and custom batch size
        emitter = ParquetEmitter({"out_uri": "gs://bucket/path", "batch_size": 100})
        emitter.experiment_id = "test_exp"
        emitter.partitioning_path = "path/to/output"
        assert emitter.out_uri == "gs://bucket/path"
        assert emitter.batch_size == 100
        # GCSFS uses asyncio and cannot schedule futures after interpreter shutdown,
        # so the _finalize hook will raise an error that is ignored. Here we just
        # unregister the hook to avoid cluttering the pytest log.
        atexit.unregister(emitter._finalize)

    def test_emit_configuration(self, temp_dir):
        """Test emitting configuration data."""
        emitter = ParquetEmitter({"out_dir": temp_dir})
        # Setup ThreadPoolExecutor mock
        future = Future()
        future.set_result(None)
        emitter.executor.submit = Mock(return_value=future)
        # Test with basic config data
        config_data = {
            "table": "configuration",
            "data": {
                "experiment_id": "test_exp",
                "variant": 1,
                "lineage_seed": 100,
                "agent_id": "1",
                "nested": {"value": 42},
                "metadata": {"meta1": "value1", "meta2": "value2"},
            },
        }
        emitter.emit(config_data)
        # Verify partitioning path
        assert emitter.experiment_id == "test_exp"
        assert "experiment_id=test_exp" in emitter.partitioning_path
        assert "variant=1" in emitter.partitioning_path
        # Verify json_to_parquet was called
        assert emitter.executor.submit.called
        args, _ = emitter.executor.submit.call_args
        assert args[0] == json_to_parquet

    def test_emit_simulation_data(self, temp_dir):
        """Test emitting simulation data with various types."""
        emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 2})
        # Configuration emit to initialize variables
        config_data = {
            "table": "configuration",
            "data": {
                "experiment_id": "test_exp",
                "variant": 1,
                "lineage_seed": 1,
                "agent_id": "1",
            },
        }
        emitter.emit(config_data)
        emitter.last_batch_future.result()
        # Create test data with various types
        sim_data1 = {
            "table": "simulation",
            "data": {
                "time": 1.0,
                "agents": {
                    "agent1": {
                        "int_field": 42,
                        "float_field": 3.14,
                        "bool_field": True,
                        "string_field": "hello",
                        "array_field": np.array([1, 2, 3]),
                        "nested": {"value": 100},
                    }
                },
            },
        }
        # First emit
        emitter.emit(sim_data1)
        assert emitter.num_emits == 1
        emitter.last_batch_future.result()
        # Check that fields were properly encoded
        assert "int_field" in emitter.buffered_emits
        assert emitter.buffered_emits["int_field"][0] == 42
        assert "float_field" in emitter.buffered_emits
        assert emitter.buffered_emits["float_field"][0] == 3.14
        assert "bool_field" in emitter.buffered_emits
        assert emitter.buffered_emits["bool_field"][0]
        assert "string_field" in emitter.buffered_emits
        assert emitter.buffered_emits["string_field"][0] == "hello"
        assert "array_field" in emitter.buffered_emits
        np.testing.assert_array_equal(
            emitter.buffered_emits["array_field"][0], np.array([1, 2, 3])
        )
        assert "nested__value" in emitter.buffered_emits
        assert emitter.buffered_emits["nested__value"][0] == 100
        # Second emit to trigger batch writing
        emitter.emit(sim_data1)
        assert emitter.num_emits == 2
        emitter.last_batch_future.result()
        # Check output
        t = pl.read_parquet(
            os.path.join(
                emitter.out_uri,
                emitter.experiment_id,
                "history",
                emitter.partitioning_path,
                "*.pq",
            )
        )
        assert t["int_field"].to_list() == [42] * 2
        assert t["float_field"].to_list() == [3.14] * 2
        assert t["bool_field"].to_list() == [True] * 2
        assert all(t["string_field"] == ["hello"] * 2)
        np.testing.assert_array_equal(t["array_field"].to_list(), [[1, 2, 3]] * 2)
        assert all(t["nested__value"] == [100] * 2)
        assert emitter.buffered_emits == {}

    def test_variable_length_arrays(self, temp_dir):
        """Test handling arrays with changing dimensions."""
        emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 3})
        # Configuration emit to initialize variables
        config_data = {
            "table": "configuration",
            "data": {
                "experiment_id": "test_exp",
                "variant": 1,
                "lineage_seed": 1,
                "agent_id": "1",
            },
        }
        emitter.emit(config_data)
        emitter.last_batch_future.result()
        sim_data1 = {
            "table": "simulation",
            "data": {
                "time": 1.0,
                "agents": {
                    "agent1": {
                        "dynamic_array": np.array([1, 2, 3]),
                        "ragged_nd": [[1, 2, 3], [1, 2], [1]],
                    }
                },
            },
        }
        emitter.emit(sim_data1)
        emitter.last_batch_future.result()
        # Verify arrays were stored correctly
        assert "dynamic_array" in emitter.buffered_emits
        assert emitter.buffered_emits["dynamic_array"].shape[1:] == (3,)
        assert "ragged_nd" in emitter.buffered_emits
        assert all(
            emitter.buffered_emits["ragged_nd"][0]
            == pl.Series([[1, 2, 3], [1, 2], [1]])
        )
        # Second emit with different shapes
        sim_data2 = {
            "table": "simulation",
            "data": {
                "time": 2.0,
                "agents": {
                    "agent1": {
                        "dynamic_array": np.array([4, 5, 6, 7]),
                        "ragged_nd": [[1], [1, 2], [1, 2, 3]],
                    }
                },
            },
        }
        emitter.emit(sim_data2)
        emitter.last_batch_future.result()
        # Verify conversion to variable length type
        assert isinstance(emitter.buffered_emits["dynamic_array"], list)
        assert emitter.buffered_emits["dynamic_array"][0] == [1, 2, 3]
        assert emitter.buffered_emits["dynamic_array"][1].to_list() == [4, 5, 6, 7]
        assert all(
            emitter.buffered_emits["ragged_nd"][1]
            == pl.Series([[1], [1, 2], [1, 2, 3]])
        )
        # Write to Parquet and check output
        emitter.emit(sim_data2)
        emitter.last_batch_future.result()
        t = pl.read_parquet(
            os.path.join(
                emitter.out_uri,
                emitter.experiment_id,
                "history",
                emitter.partitioning_path,
                "*.pq",
            )
        )
        assert t["dynamic_array"].to_list() == [
            [1, 2, 3],
            [4, 5, 6, 7],
            [4, 5, 6, 7],
        ]
        assert t["ragged_nd"].to_list() == [
            [[1, 2, 3], [1, 2], [1]],
            [[1], [1, 2], [1, 2, 3]],
            [[1], [1, 2], [1, 2, 3]],
        ]

    def test_extreme_data_types(self, temp_dir):
        """Test with extreme data types and edge cases."""
        emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 2})
        # Create test data with extreme values and special cases
        sim_data = {
            "table": "configuration",
            "data": {
                "time": 1.0,
                "experiment_id": "test_exp",
                "variant": 1,
                "lineage_seed": 100,
                "agent_id": "1",
                "agents": {
                    "agent1": {
                        # Extreme values
                        "max_int": np.iinfo(np.int64).max,
                        "min_int": np.iinfo(np.int64).min,
                        "max_float": np.finfo(np.float64).max,
                        "tiny_float": 1e-100,
                        "nan_value": np.nan,
                        "inf_value": np.inf,
                        # Special cases
                        "empty_array": np.array([]),
                        "zero_dim_array": np.array(42),
                        "unicode_string": "Unicode: 日本語",
                        "very_long_string": "x" * 10000,
                        # Nested structures
                        "deep_nesting": {"level1": {"level2": {"level3": [1, 2, 3]}}},
                        "ragged_nullable": [None, [1, 2], [None, 1, 2, 3]],
                        # Python datetime objects
                        "datetime_list": [
                            datetime.datetime(2000, 12, 25),
                            datetime.datetime(2001, 4, 1, 12),
                            datetime.datetime(2002, 1, 1, 0, 1),
                            datetime.datetime(2003, 2, 14, 5, 5, 5),
                            datetime.datetime(2003, 7, 4, 7, 8, 9, 10),
                        ],
                        "time_list": [
                            datetime.time(1),
                            datetime.time(2, 3),
                            datetime.time(4, 5, 6),
                            datetime.time(7, 8, 9, 10),
                        ],
                        "datetime": datetime.datetime(1776, 7, 4),
                        # Test bytes
                        "npbytes": np.array([b"test bytes"])[0],
                        "pybytes": b"test bytes",
                        "npbytes_list": np.array([b"test1", b"test2"]),
                        "pybytes_list": [b"test1", b"test2"],
                    }
                },
            },
        }
        # Try to emit the extreme data
        # First configuration emit to set variables
        emitter.emit(sim_data)
        emitter.last_batch_future.result()
        # Then simulation emit
        sim_data["table"] = "simulation"
        emitter.emit(sim_data)
        emitter.last_batch_future.result()
        assert emitter.num_emits == 1
        # Verify fields were processed
        assert "max_int" in emitter.buffered_emits
        assert emitter.buffered_emits["max_int"][0] == np.iinfo(np.int64).max
        assert "min_int" in emitter.buffered_emits
        assert emitter.buffered_emits["min_int"][0] == np.iinfo(np.int64).min
        assert "max_float" in emitter.buffered_emits
        assert emitter.buffered_emits["max_float"][0] == np.finfo(np.float64).max
        assert "tiny_float" in emitter.buffered_emits
        assert emitter.buffered_emits["tiny_float"][0] == 1e-100
        assert "nan_value" in emitter.buffered_emits
        assert np.isnan(emitter.buffered_emits["nan_value"][0])
        assert "unicode_string" in emitter.buffered_emits
        assert emitter.buffered_emits["unicode_string"][0] == "Unicode: 日本語"
        assert "deep_nesting__level1__level2__level3" in emitter.buffered_emits
        assert np.array_equal(
            emitter.buffered_emits["deep_nesting__level1__level2__level3"][0],
            np.array([1, 2, 3], dtype=int),
        )
        sim_data_2 = {
            "table": "simulation",
            "data": {
                "time": 2.0,
                "experiment_id": "test_exp",
                "variant": 1,
                "lineage_seed": 100,
                "agent_id": "1",
                "agents": {
                    "agent1": {
                        # Shuffle extreme values
                        "max_int": np.iinfo(np.int64).min,
                        "min_int": np.iinfo(np.int64).max,
                        "max_float": np.finfo(np.float64).min,
                        "tiny_float": 1e100,
                        "nan_value": np.inf,
                        "inf_value": np.nan,
                        # More special cases
                        "empty_array": np.array([np.nan]),
                        "zero_dim_array": np.array(0),
                        "unicode_string": "Unicode: 日本語 再び",
                        "very_long_string": "x" * 100000,
                        # Nested structures
                        "deep_nesting": {
                            "level1": {
                                # Add a new field mid-batch
                                "level2": {"level3": [1, 2, 3, 4], "level4": [5, 6, 7]}
                            }
                        },
                        "ragged_nullable": [
                            [1, 3, 4],
                            [None, None, 1],
                            None,
                            [1, 2, None],
                        ],
                        # Python datetime objects
                        "datetime_list": [datetime.datetime(2000, 12, 25)],
                        "time_list": [datetime.time(1)],
                        "datetime": datetime.datetime(2000, 12, 25),
                        # Test bytes
                        "npbytes": np.array([b"short"])[0],
                        "pybytes": b"short",
                        "npbytes_list": np.array([b"much longer bytestring", b"1"]),
                        "pybytes_list": [b"much longer bytestring", b"1"],
                    }
                },
            },
        }
        emitter.emit(sim_data_2)
        emitter.last_batch_future.result()
        assert emitter.buffered_emits == {}
        out_path = os.path.join(
            emitter.out_uri,
            emitter.experiment_id,
            "history",
            emitter.partitioning_path,
            f"{emitter.num_emits}.pq",
        )
        output_pl = pl.read_parquet(out_path)
        output_data = {
            "max_int": [np.iinfo(np.int64).max, np.iinfo(np.int64).min],
            "min_int": [np.iinfo(np.int64).min, np.iinfo(np.int64).max],
            "max_float": [np.finfo(np.float64).max, np.finfo(np.float64).min],
            "tiny_float": [1e-100, 1e100],
            "nan_value": [np.nan, np.inf],
            "inf_value": [np.inf, np.nan],
            "empty_array": [[], [np.nan]],
            "zero_dim_array": [42, 0],
            "unicode_string": ["Unicode: 日本語", "Unicode: 日本語 再び"],
            "very_long_string": ["x" * 10000, "x" * 100000],
            "deep_nesting__level1__level2__level3": [[1, 2, 3], [1, 2, 3, 4]],
            "deep_nesting__level1__level2__level4": [None, [5, 6, 7]],
            "ragged_nullable": [
                [None, [1, 2], [None, 1, 2, 3]],
                [[1, 3, 4], [None, None, 1], None, [1, 2, None]],
            ],
            "datetime_list": [
                [
                    datetime.datetime(2000, 12, 25),
                    datetime.datetime(2001, 4, 1, 12),
                    datetime.datetime(2002, 1, 1, 0, 1),
                    datetime.datetime(2003, 2, 14, 5, 5, 5),
                    datetime.datetime(2003, 7, 4, 7, 8, 9, 10),
                ],
                [datetime.datetime(2000, 12, 25)],
            ],
            "time_list": [
                [
                    datetime.time(1),
                    datetime.time(2, 3),
                    datetime.time(4, 5, 6),
                    datetime.time(7, 8, 9, 10),
                ],
                [datetime.time(1)],
            ],
            "datetime": [
                datetime.datetime(1776, 7, 4),
                datetime.datetime(2000, 12, 25),
            ],
            # Test bytes
            "npbytes": [b"test bytes", b"short"],
            "pybytes": [b"test bytes", b"short"],
            # Note the truncation for the NumPy bytes array
            "npbytes_list": [[b"test1", b"test2"], [b"much\x20", b"1"]],
            "pybytes_list": [[b"test1", b"test2"], [b"much longer bytestring", b"1"]],
        }
        for key, value in output_data.items():
            assert compare_nested(output_pl[key].to_list(), value), (
                f"Mismatch in field {key}"
            )

    def test_finalize(self, temp_dir):
        """Test _finalize method that handles remaining data."""
        emitter = ParquetEmitter({"out_dir": temp_dir})
        emitter.experiment_id = "test_exp"
        emitter.partitioning_path = "path/to/output"
        # Add data to buffered_emits
        emitter.buffered_emits = {
            "field1": np.zeros((emitter.batch_size,), dtype=np.int64),
            "field2": np.zeros((emitter.batch_size,), dtype=np.float64),
        }
        emitter.pl_types = {
            "field1": pl.Int64,
            "field2": pl.Float64,
        }
        emitter.buffered_emits["field1"][0] = 10
        emitter.buffered_emits["field2"][0] = 20.5
        emitter.num_emits = 1  # Only one emit happened
        # Mock json_to_parquet
        with patch(
            "ecoli.library.parquet_emitter.json_to_parquet"
        ) as mock_json_to_parquet:
            # Test _finalize
            emitter._finalize()
            # Verify json_to_parquet was called with truncated data
            mock_json_to_parquet.assert_called_once()
            args, _ = mock_json_to_parquet.call_args
            assert args[0]["field1"].shape[0] == 1  # Only 1 item should remain
            assert args[0]["field1"][0] == 10
            assert args[0]["field2"][0] == 20.5
        # Test success flag
        emitter.success = True
        emitter._finalize()
        assert os.path.exists(
            os.path.join(
                emitter.out_uri,
                emitter.experiment_id,
                "success",
                emitter.partitioning_path,
                "s.pq",
            )
        )

    def test_multiple_agents(self, temp_dir):
        emitter = ParquetEmitter({"out_dir": temp_dir})
        emitter.experiment_id = "test_exp"
        emitter.partitioning_path = "path/to/output"
        # Create data with multiple agents
        sim_data = {
            "table": "simulation",
            "data": {
                "time": 1.0,
                "agents": {"agent1": {"field1": 10}, "agent2": {"field1": 20}},
            },
        }
        # Should return early without processing
        emitter.emit(sim_data)
        assert emitter.num_emits == 0
        assert emitter.buffered_emits == {}

    def test_batch_processing(self, temp_dir):
        """Test multiple emits and batch processing."""
        # Small batch size for testing
        emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 3})
        # Configuration emit to initialize variables
        config_data = {
            "table": "configuration",
            "data": {
                "experiment_id": "test_exp",
                "variant": 1,
                "lineage_seed": 1,
                "agent_id": "1",
            },
        }
        emitter.emit(config_data)
        emitter.last_batch_future.result()
        # Create simulation data
        sim_data = {
            "table": "simulation",
            "data": {"time": 1.0, "agents": {"agent1": {"value": 10}}},
        }
        # Emit 4 times (should trigger batch processing after 3)
        for i in range(4):
            sim_data["data"]["time"] = float(i)
            sim_data["data"]["agents"]["agent1"]["value"] = i * 10
            emitter.emit(sim_data)
            emitter.last_batch_future.result()
        # Verify batch was processed
        assert emitter.num_emits == 4
        # One value should remain in buffer
        assert len(emitter.buffered_emits["value"]) == emitter.batch_size
        assert emitter.buffered_emits["value"][0] == 30  # Last value (3*10)


class TestParquetEmitterEdgeCases:
    @pytest.fixture
    def temp_dir(self):
        """Create a temporary directory for testing."""
        tmp = tempfile.mkdtemp()
        yield tmp
        shutil.rmtree(tmp)
@patch("ecoli.library.parquet_emitter.ThreadPoolExecutor") def test_multithreaded_buffer_clearing(self, mock_executor_class, temp_dir): """ Test to verify that clearing buffers after submitting to ThreadPoolExecutor doesn't cause race conditions with the worker thread. """ # Create a real executor and a queue to track what's passed to json_to_parquet real_executor = ThreadPoolExecutor(max_workers=1) data_capture_queue = Queue() # Setup a custom submit function that will capture the dictionaries # being passed to json_to_parquet before they can be cleared def capture_submit(func, *args, **kwargs): # Make a deep copy of the dictionaries to capture their state emit_dict_copy = { k: v.copy() if hasattr(v, "copy") else v for k, v in args[0].items() } pl_types_copy = args[2].copy() # Put the copies in our queue for later inspection data_capture_queue.put((emit_dict_copy, pl_types_copy)) # Create a future that will complete after a delay to simulate # the worker thread taking time to process future = Future() # Submit the real work to a real executor def delayed_execution(): time.sleep(0.1) # Delay to ensure main thread moves on result = func(*args, **kwargs) future.set_result(result) return result real_executor.submit(delayed_execution) return future # Configure our mock executor to use the capture_submit function mock_executor = Mock() mock_executor.submit.side_effect = capture_submit mock_executor_class.return_value = mock_executor # Initialize the emitter with a small batch size emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 2}) # Configuration emit to initialize variables config_data = { "table": "configuration", "data": { "experiment_id": "test_exp", "variant": 1, "lineage_seed": 1, "agent_id": "1", }, } emitter.emit(config_data) emitter.last_batch_future.result() # First emit with simple data sim_data1 = { "table": "simulation", "data": { "time": 1.0, "agents": {"agent1": {"field1": np.array([1, 2, 3]), "field2": 42}}, }, } emitter.emit(sim_data1) # Second emit that will trigger batch writing # This data has the same structure sim_data2 = { "table": "simulation", "data": { "time": 2.0, "agents": {"agent1": {"field1": np.array([4, 5, 6]), "field2": 43}}, }, } emitter.emit(sim_data2) # At this point, the batch should have been submitted to the executor # and the buffers cleared in the main thread # Now immediately emit data with a bunch of additional fields # This should not cause any issues even though the previous data is still # being processed by the worker thread sim_data3 = { "table": "simulation", "data": { "time": 3.0, "agents": { "agent1": { f"field{i}": np.array([7, 8, 9, 10]) for i in range(1, 10) } }, }, } emitter.emit(sim_data3) # Verify the captured data matches what was in buffers before clearing captured_data, captured_types = data_capture_queue.get(timeout=1) assert captured_data["experiment_id"] == "test_exp" assert captured_data["variant"] == 1 assert captured_data["lineage_seed"] == 1 assert captured_types == { "experiment_id": pl.String, "variant": pl.Int64, "lineage_seed": pl.Int64, "agent_id": pl.String, "time": pl.Float64, } captured_data, captured_types = data_capture_queue.get(timeout=1) assert len(captured_data["field1"]) == emitter.batch_size assert captured_data["field1"][0].tolist() == [1, 2, 3] assert captured_data["field1"][1].tolist() == [4, 5, 6] assert captured_types == { "time": pl.Float64, "field1": pl.List(pl.Int64), "field2": pl.Int64, } # Changed type for field2 to list so should fail with 
pytest.raises(pl.exceptions.InvalidOperationError): emitter._finalize() atexit.unregister(emitter._finalize) # Cleanup the real executor real_executor.shutdown() def test_variable_shape_detection_at_boundaries(self, temp_dir): """ Test the fixed vs variable shape field detection logic specifically at the boundary points (start of sim, after disk write). """ # Use a small batch size to quickly hit the boundary emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 3}) # Setup: Emit configuration data to intitialize variables config_data = { "table": "configuration", "data": { "experiment_id": "test_exp", "variant": 1, "lineage_seed": 1, "agent_id": "1", }, } emitter.emit(config_data) # PHASE 1: Test at start of sim (first emit) # Start with a variable-shape field, the code should assume it's fixed-shape sim_data1 = { "table": "simulation", "data": { "time": 1.0, "agents": { "agent1": { "dynamic_array": np.array([1, 2, 3]), "subtle_array": np.array([[1], [2], [3]]), } }, }, } emitter.emit(sim_data1) # Verify it was stored as a fixed-shape numpy array assert isinstance(emitter.buffered_emits["dynamic_array"], np.ndarray) assert emitter.buffered_emits["dynamic_array"].shape[1:] == (3,) # PHASE 2: Immediately send different shape data # This should trigger conversion to variable-shape sim_data2 = { "table": "simulation", "data": { "time": 2.0, "agents": { "agent1": { "dynamic_array": np.array([1, 2, 3, 4, 5]), "subtle_array": np.array([[1], [2], [3]]), } }, }, } emitter.emit(sim_data2) # Verify it was converted to a list assert isinstance(emitter.buffered_emits["dynamic_array"], list) # PHASE 3: Send one more emit to trigger batch writing sim_data3 = { "table": "simulation", "data": { "time": 3.0, "agents": { "agent1": { "dynamic_array": np.array([1, 2, 3, 4, 5, 6, 7]), "subtle_array": np.array([[1], [2], [3]]), } }, }, } emitter.emit(sim_data3) # PHASE 4: subtle_array changed shape but we are at the start of a new batch. # ParquetEmitter is designed to assume all arrays are fixed-shape until # proven otherwise (better performance and memory usage). The distinction # between a fixed-shape and variable-shape array is purely an implmentation # detail of the buffering logic. In the end, both are written to Parquet # as variable-shape arrays. As such, even though subtle_array as a whole # is variable in shape, as long as it takes on a consistent shape within # a batch of emits, ParquetEmitter is more than happy to treat it as # fixed-shape and reap the performance benefits. 
sim_data4 = { "table": "simulation", "data": { "time": 4.0, "agents": { "agent1": { "dynamic_array": np.array([1]), "subtle_array": np.array([[1], [2], [3], [4], [5]]), } }, }, } emitter.emit(sim_data4) # Should be instantiated as variable length from last batch assert isinstance(emitter.buffered_emits["dynamic_array"], list) assert emitter.buffered_emits["dynamic_array"][0].to_list() == [1] # Should be treated as fixed-shape still assert isinstance(emitter.buffered_emits["subtle_array"], np.ndarray) assert emitter.buffered_emits["subtle_array"].shape[1:] == (5, 1) # Trigger another Parquet write and read them both back to confirm emitter.emit(sim_data4) emitter.emit(sim_data4) emitter.last_batch_future.result() t = pl.read_parquet( os.path.join( emitter.out_uri, emitter.experiment_id, "history", emitter.partitioning_path, "*.pq", ) ) assert t["dynamic_array"].to_list() == [ [1, 2, 3], [1, 2, 3, 4, 5], [1, 2, 3, 4, 5, 6, 7], [1], [1], [1], ] assert t["subtle_array"].to_list() == [ [[1], [2], [3]], [[1], [2], [3]], [[1], [2], [3]], [[1], [2], [3], [4], [5]], [[1], [2], [3], [4], [5]], [[1], [2], [3], [4], [5]], ] def test_expected_failures(self, temp_dir): """ Test a few cases that are expected to fail. """ # Use a small batch size to quickly hit the boundary emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 3}) # Setup: Emit configuration data to intitialize variables config_data = { "table": "configuration", "data": { "experiment_id": "test_exp", "variant": 1, "lineage_seed": 1, "agent_id": "1", }, } emitter.emit(config_data) # Test initial null/empty list (null types), empty NumPy array (typed) sim_data1 = { "table": "simulation", "data": { "time": 1.0, "agents": { "agent1": { "init_null": None, "init_empty_list": [], "init_empty_array": np.array([]), # Setup initial value for later test "3d_array": np.random.rand(2, 3, 4), "another_3d_array": np.random.rand(2, 3, 4), } }, }, } emitter.emit(sim_data1) assert isinstance(emitter.buffered_emits["init_null"], list) assert emitter.buffered_emits["init_null"][0] is None assert emitter.pl_types["init_null"] == pl.Null assert isinstance(emitter.buffered_emits["init_empty_list"], list) assert emitter.buffered_emits["init_empty_list"][0].dtype == pl.Null assert emitter.pl_types["init_empty_list"] == pl.List(pl.Null) assert isinstance(emitter.buffered_emits["init_empty_array"], np.ndarray) assert emitter.buffered_emits["init_empty_array"].dtype == np.float64 assert emitter.pl_types["init_empty_array"] == pl.List(pl.Float64) # Try adding another dimension to empty array sim_data2 = { "table": "simulation", "data": { "time": 2.0, "agents": { "agent1": { "init_empty_array": [[]], } }, }, } with pytest.raises( TypeError, match=re.escape( "Incompatible inner types for field init_empty_array: Float64 and List(Null)." ), ): emitter.emit(sim_data2) # Try adding 2D array with non-nulls to 3D array field sim_data3 = { "table": "simulation", "data": { "time": 2.0, "agents": { "agent1": { "3d_array": [[1.0, 2.0]], } }, }, } with pytest.raises( TypeError, match=re.escape( "Incompatible inner types for field 3d_array: List(Float64) and Float64." 
), ): emitter.emit(sim_data3) # Try NumPy unsupported datetime64 resolution sim_data4 = { "table": "simulation", "data": { "time": 3.0, "agents": { "agent1": { "datetime_arr": np.array( [ np.datetime64("2023-01-01T01"), np.datetime64("2023-01-02T02:02"), ] ), } }, }, } with pytest.raises(ValueError, match="incorrect NumPy datetime resolution"): emitter.emit(sim_data4) # Try NumPy void sim_data5 = { "table": "simulation", "data": { "time": 4.0, "agents": { "agent1": { "npvoid": np.array([b"test bytes"], dtype=np.void)[0], } }, }, } with pytest.raises(ValueError): emitter.emit(sim_data5) # Try NumPy datetime64 in Python list sim_data6 = { "table": "simulation", "data": { "time": 3.0, "agents": { "agent1": { "mixed_datetime": [ np.datetime64("2023-01-01"), np.datetime64("2023-01-02"), ], } }, }, } with pytest.raises( TypeError, match=re.escape("not yet implemented: Nested object types") ): emitter.emit(sim_data6) # Try list of NumPy arrays sim_data7 = { "table": "simulation", "data": { "time": 3.0, "agents": { "agent1": { "mixed_nested": [ np.array([1, 2, 3]), np.array([4, 5]), [6], None, ], } }, }, } with pytest.raises( TypeError, match=re.escape("failed to determine supertype of object and list[i64]"), ): emitter.emit(sim_data7) # Try shape-varying 3D NumPy array # Polars can gracefully handle nested Python lists wihout explicit type # information, but not NumPy arrays. For example: # WORKS: pl.Series([[[1, 2], [3, 4]]]) # FAILS: pl.Series([np.array([[1, 2], [3, 4]])]) # This is thankfully not an issue for 1D NumPy arrays, which are the # only type of ragged NumPy arrays in vEcoli. I do not think it # makes much sense to have a ND NumPy array field with variable # shape anyways as it would still be constrained to a data cube. # Nested Python lists would let you deviate from a strict data cube # and even have null values, if desired. 
sim_data8 = { "table": "simulation", "data": { "time": 3.0, "agents": { "agent1": { "another_3d_array": np.zeros((10, 10, 10)), } }, }, } with pytest.raises( ValueError, match=re.escape("cannot parse numpy data type dtype('O')"), ): emitter.emit(sim_data8) def test_nested_nullable(self, temp_dir): """Test handling nullable nested types that increase in depth.""" emitter = ParquetEmitter({"out_dir": temp_dir, "batch_size": 4}) # Configuration emit to initialize variables config_data = { "table": "configuration", "data": { "experiment_id": "test_exp", "variant": 1, "lineage_seed": 1, "agent_id": "1", }, } emitter.emit(config_data) emitter.last_batch_future.result() sim_data1 = { "table": "simulation", "data": { "time": 0.0, "agents": { "agent1": { "nullable_nested": None, } }, }, } emitter.emit(sim_data1) emitter.last_batch_future.result() # Verify arrays were stored correctly assert isinstance(emitter.buffered_emits["nullable_nested"], list) assert emitter.buffered_emits["nullable_nested"][0] is None assert emitter.pl_types["nullable_nested"] == pl.Null sim_data2 = { "table": "simulation", "data": { "time": 1.0, "agents": { "agent1": { "nullable_nested": [None, None], } }, }, } emitter.emit(sim_data2) emitter.last_batch_future.result() # Verify arrays were stored correctly assert isinstance(emitter.buffered_emits["nullable_nested"], list) assert emitter.buffered_emits["nullable_nested"][1].dtype == pl.Null assert emitter.pl_types["nullable_nested"] == pl.List(pl.Null) # One level deeper sim_data3 = { "table": "simulation", "data": { "time": 2.0, "agents": { "agent1": { "nullable_nested": [None, [None], [], [None, None], []], } }, }, } emitter.emit(sim_data3) emitter.last_batch_future.result() # Verify arrays were stored correctly assert emitter.buffered_emits["nullable_nested"][2].dtype == pl.List(pl.Null) assert emitter.pl_types["nullable_nested"] == pl.List(pl.List(pl.Null)) # One level deeper and defines non-null values sim_data4 = { "table": "simulation", "data": { "time": 2.0, "agents": { "agent1": { "nullable_nested": [ [], [["wow", "this", "is"], [], ["deep"], None], None, [[], None], ], } }, }, } emitter.emit(sim_data4) emitter.last_batch_future.result() # Check output t = pl.read_parquet( os.path.join( emitter.out_uri, emitter.experiment_id, "history", emitter.partitioning_path, "4.pq", ) ) assert t["nullable_nested"].to_list() == [ None, [None, None], [None, [None], [], [None, None], []], [[], [["wow", "this", "is"], [], ["deep"], None], None, [[], None]], ] emitter.emit(sim_data1) emitter.last_batch_future.result() # Should remember that we fully defined the type before assert isinstance(emitter.buffered_emits["nullable_nested"], list) assert emitter.buffered_emits["nullable_nested"][0] is None assert emitter.pl_types["nullable_nested"] == pl.List( pl.List(pl.List(pl.String)) ) # Emit until batch size and check output for _ in range(3): emitter.emit(sim_data1) emitter.last_batch_future.result() # Check output t = pl.read_parquet( os.path.join( emitter.out_uri, emitter.experiment_id, "history", emitter.partitioning_path, "8.pq", ) ) assert t["nullable_nested"].to_list() == [None] * 4 assert t["nullable_nested"].dtype == pl.List(pl.List(pl.List(pl.String)))