Skip to content

test_operations

Comprehensive tests for gwframe operations module.

TestCombineChannels

Tests for combine_channels operation.

test_combine_directories

test_combine_directories(tmp_path, sample_data, create_test_frame)

Test combining channels from multiple directories with matching frames.

Source code in gwframe/tests/test_operations.py
def test_combine_directories(self, tmp_path, sample_data, create_test_frame):
    """Combine channels across directories whose frame files share time ranges."""
    span = sample_data["duration"]
    sample_count = sample_data["n_samples"]
    n_files = 3

    first_dir = tmp_path / "dir1"
    second_dir = tmp_path / "dir2"
    first_dir.mkdir()
    second_dir.mkdir()

    # Each directory carries a single channel; the factor scales the constant
    # fill value so the two channels stay distinguishable after combining.
    sources = (
        (first_dir, "L1:CHAN_A", 1),
        (second_dir, "L1:CHAN_B", 10),
    )

    # Write one file per time range into every directory.
    for idx in range(n_files):
        start = sample_data["t0"] + idx * span
        for directory, channel, factor in sources:
            fill = np.full(sample_count, float(idx * factor), dtype=np.float32)
            with FrameWriter(str(directory / f"frame_{idx}.gwf")) as writer:
                writer.write_frame(create_test_frame(start, span, {channel: fill}))

    combined = operations.combine_channels(
        [str(first_dir), str(second_dir)], str(tmp_path / "output")
    )

    # Expect exactly one combined file for each time range.
    assert len(combined) == n_files

    for idx, path in enumerate(sorted(combined)):
        frames = list(gwframe.read_frames(path))
        assert len(frames) == 1
        merged = frames[0]

        # The combined frame must expose the channel from each directory.
        assert "L1:CHAN_A" in merged
        assert "L1:CHAN_B" in merged

        # Fill values identify which time range the data came from.
        assert np.allclose(merged["L1:CHAN_A"].array, float(idx))
        assert np.allclose(merged["L1:CHAN_B"].array, float(idx * 10))

test_combine_two_files

test_combine_two_files(tmp_path, sample_data, create_test_frame)

Test combining channels from two files.

Source code in gwframe/tests/test_operations.py
def test_combine_two_files(self, tmp_path, sample_data, create_test_frame):
    """Merge two single-channel files that span the same time range."""
    sample_count = sample_data["n_samples"]
    ramp = np.arange(sample_count, dtype=np.float32)
    doubled = np.arange(sample_count, dtype=np.float32) * 2

    # Each source file contributes exactly one channel over the same interval.
    sources = (
        (tmp_path / "file1.gwf", "L1:CHAN_A", ramp),
        (tmp_path / "file2.gwf", "L1:CHAN_B", doubled),
    )
    for path, channel, samples in sources:
        with FrameWriter(str(path)) as writer:
            writer.write_frame(
                create_test_frame(
                    sample_data["t0"], sample_data["duration"], {channel: samples}
                )
            )

    merged = operations.combine_channels(
        [str(path) for path, _, _ in sources], str(tmp_path / "output")
    )

    assert len(merged) == 1

    # Both channels must be present, each carrying its original samples.
    first_frame = list(gwframe.read_frames(merged[0]))[0]
    for _, channel, _ in sources:
        assert channel in first_frame
    for _, channel, samples in sources:
        assert np.allclose(first_frame[channel].array, samples)

test_combine_with_keep_channels

test_combine_with_keep_channels(tmp_path, sample_data, create_test_frame)

Test combining with channel filtering (keep).

Source code in gwframe/tests/test_operations.py
def test_combine_with_keep_channels(self, tmp_path, sample_data, create_test_frame):
    """Combine two files while retaining only the channels named in keep_channels."""
    ramp = np.arange(sample_data["n_samples"], dtype=np.float32)

    # Map each source file name to the channels written into it.
    contents = {
        "file1.gwf": {"L1:CHAN_A": ramp, "L1:CHAN_B": ramp * 2},
        "file2.gwf": {"L1:CHAN_C": ramp * 3},
    }

    source_paths = []
    for filename, channels in contents.items():
        path = tmp_path / filename
        source_paths.append(str(path))
        with FrameWriter(str(path)) as writer:
            writer.write_frame(
                create_test_frame(
                    sample_data["t0"], sample_data["duration"], channels
                )
            )

    combined = operations.combine_channels(
        source_paths,
        str(tmp_path / "output"),
        keep_channels=["L1:CHAN_A", "L1:CHAN_C"],
    )

    first_frame = list(gwframe.read_frames(combined[0]))[0]
    assert "L1:CHAN_A" in first_frame
    assert "L1:CHAN_C" in first_frame
    # CHAN_B was not listed in keep_channels, so it must be gone.
    assert "L1:CHAN_B" not in first_frame

TestCombineChannelsFrameValidation

Frame-shape validation branches inside combine_channels.

test_directory_mode_missing_time_range

test_directory_mode_missing_time_range(tmp_path, sample_data, create_test_frame)

Directory mode: each time range must exist in every source directory.

Source code in gwframe/tests/test_operations.py
def test_directory_mode_missing_time_range(
    self, tmp_path, sample_data, create_test_frame
):
    """Directory mode rejects a time range that is absent from a source directory."""
    sample_count = sample_data["n_samples"]
    dir_a = tmp_path / "dir_a"
    dir_b = tmp_path / "dir_b"
    dir_a.mkdir()
    dir_b.mkdir()

    # dir_a holds a file starting at t0, while dir_b's only file starts
    # 1000 s later, so neither directory covers the other's time range.
    layout = (
        (dir_a / "first.gwf", sample_data["t0"], "L1:CHAN_A"),
        (dir_b / "second.gwf", sample_data["t0"] + 1000.0, "L1:CHAN_B"),
    )
    for path, start, channel in layout:
        with FrameWriter(str(path)) as writer:
            ramp = np.arange(sample_count, dtype=np.float32)
            writer.write_frame(
                create_test_frame(start, sample_data["duration"], {channel: ramp})
            )

    destination = str(tmp_path / "out")
    with pytest.raises(ValueError, match="missing from directories"):
        operations.combine_channels([str(dir_a), str(dir_b)], output_dir=destination)

test_frame_count_mismatch

test_frame_count_mismatch(tmp_path, sample_data, create_test_frame)

Two files with different frame counts should raise ValueError.

Source code in gwframe/tests/test_operations.py
def test_frame_count_mismatch(self, tmp_path, sample_data, create_test_frame):
    """Combining files that hold different numbers of frames raises ValueError."""
    sample_count = sample_data["n_samples"]
    span = sample_data["duration"]
    single_path = tmp_path / "a.gwf"
    double_path = tmp_path / "b.gwf"

    # First file: a single frame.
    with FrameWriter(str(single_path)) as writer:
        writer.write_frame(
            create_test_frame(
                sample_data["t0"],
                span,
                {"L1:CHAN_A": np.arange(sample_count, dtype=np.float32)},
            )
        )

    # Second file: two back-to-back frames.
    with FrameWriter(str(double_path)) as writer:
        for number in range(2):
            writer.write_frame(
                create_test_frame(
                    sample_data["t0"] + number * span,
                    span,
                    {"L1:CHAN_B": np.arange(sample_count, dtype=np.float32)},
                    frame_number=number,
                )
            )

    destination = str(tmp_path / "out")
    with pytest.raises(ValueError, match="Frame count mismatch"):
        operations.combine_channels(
            [str(single_path), str(double_path)], output_dir=destination
        )

test_frame_duration_mismatch

test_frame_duration_mismatch(tmp_path, sample_data, create_test_frame)

Two single-frame files with matching start but different durations raise.

Source code in gwframe/tests/test_operations.py
def test_frame_duration_mismatch(self, tmp_path, sample_data, create_test_frame):
    """Files that start together but differ in frame duration raise ValueError."""
    # Both frames begin at t0. 16 samples over 1 s and 32 samples over 2 s
    # give the same sample rate, leaving duration as the only difference.
    variants = (
        (tmp_path / "a.gwf", 1.0, "L1:CHAN_A", 16),
        (tmp_path / "b.gwf", 2.0, "L1:CHAN_B", 32),
    )
    for path, span, channel, sample_count in variants:
        with FrameWriter(str(path)) as writer:
            samples = np.arange(sample_count, dtype=np.float32)
            writer.write_frame(
                create_test_frame(sample_data["t0"], span, {channel: samples})
            )

    sources = [str(path) for path, _, _, _ in variants]
    with pytest.raises(ValueError, match="duration mismatch"):
        operations.combine_channels(sources, output_dir=str(tmp_path / "out"))

test_frame_start_time_mismatch

test_frame_start_time_mismatch(tmp_path, sample_data, create_test_frame)

Two single-frame files with different start times should raise ValueError.

Source code in gwframe/tests/test_operations.py
def test_frame_start_time_mismatch(self, tmp_path, sample_data, create_test_frame):
    """Single-frame files whose start times differ cannot be combined."""
    sample_count = sample_data["n_samples"]

    # The second file starts 5 s after the first; everything else matches.
    variants = (
        (tmp_path / "a.gwf", sample_data["t0"], "L1:CHAN_A"),
        (tmp_path / "b.gwf", sample_data["t0"] + 5.0, "L1:CHAN_B"),
    )
    for path, start, channel in variants:
        with FrameWriter(str(path)) as writer:
            samples = np.arange(sample_count, dtype=np.float32)
            writer.write_frame(
                create_test_frame(start, sample_data["duration"], {channel: samples})
            )

    sources = [str(path) for path, _, _ in variants]
    with pytest.raises(ValueError, match="time mismatch"):
        operations.combine_channels(sources, output_dir=str(tmp_path / "out"))

TestDropChannels

Tests for drop_channels operation.

test_drop_multiple_channels

test_drop_multiple_channels(single_frame_file, tmp_path, channels_to_drop, expected_remaining)

Test dropping different numbers of channels.

Source code in gwframe/tests/test_operations.py
@pytest.mark.parametrize(
    ("channels_to_drop", "expected_remaining"),
    [
        (["L1:CHAN1"], ["L1:CHAN2", "L1:CHAN3"]),
        (["L1:CHAN1", "L1:CHAN2"], ["L1:CHAN3"]),
        (["L1:CHAN1", "L1:CHAN2", "L1:CHAN3"], []),
    ],
)
def test_drop_multiple_channels(
    self, single_frame_file, tmp_path, channels_to_drop, expected_remaining
):
    """Drop one, two, or all three channels and check which ones survive."""
    destination = tmp_path / "output"
    written = operations.drop_channels(
        str(single_frame_file), str(destination), channels_to_drop
    )

    first_frame = list(gwframe.read_frames(written[0]))[0]

    # Order is irrelevant, so compare as sets.
    assert set(first_frame.keys()) == set(expected_remaining)

test_drop_single_channel

test_drop_single_channel(single_frame_file, tmp_path)

Test dropping a single channel.

Source code in gwframe/tests/test_operations.py
def test_drop_single_channel(self, single_frame_file, tmp_path):
    """Drop exactly one channel and confirm the other two are untouched."""
    written = operations.drop_channels(
        str(single_frame_file), str(tmp_path / "output"), ["L1:CHAN2"]
    )

    assert len(written) == 1

    # CHAN1 and CHAN3 stay; only CHAN2 disappears.
    first_frame = list(gwframe.read_frames(written[0]))[0]
    for kept in ("L1:CHAN1", "L1:CHAN3"):
        assert kept in first_frame
    assert "L1:CHAN2" not in first_frame

TestImputeMissingData

Tests for impute_missing_data operation.

test_impute_different_values

test_impute_different_values(tmp_path, sample_data, create_test_frame, replace_value, fill_value, input_data, expected_data)

Test imputing different replacement and fill values.

Source code in gwframe/tests/test_operations.py
@pytest.mark.parametrize(
    ("replace_value", "fill_value", "input_data", "expected_data"),
    [
        (np.nan, 0.0, [1.0, np.nan, 3.0], [1.0, 0.0, 3.0]),
        (-999.0, 0.0, [1.0, -999.0, 3.0], [1.0, 0.0, 3.0]),
        (0.0, 100.0, [1.0, 0.0, 3.0], [1.0, 100.0, 3.0]),
    ],
)
def test_impute_different_values(
    self,
    tmp_path,
    sample_data,
    create_test_frame,
    replace_value,
    fill_value,
    input_data,
    expected_data,
):
    """Test imputing different replacement and fill values.

    Each case writes ``input_data`` to a single-channel file, runs
    ``impute_missing_data`` with the given ``replace_value`` / ``fill_value``
    pair, and compares the channel read back against ``expected_data``.
    """
    file_path = tmp_path / "test.gwf"
    data = np.array(input_data, dtype=np.float32)

    with FrameWriter(str(file_path)) as writer:
        channels = {"L1:DATA": data}
        frame = create_test_frame(
            sample_data["t0"], sample_data["duration"], channels
        )
        writer.write_frame(frame)

    output_dir = tmp_path / "output"

    output_files = operations.impute_missing_data(
        str(file_path),
        str(output_dir),
        replace_value=replace_value,
        fill_value=fill_value,
    )

    frames = list(gwframe.read_frames(output_files[0]))
    result_data = frames[0]["L1:DATA"].array

    expected = np.array(expected_data, dtype=np.float32)
    # No expected array contains NaN, so a plain allclose is sufficient for
    # every case: a NaN left behind in the result makes the comparison fail.
    assert np.allclose(result_data, expected)

test_impute_nan_values

test_impute_nan_values(tmp_path, sample_data, create_test_frame)

Test imputing NaN values with zeros.

Source code in gwframe/tests/test_operations.py
def test_impute_nan_values(self, tmp_path, sample_data, create_test_frame):
    """Replace every NaN sample with zero and leave the other samples alone."""
    source = tmp_path / "with_nan.gwf"
    # Two NaN samples sit among finite values.
    raw = np.array([1.0, 2.0, np.nan, 4.0, np.nan, 6.0], dtype=np.float32)

    with FrameWriter(str(source)) as writer:
        writer.write_frame(
            create_test_frame(
                sample_data["t0"], sample_data["duration"], {"L1:DATA": raw}
            )
        )

    written = operations.impute_missing_data(
        str(source),
        str(tmp_path / "output"),
        replace_value=np.nan,
        fill_value=0.0,
    )

    # Each NaN position should now read 0.0.
    imputed = list(gwframe.read_frames(written[0]))[0]["L1:DATA"].array
    wanted = np.array([1.0, 2.0, 0.0, 4.0, 0.0, 6.0], dtype=np.float32)
    assert np.allclose(imputed, wanted)

TestOperationMutexValidation

Argument-validation error paths on operations entry points.

test_combine_channels_keep_and_drop_raises

test_combine_channels_keep_and_drop_raises(tmp_path)

keep_channels and drop_channels are mutually exclusive.

Source code in gwframe/tests/test_operations.py
def test_combine_channels_keep_and_drop_raises(self, tmp_path):
    """Passing both keep_channels and drop_channels is rejected."""
    # The source files are never created: the test relies on argument
    # validation running before any file I/O.
    sources = [str(tmp_path / "a.gwf"), str(tmp_path / "b.gwf")]
    destination = str(tmp_path / "out")

    with pytest.raises(ValueError, match="mutually exclusive"):
        operations.combine_channels(
            sources,
            output_dir=destination,
            keep_channels=["L1:CHAN1"],
            drop_channels=["L1:CHAN2"],
        )

test_rename_channels_in_place_with_output_dir_raises

test_rename_channels_in_place_with_output_dir_raises(single_frame_file, tmp_path)

in_place=True and a non-None output_dir are mutually exclusive.

Source code in gwframe/tests/test_operations.py
def test_rename_channels_in_place_with_output_dir_raises(
    self, single_frame_file, tmp_path
):
    """Requesting in_place=True together with an output_dir is rejected."""
    source = str(single_frame_file)
    destination = str(tmp_path / "out")
    renames = {"L1:CHAN1": "L1:RENAMED"}

    with pytest.raises(ValueError, match="mutually exclusive"):
        operations.rename_channels(
            source, output_dir=destination, channel_map=renames, in_place=True
        )

TestRecompressFrames

Tests for recompress_frames operation.

test_recompress_different_settings

test_recompress_different_settings(single_frame_file, tmp_path, compression, level)

Test recompressing with different compression settings.

Source code in gwframe/tests/test_operations.py
@pytest.mark.parametrize(
    ("compression", "level"),
    [
        (Compression.RAW, 0),
        (Compression.GZIP, 1),
        (Compression.GZIP, 9),
    ],
)
def test_recompress_different_settings(
    self, single_frame_file, tmp_path, compression, level
):
    """Recompress a file under several compression scheme / level pairs."""
    written = operations.recompress_frames(
        str(single_frame_file), str(tmp_path / "output"), compression, level
    )

    assert len(written) == 1
    assert Path(written[0]).exists()

    # All three source channels must still be readable afterwards.
    first_frame = list(gwframe.read_frames(written[0]))[0]
    for channel in ("L1:CHAN1", "L1:CHAN2", "L1:CHAN3"):
        assert channel in first_frame

TestRenameChannels

Tests for rename_channels operation.

test_rename_input_types

test_rename_input_types(multiple_files, tmp_path, input_type)

Test rename with different input types.

Source code in gwframe/tests/test_operations.py
@pytest.mark.parametrize(
    "input_type",
    ["single_file", "list_of_files"],
)
def test_rename_input_types(self, multiple_files, tmp_path, input_type):
    """Rename a channel when the input is one path or a list of paths."""
    renames = {"L1:DATA": "L1:RENAMED_DATA"}
    all_paths = [str(path) for path in multiple_files]

    # "single_file" passes a bare string; otherwise the whole list is passed.
    single = input_type == "single_file"
    sources = all_paths[0] if single else all_paths
    expected_count = 1 if single else len(all_paths)

    written = operations.rename_channels(sources, str(tmp_path / "output"), renames)

    assert len(written) == expected_count
    for path in written:
        first_frame = list(gwframe.read_frames(path))[0]
        assert "L1:RENAMED_DATA" in first_frame
        assert "L1:DATA" not in first_frame

test_rename_single_file

test_rename_single_file(single_frame_file, tmp_path)

Test renaming channels in a single file.

Source code in gwframe/tests/test_operations.py
def test_rename_single_file(self, single_frame_file, tmp_path):
    """Rename two of the three channels stored in a single file."""
    renames = {"L1:CHAN1": "L1:NEW_CHAN1", "L1:CHAN2": "L1:NEW_CHAN2"}

    written = operations.rename_channels(
        str(single_frame_file), str(tmp_path / "output"), renames
    )

    assert len(written) == 1
    assert Path(written[0]).exists()

    first_frame = list(gwframe.read_frames(written[0]))[0]
    # The new names exist and the channel absent from the map is untouched.
    assert "L1:NEW_CHAN1" in first_frame
    assert "L1:NEW_CHAN2" in first_frame
    assert "L1:CHAN3" in first_frame
    # The old names must no longer resolve.
    assert "L1:CHAN1" not in first_frame
    assert "L1:CHAN2" not in first_frame

TestReplaceChannels

Tests for replace_channels operation.

test_replace_channel_data

test_replace_channel_data(tmp_path, sample_data, create_test_frame)

Test replacing channel data from another file.

Source code in gwframe/tests/test_operations.py
def test_replace_channel_data(self, tmp_path, sample_data, create_test_frame):
    """Overwrite one channel of a base file with data taken from an update file."""
    start = sample_data["t0"]
    span = sample_data["duration"]

    # The base file carries the channel to replace plus an auxiliary channel.
    base_path = tmp_path / "base.gwf"
    original = np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32)
    with FrameWriter(str(base_path)) as writer:
        base_channels = {"L1:DATA": original, "L1:AUX": original * 2}
        writer.write_frame(create_test_frame(start, span, base_channels))

    # The update file carries only the replacement samples for L1:DATA.
    update_path = tmp_path / "update.gwf"
    replacement = np.array([10.0, 20.0, 30.0, 40.0], dtype=np.float32)
    with FrameWriter(str(update_path)) as writer:
        writer.write_frame(create_test_frame(start, span, {"L1:DATA": replacement}))

    written = operations.replace_channels(
        str(base_path),
        str(update_path),
        str(tmp_path / "output"),
        channels_to_replace=["L1:DATA"],
    )

    # L1:DATA now holds the update; L1:AUX keeps the base values.
    first_frame = list(gwframe.read_frames(written[0]))[0]
    assert np.allclose(first_frame["L1:DATA"].array, replacement)
    assert np.allclose(first_frame["L1:AUX"].array, original * 2)

TestResizeFrames

Tests for resize_frames operation.

test_resize_no_split

test_resize_no_split(multi_frame_file, tmp_path, sample_data)

Test that frames are kept when target duration >= source duration.

Source code in gwframe/tests/test_operations.py
def test_resize_no_split(self, multi_frame_file, tmp_path, sample_data):
    """A target duration at or above the source duration leaves frames intact."""
    source_span = sample_data["duration"]
    # Request frames twice as long as the source frames.
    requested_span = source_span * 2

    written = operations.resize_frames(
        str(multi_frame_file), str(tmp_path / "output"), requested_span
    )
    resized = list(gwframe.read_frames(written[0]))

    # No splitting or merging: the frame count matches the source file.
    assert len(resized) == 3

    # Every frame keeps the source duration.
    for item in resized:
        assert abs(item.duration - source_span) < 1e-9

    # The constant fill value of each frame is unchanged.
    for position, fill in enumerate((0.0, 1.0, 2.0)):
        assert np.allclose(resized[position]["L1:DATA"].array, fill)

test_resize_preserve_data

test_resize_preserve_data(multi_frame_file, tmp_path, sample_data)

Test that resizing preserves data values.

Source code in gwframe/tests/test_operations.py
def test_resize_preserve_data(self, multi_frame_file, tmp_path, sample_data):
    """Test that resizing preserves data values.

    The source file holds three frames whose ``L1:DATA`` channel is filled
    with 0.0, 1.0 and 2.0 respectively. Halving the frame duration must
    yield six frames where halves ``2k`` and ``2k + 1`` both carry the value
    of source frame ``k``.
    """
    output_dir = tmp_path / "output"
    target_duration = sample_data["duration"] / 2

    output_files = operations.resize_frames(
        str(multi_frame_file), str(output_dir), target_duration
    )

    frames = list(gwframe.read_frames(output_files[0]))

    # Guard the loop below: without this, a truncated output would pass.
    assert len(frames) == 6  # 3 original frames x 2

    # Check every half, including those of the third source frame (2.0).
    for index, frame in enumerate(frames):
        assert np.allclose(frame["L1:DATA"].array, float(index // 2))

test_resize_split_frames

test_resize_split_frames(multi_frame_file, tmp_path, sample_data)

Test splitting frames into smaller duration.

Source code in gwframe/tests/test_operations.py
def test_resize_split_frames(self, multi_frame_file, tmp_path, sample_data):
    """Halving the frame duration doubles the number of frames."""
    # Ask for frames half as long as the source frames.
    half_span = sample_data["duration"] / 2

    written = operations.resize_frames(
        str(multi_frame_file), str(tmp_path / "output"), half_span
    )

    assert len(written) == 1

    # Three source frames, each split in two.
    pieces = list(gwframe.read_frames(written[0]))
    assert len(pieces) == 6

    # Every piece has the requested duration.
    for piece in pieces:
        assert abs(piece.duration - half_span) < 1e-9

TestSelectChannels

Tests for select_channels operation.

test_select_empty_raises

test_select_empty_raises(single_frame_file, tmp_path)

Test that empty channels_to_select raises ValueError.

Source code in gwframe/tests/test_operations.py
def test_select_empty_raises(self, single_frame_file, tmp_path):
    """An empty channels_to_select list is rejected with ValueError."""
    source = str(single_frame_file)
    destination = str(tmp_path / "output")

    with pytest.raises(ValueError, match="channels_to_select"):
        operations.select_channels(source, destination, [])

test_select_in_place

test_select_in_place(single_frame_file, tmp_path)

Test select with in_place=True.

Source code in gwframe/tests/test_operations.py
def test_select_in_place(self, single_frame_file, tmp_path):
    """Select a channel with in_place=True so the input file is rewritten."""
    import shutil

    # Work on a copy so the shared fixture file is not modified.
    working_copy = tmp_path / "test.gwf"
    shutil.copy(single_frame_file, working_copy)
    target = str(working_copy)

    operations.select_channels(
        target, channels_to_select=["L1:CHAN1"], in_place=True
    )

    # Re-read the same path: only the selected channel should remain.
    first_frame = list(gwframe.read_frames(target))[0]
    assert "L1:CHAN1" in first_frame
    for removed in ("L1:CHAN2", "L1:CHAN3"):
        assert removed not in first_frame

test_select_is_inverse_of_drop

test_select_is_inverse_of_drop(single_frame_file, tmp_path)

Test that select is the inverse of drop.

Source code in gwframe/tests/test_operations.py
def test_select_is_inverse_of_drop(self, single_frame_file, tmp_path):
    """Selecting a channel matches dropping every other channel."""
    source = str(single_frame_file)
    select_dir = tmp_path / "select_output"
    drop_dir = tmp_path / "drop_output"

    # Keeping CHAN1 should be equivalent to removing CHAN2 and CHAN3.
    operations.select_channels(source, str(select_dir), ["L1:CHAN1"])
    operations.drop_channels(source, str(drop_dir), ["L1:CHAN2", "L1:CHAN3"])

    # Both outputs are looked up under the input's file name.
    selected_path = str(select_dir / single_frame_file.name)
    dropped_path = str(drop_dir / single_frame_file.name)

    # Same channel list either way.
    assert sorted(get_channels(selected_path)) == sorted(get_channels(dropped_path))

    # Same samples for the surviving channel.
    from_select = gwframe.read(selected_path, "L1:CHAN1")
    from_drop = gwframe.read(dropped_path, "L1:CHAN1")
    assert np.allclose(from_select.array, from_drop.array)

test_select_multiple_channels

test_select_multiple_channels(single_frame_file, tmp_path, channels_to_select, expected_kept)

Test selecting different numbers of channels.

Source code in gwframe/tests/test_operations.py
@pytest.mark.parametrize(
    ("channels_to_select", "expected_kept"),
    [
        (["L1:CHAN1"], ["L1:CHAN1"]),
        (["L1:CHAN1", "L1:CHAN2"], ["L1:CHAN1", "L1:CHAN2"]),
        (
            ["L1:CHAN1", "L1:CHAN2", "L1:CHAN3"],
            ["L1:CHAN1", "L1:CHAN2", "L1:CHAN3"],
        ),
    ],
)
def test_select_multiple_channels(
    self, single_frame_file, tmp_path, channels_to_select, expected_kept
):
    """Select one, two, or all three channels and check exactly those remain."""
    destination = tmp_path / "output"
    written = operations.select_channels(
        str(single_frame_file), str(destination), channels_to_select
    )

    first_frame = list(gwframe.read_frames(written[0]))[0]

    # Order is irrelevant, so compare as sets.
    assert set(first_frame.keys()) == set(expected_kept)

test_select_preserves_data

test_select_preserves_data(single_frame_file, tmp_path)

Test that select preserves channel data.

Source code in gwframe/tests/test_operations.py
def test_select_preserves_data(self, single_frame_file, tmp_path):
    """Selecting a channel leaves that channel's samples unchanged."""
    source = str(single_frame_file)

    # Capture the samples before the operation runs.
    before = gwframe.read(source, "L1:CHAN1")

    written = operations.select_channels(
        source, str(tmp_path / "output"), ["L1:CHAN1"]
    )

    # The selected channel should read back the same values.
    after = gwframe.read(written[0], "L1:CHAN1")
    assert np.allclose(before.array, after.array)

test_select_single_channel

test_select_single_channel(single_frame_file, tmp_path)

Test selecting a single channel (dropping all others).

Source code in gwframe/tests/test_operations.py
def test_select_single_channel(self, single_frame_file, tmp_path):
    """Select one channel and confirm the other two are removed."""
    written = operations.select_channels(
        str(single_frame_file), str(tmp_path / "output"), ["L1:CHAN2"]
    )

    assert len(written) == 1

    # CHAN2 is the sole survivor.
    first_frame = list(gwframe.read_frames(written[0]))[0]
    assert "L1:CHAN2" in first_frame
    for removed in ("L1:CHAN1", "L1:CHAN3"):
        assert removed not in first_frame

create_test_frame

create_test_frame()

Factory fixture to create test frames with specified channels.

Source code in gwframe/tests/test_operations.py
@pytest.fixture
def create_test_frame():
    """Provide a factory that builds frames populated with the given channels."""

    def _build_frame(t0, duration, channels, frame_number=0):
        """
        Build one frame containing every entry of ``channels``.

        Parameters
        ----------
        t0 : float
            Start time of the frame
        duration : float
            Length of the frame
        channels : dict
            Mapping from channel name to that channel's data array
        frame_number : int
            Number assigned to the frame

        Returns
        -------
        frame : Frame
            The populated frame object
        """
        built = Frame(
            start=t0,
            duration=duration,
            name="TEST",
            run=1,
            frame_number=frame_number,
        )
        for name in channels:
            samples = channels[name]
            # The sample rate is derived so the array spans the frame duration.
            built.add_channel(
                name, samples, sample_rate=len(samples) / duration, unit="counts"
            )
        return built

    return _build_frame

multi_frame_file

multi_frame_file(tmp_path, sample_data, create_test_frame)

Create a multi-frame GWF file for testing.

Source code in gwframe/tests/test_operations.py
@pytest.fixture
def multi_frame_file(tmp_path, sample_data, create_test_frame):
    """Write a GWF file holding three consecutive frames and return its path."""
    destination = tmp_path / "multi.gwf"
    span = sample_data["duration"]
    sample_count = sample_data["n_samples"]

    with FrameWriter(str(destination)) as writer:
        for number in range(3):
            # Constant fill values (i for DATA, 10 * i for AUX) mark each frame.
            contents = {
                "L1:DATA": np.full(sample_count, float(number), dtype=np.float32),
                "L1:AUX": np.full(sample_count, float(number * 10), dtype=np.float32),
            }
            writer.write_frame(
                create_test_frame(
                    sample_data["t0"] + number * span,
                    span,
                    contents,
                    frame_number=number,
                )
            )

    return destination

multiple_files

multiple_files(tmp_path, sample_data, create_test_frame)

Create multiple GWF files for testing operations that work on file sets.

Source code in gwframe/tests/test_operations.py
@pytest.fixture
def multiple_files(tmp_path, sample_data, create_test_frame):
    """Write three two-frame GWF files and return their paths in order."""
    span = sample_data["duration"]
    sample_count = sample_data["n_samples"]
    paths = [tmp_path / f"file{position + 1}.gwf" for position in range(3)]

    for position, path in enumerate(paths):
        with FrameWriter(str(path)) as writer:
            # Frames are numbered across the whole set: file k holds the
            # frames with sequence numbers 2k and 2k + 1.
            for sequence in range(2 * position, 2 * position + 2):
                ramp = np.arange(sample_count, dtype=np.float32)
                contents = {"L1:DATA": ramp + sequence * sample_count}
                writer.write_frame(
                    create_test_frame(
                        sample_data["t0"] + sequence * span,
                        span,
                        contents,
                        frame_number=sequence,
                    )
                )

    return paths

sample_data

sample_data()

Generate sample data for testing.

Source code in gwframe/tests/test_operations.py
@pytest.fixture
def sample_data():
    """Return the timing parameters shared by the tests in this module."""
    # 16 samples at 16 Hz fill exactly one second.
    parameters = dict(
        t0=1234567890.0,
        duration=1.0,
        sample_rate=16.0,
        n_samples=16,
    )
    return parameters

single_frame_file

single_frame_file(tmp_path, sample_data, create_test_frame)

Create a single-frame GWF file for testing.

Source code in gwframe/tests/test_operations.py
@pytest.fixture
def single_frame_file(tmp_path, sample_data, create_test_frame):
    """Write a GWF file holding one three-channel frame and return its path."""
    destination = tmp_path / "single.gwf"
    sample_count = sample_data["n_samples"]

    with FrameWriter(str(destination)) as writer:
        # CHANk holds a 0..n-1 ramp multiplied by k.
        contents = {
            "L1:CHAN1": np.arange(sample_count, dtype=np.float32),
            "L1:CHAN2": np.arange(sample_count, dtype=np.float32) * 2,
            "L1:CHAN3": np.arange(sample_count, dtype=np.float32) * 3,
        }
        writer.write_frame(
            create_test_frame(
                sample_data["t0"], sample_data["duration"], contents, frame_number=0
            )
        )

    return destination