Skip to content

test_write

Comprehensive tests for gwframe write functionality.

TestAdcWrite

Tests for writing ADC (FrAdcData) channels.

test_adc_framewriter

test_adc_framewriter(tmp_path)

Test writing ADC data through FrameWriter.

Source code in gwframe/tests/test_write.py
def test_adc_framewriter(self, tmp_path):
    """Write three consecutive ADC frames via FrameWriter and read each back."""
    gwf_path = str(tmp_path / "adc_writer.gwf")
    gps_base = 1234567890.0
    n_frames = 3

    with gwframe.FrameWriter(gwf_path) as writer:
        for offset in range(n_frames):
            samples = np.random.randn(1024).astype(np.float64)
            frame = gwframe.Frame(start=gps_base + offset, duration=1.0, name="H1")
            frame.add_channel(
                "H1:TEST_ADC", samples, sample_rate=1024.0, channel_type="adc"
            )
            writer.write_frame(frame)

    # Each frame must come back with the expected length, type and start time.
    for offset in range(n_frames):
        series = gwframe.read(gwf_path, "H1:TEST_ADC", frame_index=offset)
        assert len(series.array) == 1024
        assert series.type == "adc"
        assert series.start == gps_base + offset

test_adc_integer_dtypes

test_adc_integer_dtypes(tmp_path)

Test ADC write with integer and floating-point data types (integer types are common for real ADC data).

Source code in gwframe/tests/test_write.py
def test_adc_integer_dtypes(self, tmp_path):
    """Test ADC write/read round-trip across integer and floating dtypes.

    Integer dtypes are common for real ADC data; float32/float64 are run
    through the same ADC path. One single-frame file is written per dtype
    and read back, checking the samples and the channel type.
    """
    dtypes_to_test = [np.int16, np.int32, np.float32, np.float64]

    for i, dtype in enumerate(dtypes_to_test):
        data = np.arange(256, dtype=dtype)
        # Each dtype gets its own start time, channel name and output file.
        frame = gwframe.Frame(start=1234567890.0 + i, duration=1.0, name="H1")
        channel = f"H1:ADC_{dtype.__name__}"
        frame.add_channel(channel, data, sample_rate=256.0, channel_type="adc")

        outfile = tmp_path / f"adc_{dtype.__name__}.gwf"
        frame.write(str(outfile))

        result = gwframe.read(str(outfile), channel)
        np.testing.assert_array_equal(data, result.array)
        assert result.type == "adc"

test_adc_mixed_with_proc

test_adc_mixed_with_proc(tmp_path)

Test writing both proc and ADC channels in the same frame.

Source code in gwframe/tests/test_write.py
def test_adc_mixed_with_proc(self, tmp_path):
    """A single frame can carry a proc channel and an ADC channel together."""
    gwf_path = str(tmp_path / "mixed.gwf")
    strain = np.random.randn(16384).astype(np.float64)
    aux = np.random.randn(256).astype(np.float32)

    frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="H1")
    frame.add_channel("H1:STRAIN", strain, sample_rate=16384.0, channel_type="proc")
    frame.add_channel("H1:AUX_ADC", aux, sample_rate=256.0, channel_type="adc")
    frame.write(gwf_path)

    # channels=None reads everything; index the result by channel name.
    channels = gwframe.read(gwf_path, channels=None)
    assert len(channels) == 2

    strain_back = channels["H1:STRAIN"]
    aux_back = channels["H1:AUX_ADC"]
    np.testing.assert_array_equal(strain, strain_back.array)
    np.testing.assert_array_equal(aux, aux_back.array)
    assert channels["H1:STRAIN"].type == "proc"
    assert channels["H1:AUX_ADC"].type == "adc"

test_adc_read_frames_roundtrip

test_adc_read_frames_roundtrip(tmp_path)

Test that read_frames preserves ADC channel type on round-trip.

Source code in gwframe/tests/test_write.py
def test_adc_read_frames_roundtrip(self, tmp_path):
    """ADC channels keep their type when frames are re-written via read_frames."""
    source_path = str(tmp_path / "adc_roundtrip.gwf")
    samples = np.random.randn(1024).astype(np.float64)

    # Produce the source file containing a single ADC channel.
    original = gwframe.Frame(start=1234567890.0, duration=1.0, name="H1")
    original.add_channel(
        "H1:TEST_ADC", samples, sample_rate=1024.0, channel_type="adc"
    )
    original.write(source_path)

    # Copy every frame from the source file into a second file.
    copy_path = str(tmp_path / "adc_roundtrip_out.gwf")
    with gwframe.FrameWriter(copy_path) as writer:
        for loaded in gwframe.read_frames(source_path):
            writer.write_frame(loaded)

    series = gwframe.read(copy_path, "H1:TEST_ADC")
    np.testing.assert_array_equal(samples, series.array)
    assert series.type == "adc"

test_adc_with_compression

test_adc_with_compression(tmp_path)

Test ADC data write with different compression schemes.

Source code in gwframe/tests/test_write.py
def test_adc_with_compression(self, tmp_path):
    """ADC samples survive a write/read cycle under RAW and GZIP compression."""
    samples = np.random.randn(4096).astype(np.float64)
    schemes = (Compression.RAW, Compression.GZIP)

    for scheme in schemes:
        frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="H1")
        frame.add_channel(
            "H1:TEST_ADC",
            samples,
            sample_rate=4096.0,
            channel_type="adc",
        )

        # One output file per scheme, named after the scheme itself.
        gwf_path = str(tmp_path / f"adc_comp_{scheme}.gwf")
        frame.write(gwf_path, compression=scheme)

        series = gwframe.read(gwf_path, "H1:TEST_ADC")
        np.testing.assert_array_equal(samples, series.array)

test_adc_write_bytes_roundtrip

test_adc_write_bytes_roundtrip()

Test writing ADC data to bytes and reading it back.

Source code in gwframe/tests/test_write.py
def test_adc_write_bytes_roundtrip(self):
    """Serialize an ADC channel to bytes in memory and read it back."""
    rate = 16384.0
    samples = np.random.randn(16384).astype(np.float64)

    frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="H1")
    frame.add_channel("H1:TEST_ADC", samples, sample_rate=rate, channel_type="adc")
    payload = frame.write_bytes()

    series = gwframe.read_bytes(payload, "H1:TEST_ADC")
    np.testing.assert_array_equal(samples, series.array)
    assert series.type == "adc"
    assert series.sample_rate == rate

test_adc_write_file_roundtrip

test_adc_write_file_roundtrip(tmp_path)

Test writing ADC data to file and reading it back.

Source code in gwframe/tests/test_write.py
def test_adc_write_file_roundtrip(self, tmp_path):
    """Write an ADC channel to disk and check data, type, rate and start."""
    gwf_path = str(tmp_path / "adc.gwf")
    gps_start = 1234567890.0
    rate = 16384.0
    samples = np.random.randn(16384).astype(np.float64)

    frame = gwframe.Frame(start=gps_start, duration=1.0, name="H1")
    frame.add_channel("H1:TEST_ADC", samples, sample_rate=rate, channel_type="adc")
    frame.write(gwf_path)

    series = gwframe.read(gwf_path, "H1:TEST_ADC")
    np.testing.assert_array_equal(samples, series.array)
    assert series.type == "adc"
    assert series.sample_rate == rate
    assert series.start == gps_start

test_adc_write_multiple_channels

test_adc_write_multiple_channels(tmp_path)

Test writing multiple ADC channels to a single frame.

Source code in gwframe/tests/test_write.py
def test_adc_write_multiple_channels(self, tmp_path):
    """Two ADC channels with different rates and dtypes share one frame."""
    gwf_path = str(tmp_path / "multi_adc.gwf")
    fast = np.random.randn(1024).astype(np.float32)
    slow = np.random.randn(256).astype(np.float64)

    frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="H1")
    frame.add_channel("H1:ADC_FAST", fast, sample_rate=1024.0, channel_type="adc")
    frame.add_channel("H1:ADC_SLOW", slow, sample_rate=256.0, channel_type="adc")
    frame.write(gwf_path)

    channels = gwframe.read(gwf_path, channels=None)
    assert len(channels) == 2

    # Data first, then the per-channel sample rates.
    np.testing.assert_array_equal(fast, channels["H1:ADC_FAST"].array)
    np.testing.assert_array_equal(slow, channels["H1:ADC_SLOW"].array)
    assert channels["H1:ADC_FAST"].sample_rate == 1024.0
    assert channels["H1:ADC_SLOW"].sample_rate == 256.0

TestCompression

Tests for compression modes and settings.

test_compression_level

test_compression_level(tmp_path)

Test a GZIP-compressed write and read-back round-trip (the test does not set an explicit compression level).

Source code in gwframe/tests/test_write.py
def test_compression_level(self, tmp_path):
    """Round-trip a channel through a GZIP-compressed file.

    NOTE(review): despite the test name, no explicit compression level is
    passed to ``Frame.write`` here, so only whatever level the library
    applies by default is exercised -- confirm whether a level argument
    should be supplied.
    """
    gwf_path = str(tmp_path / "test_level.gwf")
    rate = 1000.0
    gps_start = 1234567890.0
    samples = np.random.randn(1000)

    frame = gwframe.Frame(start=gps_start, duration=1.0, name="L1", run=1)
    frame.add_channel("L1:TEST", samples, sample_rate=rate, unit="strain")
    frame.write(gwf_path, compression=Compression.GZIP)

    # Compare with a floating-point tolerance rather than exact equality.
    readback = gwframe.read(gwf_path, "L1:TEST")
    assert np.allclose(samples, readback.array)

test_compression_modes_exposed

test_compression_modes_exposed()

Test that all compression modes are exposed.

Source code in gwframe/tests/test_write.py
def test_compression_modes_exposed(self):
    """Check that Compression exposes every expected mode and alias as an int."""
    canonical = (
        "RAW",
        "GZIP",
        "DIFF_GZIP",
        "ZERO_SUPPRESS_WORD_2",
        "ZERO_SUPPRESS_WORD_4",
        "ZERO_SUPPRESS_WORD_8",
        "ZERO_SUPPRESS_OTHERWISE_GZIP",
        "BEST_COMPRESSION",
    )
    aliases = ("ZERO_SUPPRESS_SHORT", "ZERO_SUPPRESS_INT_FLOAT")

    # Canonical names are checked first, then the aliases.
    for mode in (*canonical, *aliases):
        assert hasattr(Compression, mode), f"Missing mode: {mode}"
        is_int = isinstance(getattr(Compression, mode), int)
        assert is_int, f"Mode {mode} is not an integer"

test_write_with_different_compressions

test_write_with_different_compressions(tmp_path)

Test writing with different compression modes.

Source code in gwframe/tests/test_write.py
def test_write_with_different_compressions(self, tmp_path):
    """gwframe.write round-trips data under RAW, GZIP and DIFF_GZIP."""
    rate = 1000.0
    gps_start = 1234567890.0
    samples = np.random.randn(1000)

    # Label -> compression mode; the label becomes part of the file name.
    modes = {
        "raw": Compression.RAW,
        "gzip": Compression.GZIP,
        "diff_gzip": Compression.DIFF_GZIP,
    }

    for label, mode in modes.items():
        gwf_path = str(tmp_path / f"test_{label}.gwf")

        gwframe.write(
            gwf_path,
            samples,
            start=gps_start,
            sample_rate=rate,
            name="L1:TEST",
            unit="strain",
            compression=mode,
        )

        readback = gwframe.read(gwf_path, "L1:TEST")
        assert np.allclose(samples, readback.array)

TestFrameDictInterface

Tests for the Frame MutableMapping interface (setitem, delitem).

test_setitem_delitem_triggers_rebuild_on_write

test_setitem_delitem_triggers_rebuild_on_write(tmp_path)

Mutating a Frame via dict interface rebuilds the frame on write.

Source code in gwframe/tests/test_write.py
def test_setitem_delitem_triggers_rebuild_on_write(self, tmp_path):
    """Dict-style edits mark the Frame as modified and show up in the file."""
    gwf_path = str(tmp_path / "dict_mutated.gwf")

    frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="L1", run=1)
    frame.add_channel(
        "L1:KEEP", np.arange(1000, dtype=np.float64), sample_rate=1000, unit="strain"
    )
    frame.add_channel(
        "L1:DROP", np.full(500, 7.0, dtype=np.float64), sample_rate=500, unit="counts"
    )

    # Overwrite KEEP through __setitem__ and remove DROP through __delitem__.
    new_samples = np.linspace(-1.0, 1.0, 2000, dtype=np.float64)
    replacement = gwframe.TimeSeries(
        array=new_samples,
        name="L1:KEEP",
        dtype=new_samples.dtype,
        start=frame.start,
        dt=1.0 / 2000,
        duration=1.0,
        sample_rate=2000.0,
        unit="strain",
        type="proc",
    )
    frame["L1:KEEP"] = replacement
    del frame["L1:DROP"]

    # In-memory state right after the mutation.
    assert frame._channels_modified is True
    assert len(frame) == 1
    assert "L1:DROP" not in frame

    frame.write(gwf_path)

    # Writing is expected to reset the modified flag.
    assert frame._channels_modified is False

    # The file on disk should contain only the replaced channel.
    channels = gwframe.read(gwf_path, channels=None)
    assert set(channels.keys()) == {"L1:KEEP"}
    kept = channels["L1:KEEP"]
    np.testing.assert_array_equal(new_samples, kept.array)
    assert channels["L1:KEEP"].sample_rate == 2000.0

test_start_setter_updates_written_frame

test_start_setter_updates_written_frame(tmp_path)

Reassigning Frame.start updates the GPS time of the written frame.

Source code in gwframe/tests/test_write.py
def test_start_setter_updates_written_frame(self, tmp_path):
    """Assigning to Frame.start changes the GPS time stored in the file."""
    gwf_path = str(tmp_path / "start_setter.gwf")
    initial_gps = 1234567890.0
    updated_gps = 1300000000.0
    samples = np.arange(1000, dtype=np.float64)

    frame = gwframe.Frame(start=initial_gps, duration=1.0, name="L1", run=1)
    frame.add_channel("L1:TEST", samples, sample_rate=1000, unit="strain")

    # Move the frame to a new start time after the channel was added.
    frame.start = updated_gps
    assert frame.start == updated_gps

    frame.write(gwf_path)

    # Both the file-level metadata and the channel must report the new time.
    file_info = get_info(gwf_path)
    assert file_info.frames[0].start == updated_gps

    series = gwframe.read(gwf_path, "L1:TEST")
    assert series.start == updated_gps
    np.testing.assert_array_equal(samples, series.array)

TestFrameMetadata

Tests for frame metadata (detector, run, etc.).

test_frame_with_full_metadata

test_frame_with_full_metadata(tmp_path)

Test creating frame with all metadata fields.

Source code in gwframe/tests/test_write.py
def test_frame_with_full_metadata(self, tmp_path):
    """A frame built with run, frame number and history writes and reads back."""
    gwf_path = str(tmp_path / "metadata.gwf")
    gps_start = 1234567890.0

    frame_kwargs = {
        "start": gps_start,
        "duration": 1.0,
        "name": "L1",
        "run": 42,
        "frame_number": 100,
    }
    frame = gwframe.Frame(**frame_kwargs)

    samples = np.random.randn(1000)
    frame.add_channel(
        "L1:TEST", samples, sample_rate=1000, unit="strain", channel_type="proc"
    )

    # Attach a history record before writing.
    frame.add_history("gwframe", "Test frame created by gwframe")
    frame.write(gwf_path)

    series = gwframe.read(gwf_path, "L1:TEST")
    assert len(series.array) == 1000
    assert series.start == gps_start

test_frame_without_detector

test_frame_without_detector(tmp_path)

Test creating frame without detector name.

Source code in gwframe/tests/test_write.py
def test_frame_without_detector(self, tmp_path):
    """A frame named "TEST" (no detector name) still writes and reads back."""
    gwf_path = str(tmp_path / "no_detector.gwf")
    samples = np.random.randn(1000)

    frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="TEST", run=1)
    frame.add_channel("TEST:CHANNEL", samples, sample_rate=1000, unit="counts")
    frame.write(gwf_path)

    # Only the sample count is checked on read-back.
    series = gwframe.read(gwf_path, "TEST:CHANNEL")
    assert len(series.array) == 1000

TestFrameNumber

Tests for frame number tracking and auto-increment.

test_default_frame_number

test_default_frame_number()

Test default frame_number is 0.

Source code in gwframe/tests/test_write.py
def test_default_frame_number(self):
    """A Frame built without frame_number reports frame_number == 0."""
    frame_kwargs = {"start": 1234567890.0, "duration": 1.0, "name": "L1", "run": 1}
    created = gwframe.Frame(**frame_kwargs)

    assert created.frame_number == 0

test_explicit_frame_number

test_explicit_frame_number()

Test creating frame with explicit frame_number.

Source code in gwframe/tests/test_write.py
def test_explicit_frame_number(self):
    """A frame_number passed to the constructor is reported unchanged."""
    requested = 42
    created = gwframe.Frame(
        start=1234567890.0,
        duration=1.0,
        name="L1",
        run=1,
        frame_number=requested,
    )

    assert created.frame_number == requested

test_framewriter_auto_increment

test_framewriter_auto_increment(tmp_path)

Test FrameWriter auto-increments frame numbers.

Source code in gwframe/tests/test_write.py
def test_framewriter_auto_increment(self, tmp_path):
    """FrameWriter's internal frame counter advances by one per write."""
    gwf_path = str(tmp_path / "auto_increment.gwf")
    first_number = 10
    total = 5

    with gwframe.FrameWriter(gwf_path, frame_number=first_number) as writer:
        # The counter starts at the requested value.
        assert writer._frame_number == first_number

        for idx in range(total):
            samples = np.random.randn(1000)
            writer.write(
                samples,
                start=1234567890.0 + idx,
                sample_rate=1000,
                name="L1:TEST",
                unit="counts",
            )
            # After the (idx + 1)-th write the counter has moved that far.
            assert writer._frame_number == first_number + idx + 1

    # Frame numbers are not checked in the file; only that each frame reads.
    for idx in range(total):
        series = gwframe.read(gwf_path, "L1:TEST", frame_index=idx)
        assert len(series.array) == 1000

test_manual_frame_objects_with_framewriter

test_manual_frame_objects_with_framewriter(tmp_path)

Test writing manual Frame objects with specific frame numbers.

Source code in gwframe/tests/test_write.py
def test_manual_frame_objects_with_framewriter(self, tmp_path):
    """Hand-built Frames with explicit frame numbers go through FrameWriter."""
    gwf_path = str(tmp_path / "manual_frames.gwf")
    n_frames = 3

    with gwframe.FrameWriter(gwf_path, frame_number=100) as writer:
        for idx in range(n_frames):
            # Each frame carries its own explicit number, 100 + idx.
            frame = gwframe.Frame(
                start=1234567890.0 + idx,
                duration=1.0,
                name="L1",
                run=1,
                frame_number=100 + idx,
            )
            samples = np.random.randn(1000)
            frame.add_channel("L1:TEST", samples, sample_rate=1000, unit="strain")
            writer.write_frame(frame)

    # All three frames must be readable by index.
    for idx in range(n_frames):
        series = gwframe.read(gwf_path, "L1:TEST", frame_index=idx)
        assert len(series.array) == 1000

TestFrameWriter

Tests for FrameWriter context manager.

test_framewriter_multiple_channels

test_framewriter_multiple_channels(tmp_path)

Test writing multiple channels per frame.

Source code in gwframe/tests/test_write.py
def test_framewriter_multiple_channels(self, tmp_path):
    """One frame with two channels written via FrameWriter.write_frame."""
    gwf_path = str(tmp_path / "multichannel.gwf")

    with gwframe.FrameWriter(gwf_path) as writer:
        strain_samples = np.random.randn(1000)
        aux_samples = np.random.randn(500)

        # Build the Frame by hand so it can hold both channels.
        frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="L1", run=1)
        frame.add_channel(
            "L1:STRAIN", strain_samples, sample_rate=1000, unit="strain"
        )
        frame.add_channel("L1:AUX", aux_samples, sample_rate=500, unit="counts")
        writer.write_frame(frame)

    # Each channel is read back on its own.
    strain_back = gwframe.read(gwf_path, "L1:STRAIN")
    aux_back = gwframe.read(gwf_path, "L1:AUX")

    assert len(strain_back.array) == 1000
    assert len(aux_back.array) == 500
    assert strain_back.sample_rate == 1000.0
    assert aux_back.sample_rate == 500.0

test_framewriter_with_compression

test_framewriter_with_compression(tmp_path)

Test FrameWriter with compression.

Source code in gwframe/tests/test_write.py
def test_framewriter_with_compression(self, tmp_path):
    """Three GZIP-compressed frames written by FrameWriter are all readable."""
    gwf_path = str(tmp_path / "compressed_multi.gwf")
    n_frames = 3

    with gwframe.FrameWriter(gwf_path, compression=Compression.GZIP) as writer:
        for idx in range(n_frames):
            samples = np.random.randn(1000)
            writer.write(
                samples,
                start=1234567890.0 + idx,
                sample_rate=1000,
                name="L1:TEST",
                unit="strain",
            )

    # Only the sample count of each frame is checked.
    for idx in range(n_frames):
        series = gwframe.read(gwf_path, "L1:TEST", frame_index=idx)
        assert len(series.array) == 1000

test_write_multiple_frames

test_write_multiple_frames(tmp_path)

Test writing multiple frames to a single file.

Source code in gwframe/tests/test_write.py
def test_write_multiple_frames(self, tmp_path):
    """Several frames written to one file are each readable by frame index."""
    gwf_path = str(tmp_path / "multiframe.gwf")
    frame_count = 5
    rate = 1000.0
    length = 1000
    gps_base = 1234567890.0

    # Each frame is filled with its own index so frames are distinguishable.
    with gwframe.FrameWriter(gwf_path) as writer:
        for idx in range(frame_count):
            constant = np.full(length, float(idx))
            writer.write(
                constant,
                start=gps_base + idx,
                sample_rate=rate,
                name="L1:TEST",
                unit="counts",
            )

    for idx in range(frame_count):
        series = gwframe.read(gwf_path, "L1:TEST", frame_index=idx)
        assert np.all(series.array == float(idx))
        assert series.start == gps_base + idx

TestFrameWriterOpenClose

Tests for FrameWriter open()/close() API.

test_close_is_idempotent

test_close_is_idempotent(tmp_path)

Test that calling close() multiple times is safe.

Source code in gwframe/tests/test_write.py
def test_close_is_idempotent(self, tmp_path):
    """A second close() on a FrameWriter must not fail or damage the file."""
    gwf_path = str(tmp_path / "idempotent.gwf")

    writer = gwframe.FrameWriter(gwf_path)
    writer.open()
    samples = np.random.randn(1000)
    writer.write(samples, start=1234567890.0, sample_rate=1000, name="L1:TEST")

    # First call closes the writer; the second is expected to do nothing.
    writer.close()
    writer.close()

    series = gwframe.read(gwf_path, "L1:TEST")
    np.testing.assert_array_equal(samples, series.array)

test_open_close_bytesio

test_open_close_bytesio()

Test open()/close() with BytesIO destination.

Source code in gwframe/tests/test_write.py
def test_open_close_bytesio(self):
    """Explicit open()/close() works when the destination is a BytesIO."""
    from io import BytesIO

    sink = BytesIO()
    samples = np.random.randn(1000)

    writer = gwframe.FrameWriter(sink)
    writer.open()
    writer.write(samples, start=1234567890.0, sample_rate=1000, name="L1:TEST")
    writer.close()

    # The buffer contents are read only after the writer has been closed.
    series = gwframe.read_bytes(sink.getvalue(), "L1:TEST")
    np.testing.assert_array_equal(samples, series.array)

test_open_close_file

test_open_close_file(tmp_path)

Test writing frames using open()/close() instead of context manager.

Source code in gwframe/tests/test_write.py
def test_open_close_file(self, tmp_path):
    """Write three frames with explicit open()/close() rather than ``with``."""
    gwf_path = str(tmp_path / "open_close.gwf")
    n_frames = 3

    writer = gwframe.FrameWriter(gwf_path)
    writer.open()
    for idx in range(n_frames):
        # Fill each frame with its index so the read-back can identify it.
        constant = np.full(1000, float(idx))
        writer.write(
            constant, start=1234567890.0 + idx, sample_rate=1000, name="L1:TEST"
        )
    writer.close()

    for idx in range(n_frames):
        series = gwframe.read(gwf_path, "L1:TEST", frame_index=idx)
        assert np.all(series.array == float(idx))

test_open_when_already_open_raises

test_open_when_already_open_raises(tmp_path)

Test that opening an already-open writer raises an error.

Source code in gwframe/tests/test_write.py
def test_open_when_already_open_raises(self, tmp_path):
    """Test that opening an already-open writer raises an error.

    The second ``open()`` call must raise ``RuntimeError`` with a message
    matching "already open".
    """
    tmp_file = tmp_path / "double_open.gwf"
    writer = gwframe.FrameWriter(str(tmp_file))
    writer.open()
    try:
        with pytest.raises(RuntimeError, match="already open"):
            writer.open()
    finally:
        # Close the writer even when the assertion above fails, so a
        # failing run does not leave it open.
        writer.close()

TestSimpleWrite

Tests for simple write operations using gwframe.write().

test_write_basic

test_write_basic(tmp_path)

Test basic write and read-back.

Source code in gwframe/tests/test_write.py
def test_write_basic(self, tmp_path):
    """gwframe.write followed by gwframe.read returns the data and metadata."""
    gwf_path = str(tmp_path / "test.gwf")

    # One sine period sampled at 1 kHz.
    count = 1000
    rate = 1000.0
    gps_start = 1234567890.0
    sine = np.sin(np.linspace(0, 2 * np.pi, count))

    gwframe.write(
        gwf_path,
        sine,
        start=gps_start,
        sample_rate=rate,
        name="L1:TEST",
        unit="strain",
    )

    series = gwframe.read(gwf_path, "L1:TEST")

    # Samples are compared with a tolerance; metadata must match exactly.
    assert np.allclose(sine, series.array)
    assert series.sample_rate == rate
    assert series.start == gps_start
    assert series.name == "L1:TEST"
    assert series.unit == "strain"

test_write_with_compression

test_write_with_compression(tmp_path)

Test writing with GZIP compression.

Source code in gwframe/tests/test_write.py
def test_write_with_compression(self, tmp_path):
    """gwframe.write with GZIP compression preserves samples and sample rate."""
    gwf_path = str(tmp_path / "test_compressed.gwf")

    count = 1000
    rate = 1000.0
    gps_start = 1234567890.0
    sine = np.sin(np.linspace(0, 2 * np.pi, count))

    write_options = {
        "start": gps_start,
        "sample_rate": rate,
        "name": "L1:TEST",
        "unit": "strain",
        "compression": Compression.GZIP,
    }
    gwframe.write(gwf_path, sine, **write_options)

    series = gwframe.read(gwf_path, "L1:TEST")

    # The compressed file must still yield the original samples.
    assert np.allclose(sine, series.array)
    assert series.sample_rate == rate

TestWriteBytes

Tests for write_bytes functionality.

test_frame_write_bytes_roundtrip

test_frame_write_bytes_roundtrip()

Test Frame.write_bytes() method.

Source code in gwframe/tests/test_write.py
def test_frame_write_bytes_roundtrip(self):
    """Frame.write_bytes() yields non-empty bytes that read_bytes can parse."""
    gps_start = 1234567890.0
    samples = np.random.randn(16384)

    frame = gwframe.Frame(start=gps_start, duration=1.0, name="L1")
    frame.add_channel("L1:TEST", samples, sample_rate=16384)

    payload = frame.write_bytes()

    # The serialized frame must be a non-empty bytes object.
    assert isinstance(payload, bytes)
    assert len(payload) > 0

    series = gwframe.read_bytes(payload, "L1:TEST")

    np.testing.assert_array_equal(samples, series.array)
    assert series.sample_rate == 16384
    assert series.start == gps_start

test_frame_writer_bytesio

test_frame_writer_bytesio()

Test FrameWriter with BytesIO destination.

Source code in gwframe/tests/test_write.py
def test_frame_writer_bytesio(self):
    """FrameWriter can stream five frames into a BytesIO buffer."""
    from io import BytesIO

    sink = BytesIO()
    n_frames = 5
    gps_base = 1234567890.0

    with gwframe.FrameWriter(sink) as writer:
        for idx in range(n_frames):
            samples = np.random.randn(1000)
            writer.write(
                samples, start=gps_base + idx, sample_rate=1000, name="L1:TEST"
            )

    # Snapshot the buffer once, then read every frame out of it.
    payload = sink.getvalue()

    for idx in range(n_frames):
        series = gwframe.read_bytes(payload, "L1:TEST", frame_index=idx)
        assert len(series.array) == 1000
        assert series.start == gps_base + idx

test_frame_writer_bytesio_equals_file

test_frame_writer_bytesio_equals_file(tmp_path)

Test that FrameWriter produces identical output for file and BytesIO.

Source code in gwframe/tests/test_write.py
def test_frame_writer_bytesio_equals_file(self, tmp_path):
    """FrameWriter output is byte-identical for a file path and a BytesIO."""
    from io import BytesIO

    gwf_path = tmp_path / "test.gwf"

    # The same three arrays are written to both destinations.
    payloads = [np.random.randn(1000) for _ in range(3)]

    def emit(destination):
        # Write every payload as one frame to the given destination.
        with gwframe.FrameWriter(destination) as writer:
            for offset, samples in enumerate(payloads):
                writer.write(
                    samples,
                    start=1234567890.0 + offset,
                    sample_rate=1000,
                    name="L1:TEST",
                )

    emit(str(gwf_path))

    in_memory = BytesIO()
    emit(in_memory)

    on_disk = gwf_path.read_bytes()
    assert on_disk == in_memory.getvalue()

test_write_bytes_basic_roundtrip

test_write_bytes_basic_roundtrip()

Test basic round-trip: write_bytes -> read_bytes.

Source code in gwframe/tests/test_write.py
def test_write_bytes_basic_roundtrip(self):
    """gwframe.write_bytes output can be parsed back by gwframe.read_bytes."""
    count = 1000
    rate = 1000.0
    gps_start = 1234567890.0
    sine = np.sin(np.linspace(0, 2 * np.pi, count))

    payload = gwframe.write_bytes(
        sine, start=gps_start, sample_rate=rate, name="L1:TEST", unit="strain"
    )

    # The result must be a non-empty bytes object.
    assert isinstance(payload, bytes)
    assert len(payload) > 0

    series = gwframe.read_bytes(payload, "L1:TEST")

    # Samples must match exactly, and so must the channel metadata.
    np.testing.assert_array_equal(sine, series.array)
    assert series.sample_rate == rate
    assert series.start == gps_start
    assert series.name == "L1:TEST"
    assert series.unit == "strain"

test_write_bytes_equals_write_file

test_write_bytes_equals_write_file(tmp_path)

Test that write_bytes produces identical output to write.

Source code in gwframe/tests/test_write.py
def test_write_bytes_equals_write_file(self, tmp_path):
    """gwframe.write_bytes returns exactly the bytes gwframe.write puts on disk."""
    gwf_path = tmp_path / "test.gwf"

    count = 1000
    sine = np.sin(np.linspace(0, 2 * np.pi, count))

    # Identical keyword arguments are used for both writers.
    shared = {"start": 1234567890.0, "sample_rate": 1000.0, "name": "L1:TEST"}

    gwframe.write(str(gwf_path), sine, **shared)
    payload = gwframe.write_bytes(sine, **shared)

    on_disk = gwf_path.read_bytes()

    assert payload == on_disk

test_write_bytes_multiple_channels

test_write_bytes_multiple_channels()

Test write_bytes with multiple channels.

Source code in gwframe/tests/test_write.py
def test_write_bytes_multiple_channels(self):
    """write_bytes accepts a dict of channels and writes each of them."""
    first = np.random.randn(1000)
    second = np.random.randn(1000)
    by_name = {"L1:CHAN1": first, "L1:CHAN2": second}

    payload = gwframe.write_bytes(
        by_name, start=1234567890.0, sample_rate=1000, name="L1"
    )

    # channels=None reads back every channel in the payload.
    loaded = gwframe.read_bytes(payload, channels=None)

    assert "L1:CHAN1" in loaded
    assert "L1:CHAN2" in loaded
    np.testing.assert_array_equal(first, loaded["L1:CHAN1"].array)
    np.testing.assert_array_equal(second, loaded["L1:CHAN2"].array)

test_write_bytes_with_compression

test_write_bytes_with_compression()

Test write_bytes with different compression schemes.

Source code in gwframe/tests/test_write.py
def test_write_bytes_with_compression(self):
    """write_bytes round-trips under RAW and GZIP; GZIP output is smaller."""
    samples = np.random.randn(1000)
    common = {"start": 1234567890.0, "sample_rate": 1000, "name": "L1:TEST"}

    # Uncompressed payload.
    raw_payload = gwframe.write_bytes(
        samples, compression=Compression.RAW, **common
    )

    # GZIP payload at compression level 9.
    gzip_payload = gwframe.write_bytes(
        samples,
        compression=Compression.GZIP,
        compression_level=9,
        **common,
    )

    # Both payloads must decode to the original samples.
    raw_series = gwframe.read_bytes(raw_payload, "L1:TEST")
    gzip_series = gwframe.read_bytes(gzip_payload, "L1:TEST")

    np.testing.assert_array_equal(samples, raw_series.array)
    np.testing.assert_array_equal(samples, gzip_series.array)

    # The test expects the GZIP payload to be strictly smaller than RAW.
    assert len(gzip_payload) < len(raw_payload)

test_add_channel_with_unknown_detector_prefix

test_add_channel_with_unknown_detector_prefix(tmp_path)

Channels with a non-detector prefix write without error (silent-skip).

Source code in gwframe/tests/test_write.py
def test_add_channel_with_unknown_detector_prefix(tmp_path):
    """A channel whose prefix is not a known detector still writes and reads back.

    The frame must not record the unknown prefix in ``_detectors_added``, and
    the channel data must survive a write/read round trip unchanged.
    """
    samples = np.arange(100, dtype=np.float64)
    out_path = tmp_path / "unknown_prefix.gwf"

    synthetic = gwframe.Frame(start=1234567890.0, duration=1.0, name="SYN", run=1)

    # 'XX' is chosen because it is not a DetectorLocation member: real
    # prefixes are a letter followed by a digit ('L1', 'H1', 'V1'), so a
    # two-letter prefix cannot clash with an existing or future detector.
    synthetic.add_channel("XX:SYNTH", samples, sample_rate=100, unit="counts")

    # add_channel is expected to swallow the KeyError raised by the
    # DetectorLocation[prefix] lookup, so no detector is registered for 'XX'.
    assert "XX" not in synthetic._detectors_added

    synthetic.write(str(out_path))

    # The channel is stored without detector metadata on the frame, but its
    # samples must still come back intact.
    readback = gwframe.read(str(out_path), "XX:SYNTH")
    np.testing.assert_array_equal(samples, readback.array)

test_add_channel_wrong_dimensions

test_add_channel_wrong_dimensions(tmp_path)

Test that adding non-1D data raises error.

Source code in gwframe/tests/test_write.py
def test_add_channel_wrong_dimensions(tmp_path):
    """Frame.add_channel must reject input that is not one-dimensional.

    A 10x10 array is passed as channel data and a ValueError whose message
    matches "must be 1D array" is expected.
    """
    target = gwframe.Frame(start=1234567890.0, duration=1.0, name="L1")
    matrix = np.random.randn(10, 10)  # deliberately two-dimensional

    with pytest.raises(ValueError, match="must be 1D array"):
        target.add_channel("L1:TEST", matrix, sample_rate=1000)

test_framewriter_error_when_not_opened

test_framewriter_error_when_not_opened(tmp_path)

Test that FrameWriter raises error if used outside context manager.

Source code in gwframe/tests/test_write.py
def test_framewriter_error_when_not_opened(tmp_path):
    """FrameWriter.write must fail when the writer was never entered.

    The writer is constructed but not used as a context manager, so calling
    ``write`` is expected to raise RuntimeError matching "not opened".
    """
    unopened = gwframe.FrameWriter(str(tmp_path / "test.gwf"))
    write_kwargs = {"start": 1234567890.0, "sample_rate": 100, "name": "L1:TEST"}

    # No 'with' block here on purpose: the writer has not been opened.
    with pytest.raises(RuntimeError, match="not opened"):
        unopened.write(np.random.randn(100), **write_kwargs)

test_framewriter_write_single_array_without_name

test_framewriter_write_single_array_without_name(tmp_path)

Test FrameWriter.write() with single array but no name raises error.

Source code in gwframe/tests/test_write.py
def test_framewriter_write_single_array_without_name(tmp_path):
    """FrameWriter.write with a bare array and no ``name`` must raise ValueError.

    The writer is opened properly; only the ``name`` keyword is left out, and
    the error message is expected to match "name parameter required".
    """
    out_path = str(tmp_path / "test.gwf")
    expect_name_error = pytest.raises(ValueError, match="name parameter required")

    # The raises-context is the inner one, so the ValueError is caught before
    # the writer's own __exit__ runs.
    with gwframe.FrameWriter(out_path) as writer, expect_name_error:
        # 'name' is intentionally omitted from this call.
        writer.write(np.random.randn(1000), start=1234567890.0, sample_rate=1000)

test_unsupported_channel_type

test_unsupported_channel_type()

Test that unsupported channel_type raises error.

Source code in gwframe/tests/test_write.py
def test_unsupported_channel_type():
    """Frame.add_channel must reject a channel_type it does not recognise.

    Passing ``channel_type="invalid_type"`` is expected to raise ValueError
    with a message matching "Unsupported channel_type".
    """
    samples = np.random.randn(1000)
    target = gwframe.Frame(start=1234567890.0, duration=1.0, name="L1")
    bad_kwargs = {"sample_rate": 1000, "channel_type": "invalid_type"}

    with pytest.raises(ValueError, match="Unsupported channel_type"):
        target.add_channel("L1:TEST", samples, **bad_kwargs)

test_write_frame_without_context

test_write_frame_without_context(tmp_path)

Test write_frame raises error outside context manager.

Source code in gwframe/tests/test_write.py
def test_write_frame_without_context(tmp_path):
    """FrameWriter.write_frame must fail when the writer was never entered.

    A writer and a frame are built, but the writer is not used as a context
    manager; ``write_frame`` is expected to raise RuntimeError matching
    "not opened".
    """
    unopened = gwframe.FrameWriter(str(tmp_path / "test.gwf"))
    empty_frame = gwframe.Frame(start=1234567890.0, duration=1.0, name="L1")

    # Called outside any 'with' block, so the writer is not open.
    with pytest.raises(RuntimeError, match="not opened"):
        unopened.write_frame(empty_frame)

test_write_single_array_without_name_raises_error

test_write_single_array_without_name_raises_error(tmp_path)

Test that writing single array without name parameter raises error.

Source code in gwframe/tests/test_write.py
def test_write_single_array_without_name_raises_error(tmp_path):
    """gwframe.write with a bare array and no ``name`` must raise ValueError.

    The data is passed as a single array rather than a mapping of channels,
    and the ``name`` keyword is omitted; the error message is expected to
    match "name parameter required".
    """
    samples = np.random.randn(1000)
    out_path = str(tmp_path / "test.gwf")

    with pytest.raises(ValueError, match="name parameter required"):
        # Single array, and 'name' is intentionally not supplied.
        gwframe.write(out_path, samples, start=1234567890.0, sample_rate=1000)