Skip to content

operations

Core operations for manipulating GWF files.

combine_channels

combine_channels(input_sources: Sequence[str | PathLike[str]], output_dir: str | PathLike[str], keep_channels: Sequence[str] | None = None, drop_channels: Sequence[str] | None = None) -> list[str]

Combine channels from multiple frame sources into single files.

Takes N sources (all files or all directories) covering the same time ranges and combines their channels. Sources are matched by time range.

Parameters:

Name Type Description Default
input_sources sequence of str or path-like

List of N source files or N source directories to combine. All sources must be the same type (all files or all directories).

required
output_dir str or path-like

Directory where output files will be written

required
keep_channels sequence of str

If specified, only include these channels in the output. Mutually exclusive with drop_channels.

None
drop_channels sequence of str

If specified, exclude these channels from the output. Mutually exclusive with keep_channels.

None

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Combine 2 files covering the same time range
>>> gwframe.combine_channels(['file1.gwf', 'file2.gwf'], 'output/')
>>> # Combine and keep only specific channels
>>> gwframe.combine_channels(
...     ['file1.gwf', 'file2.gwf'], 'output/',
...     keep_channels=['L1:STRAIN', 'L1:LSC']
... )
>>> # Combine and drop specific channels
>>> gwframe.combine_channels(
...     ['dir1/', 'dir2/'], 'output/',
...     drop_channels=['L1:UNWANTED']
... )
Notes

All sources must have matching frame structures (same times and durations). Raises detailed error messages if frames don't align.

Source code in gwframe/operations.py
def combine_channels(
    input_sources: Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str],
    keep_channels: Sequence[str] | None = None,
    drop_channels: Sequence[str] | None = None,
) -> list[str]:
    """
    Combine channels from multiple frame sources into single files.

    Takes N sources (all files or all directories) covering the same time ranges
    and combines their channels. Sources are matched by time range.

    Parameters
    ----------
    input_sources : sequence of str or path-like
        List of N source files or N source directories to combine.
        All sources must be the same type (all files or all directories).
    output_dir : str or path-like
        Directory where output files will be written
    keep_channels : sequence of str, optional
        If specified, only include these channels in the output.
        Mutually exclusive with drop_channels.
    drop_channels : sequence of str, optional
        If specified, exclude these channels from the output.
        Mutually exclusive with keep_channels.

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Combine 2 files covering the same time range
    >>> gwframe.combine_channels(['file1.gwf', 'file2.gwf'], 'output/')

    >>> # Combine and keep only specific channels
    >>> gwframe.combine_channels(
    ...     ['file1.gwf', 'file2.gwf'], 'output/',
    ...     keep_channels=['L1:STRAIN', 'L1:LSC']
    ... )

    >>> # Combine and drop specific channels
    >>> gwframe.combine_channels(
    ...     ['dir1/', 'dir2/'], 'output/',
    ...     drop_channels=['L1:UNWANTED']
    ... )

    Notes
    -----
    All sources must have matching frame structures (same times and durations).
    Raises detailed error messages if frames don't align.
    """
    if len(input_sources) < 2:
        msg = "combine_channels requires at least 2 sources"
        raise ValueError(msg)

    if keep_channels is not None and drop_channels is not None:
        msg = "keep_channels and drop_channels are mutually exclusive"
        raise ValueError(msg)

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Check if sources are files or directories
    source_paths = [Path(s) for s in input_sources]
    are_files = [p.is_file() for p in source_paths]
    are_dirs = [p.is_dir() for p in source_paths]

    if not (all(are_files) or all(are_dirs)):
        msg = "All sources must be the same type (all files or all directories)"
        raise ValueError(msg)

    if all(are_files):
        return _combine_files(source_paths, output_dir, keep_channels, drop_channels)
    return _combine_directories(source_paths, output_dir, keep_channels, drop_channels)

drop_channels

drop_channels(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, channels_to_drop: Sequence[str] | None = None, *, in_place: bool = False) -> list[str]

Remove specified channels from frame files.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str, path-like, or None

Directory where output files will be written. Required if in_place=False. Mutually exclusive with in_place=True.

None
channels_to_drop sequence of str

List of channel names to remove

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> gwframe.drop_channels(
...     'input.gwf',
...     'output/',
...     ['L1:UNWANTED_CHANNEL']
... )
>>> # In place
>>> gwframe.drop_channels(
...     'input.gwf',
...     channels_to_drop=['L1:UNWANTED_CHANNEL'],
...     in_place=True
... )
Source code in gwframe/operations.py
def drop_channels(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    channels_to_drop: Sequence[str] | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Remove specified channels from frame files.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str, path-like, or None
        Directory where output files will be written.
        Required if in_place=False. Mutually exclusive with in_place=True.
    channels_to_drop : sequence of str
        List of channel names to remove
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> gwframe.drop_channels(
    ...     'input.gwf',
    ...     'output/',
    ...     ['L1:UNWANTED_CHANNEL']
    ... )

    >>> # In place
    >>> gwframe.drop_channels(
    ...     'input.gwf',
    ...     channels_to_drop=['L1:UNWANTED_CHANNEL'],
    ...     in_place=True
    ... )
    """
    if not channels_to_drop:
        msg = "channels_to_drop must be provided and non-empty"
        raise ValueError(msg)

    def _drop_file(input_file, output_file):
        """Process a single file, dropping specified channels."""
        frames = read_frames(input_file)
        channels_set = set(channels_to_drop)

        with FrameWriter(output_file) as writer:
            for frame in frames:
                # Drop channels from frame
                for channel_name in channels_set:
                    if channel_name in frame:
                        del frame[channel_name]
                writer.write_frame(frame)

    return _process_files_with_operation(
        input_files, output_dir, _drop_file, in_place=in_place
    )

impute_missing_data

impute_missing_data(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, replace_value: float = nan, fill_value: float = 0.0, channels: Sequence[str] | None = None, *, in_place: bool = False) -> list[str]

Replace specific values in frame file channels with a fill value.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str, path-like, or None

Directory where output files will be written. Required if in_place=False. Mutually exclusive with in_place=True.

None
replace_value float

Value to replace (default: NaN). Can be NaN or any numeric value.

nan
fill_value float

Value to use for replacement (default: 0.0). Will be cast to appropriate dtype.

0.0
channels sequence of str

If specified, only impute these channels. Otherwise imputes all channels.

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Replace NaNs with 0 in all channels
>>> gwframe.impute_missing_data('input.gwf', 'output/')
>>> # In place
>>> gwframe.impute_missing_data('input.gwf', in_place=True)
>>> # Replace specific value in specific channels
>>> gwframe.impute_missing_data(
...     'input.gwf', 'output/',
...     replace_value=-999.0,
...     fill_value=0.0,
...     channels=['L1:STRAIN']
... )
Source code in gwframe/operations.py
def impute_missing_data(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    replace_value: float = np.nan,
    fill_value: float = 0.0,
    channels: Sequence[str] | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Substitute a fill value for every occurrence of a given sample value.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        One GWF file, or several, to process
    output_dir : str, path-like, or None
        Destination directory for the rewritten files.
        Needed when in_place=False; not to be combined with in_place=True.
    replace_value : float, optional
        Sample value to look for (default: NaN). NaN is matched with
        ``np.isnan``; any other value is matched with ``==``.
    fill_value : float, optional
        Value written in place of each match (default: 0.0). It is cast to
        the dtype of the channel data before use.
    channels : sequence of str, optional
        Restrict imputation to these channels. When None or empty, every
        channel in each frame is processed.
    in_place : bool, optional
        Rewrite the input files themselves when True (default: False)

    Returns
    -------
    output_files : list[str]
        Paths of the files that were written

    Examples
    --------
    >>> # Replace NaNs with 0 in all channels
    >>> gwframe.impute_missing_data('input.gwf', 'output/')

    >>> # In place
    >>> gwframe.impute_missing_data('input.gwf', in_place=True)

    >>> # Replace specific value in specific channels
    >>> gwframe.impute_missing_data(
    ...     'input.gwf', 'output/',
    ...     replace_value=-999.0,
    ...     fill_value=0.0,
    ...     channels=['L1:STRAIN']
    ... )
    """
    channels_set = set(channels) if channels else None
    # NaN never compares equal to itself, so it needs its own matching path
    is_nan_replacement = np.isnan(replace_value)

    def _impute_file(input_file, output_file):
        """Copy one file to ``output_file`` with matching samples replaced."""
        source_frames = read_frames(input_file)

        with FrameWriter(output_file) as sink:
            for current in source_frames:
                # Pick the channels of this frame that should be processed
                targets = (
                    set(current.keys())
                    if channels_set is None
                    else channels_set & set(current.keys())
                )

                for name in targets:
                    series = current[name]
                    samples = series.array.copy()
                    # Match the fill value to the channel's own dtype
                    replacement = np.array(fill_value).astype(samples.dtype)
                    if is_nan_replacement:
                        hits = np.isnan(samples)
                    else:
                        hits = samples == replace_value
                    samples = np.where(hits, replacement, samples)

                    # Rebuild the series around the new samples, carrying
                    # every other attribute over from the original.
                    current[name] = TimeSeries(
                        array=samples,
                        name=series.name,
                        dtype=series.dtype,
                        t0=series.t0,
                        dt=series.dt,
                        duration=series.duration,
                        sample_rate=series.sample_rate,
                        unit=series.unit,
                        type=series.type,
                    )

                sink.write_frame(current)

    return _process_files_with_operation(
        input_files, output_dir, _impute_file, in_place=in_place
    )

recompress_frames

recompress_frames(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6, *, in_place: bool = False) -> list[str]

Rewrite frame files with different compression settings.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str or path-like

Directory where output files will be written. Required if in_place=False. Ignored if in_place=True.

None
compression int

Compression scheme (e.g., Compression.RAW, Compression.GZIP)

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

Compression level 0-9 (default: 6)

6
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Remove compression
>>> gwframe.recompress_frames('input.gwf', 'output/',
...                           compression=gwframe.Compression.RAW)
>>> # Maximum compression, in place
>>> gwframe.recompress_frames('input.gwf',
...                           compression=gwframe.Compression.GZIP,
...                           compression_level=9,
...                           in_place=True)
Source code in gwframe/operations.py
def recompress_frames(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Write frame files again using the requested compression settings.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        One GWF file, or several, to process
    output_dir : str or path-like, optional
        Destination directory for the rewritten files.
        Needed when in_place=False. Ignored if in_place=True.
    compression : int
        Compression scheme (e.g., Compression.RAW, Compression.GZIP)
    compression_level : int, optional
        Compression level 0-9 (default: 6)
    in_place : bool, optional
        Rewrite the input files themselves when True (default: False)

    Returns
    -------
    output_files : list[str]
        Paths of the files that were written

    Examples
    --------
    >>> # Remove compression
    >>> gwframe.recompress_frames('input.gwf', 'output/',
    ...                           compression=gwframe.Compression.RAW)

    >>> # Maximum compression, in place
    >>> gwframe.recompress_frames('input.gwf',
    ...                           compression=gwframe.Compression.GZIP,
    ...                           compression_level=9,
    ...                           in_place=True)
    """
    # The frames themselves are untouched; only the writer settings differ
    writer_options = {
        "compression": compression,
        "compression_level": compression_level,
    }

    def _recompress_file(input_file, output_file):
        """Copy every frame of one file through a reconfigured writer."""
        source_frames = read_frames(input_file)

        with FrameWriter(output_file, **writer_options) as sink:
            for current in source_frames:
                sink.write_frame(current)

    return _process_files_with_operation(
        input_files, output_dir, _recompress_file, in_place=in_place
    )

rename_channels

rename_channels(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, channel_map: dict[str, str] | None = None, *, in_place: bool = False) -> list[str]

Rename channels in frame files.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str, path-like, or None

Directory where output files will be written. Required if in_place=False, ignored if in_place=True.

None
channel_map dict

Mapping of old channel names to new channel names

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Write to output directory
>>> gwframe.rename_channels(
...     'input.gwf',
...     'output/',
...     {'L1:OLD_NAME': 'L1:NEW_NAME'}
... )
>>> # Modify in place
>>> gwframe.rename_channels(
...     'input.gwf',
...     channel_map={'L1:OLD_NAME': 'L1:NEW_NAME'},
...     in_place=True
... )
Source code in gwframe/operations.py
def rename_channels(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    channel_map: dict[str, str] | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Rename channels in frame files.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str, path-like, or None
        Directory where output files will be written.
        Required if in_place=False, ignored if in_place=True.
    channel_map : dict
        Mapping of old channel names to new channel names
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Write to output directory
    >>> gwframe.rename_channels(
    ...     'input.gwf',
    ...     'output/',
    ...     {'L1:OLD_NAME': 'L1:NEW_NAME'}
    ... )

    >>> # Modify in place
    >>> gwframe.rename_channels(
    ...     'input.gwf',
    ...     channel_map={'L1:OLD_NAME': 'L1:NEW_NAME'},
    ...     in_place=True
    ... )
    """
    if not channel_map:
        msg = "channel_map must be provided and non-empty"
        raise ValueError(msg)

    def _rename_file(input_file, output_file):
        """Process a single file, renaming channels."""
        frames = read_frames(input_file)

        with FrameWriter(output_file) as writer:
            for frame in frames:
                # Rename channels in frame
                for old_name, new_name in channel_map.items():
                    if old_name in frame:
                        frame[new_name] = frame.pop(old_name)
                writer.write_frame(frame)

    return _process_files_with_operation(
        input_files, output_dir, _rename_file, in_place=in_place
    )

replace_channels

replace_channels(base_files: str | PathLike[str] | Sequence[str | PathLike[str]], update_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str], channels_to_replace: Sequence[str] | None = None) -> list[str]

Replace data in channels with updated versions from another frame file.

Parameters:

Name Type Description Default
base_files str, path-like, or sequence of str/path-like

Base GWF file(s) to process

required
update_files str, path-like, or sequence of str/path-like

GWF file(s) containing updated channel data

required
output_dir str or path-like

Directory where output files will be written

required
channels_to_replace sequence of str

List of channel names to replace. If None, replaces all channels found in update_files.

None

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> gwframe.replace_channels(
...     'base.gwf',
...     'updated.gwf',
...     'output/',
...     ['L1:STRAIN']
... )
Source code in gwframe/operations.py
def replace_channels(
    base_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    update_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str],
    channels_to_replace: Sequence[str] | None = None,
) -> list[str]:
    """
    Replace data in channels with updated versions from another frame file.

    Parameters
    ----------
    base_files : str, path-like, or sequence of str/path-like
        Base GWF file(s) to process
    update_files : str, path-like, or sequence of str/path-like
        GWF file(s) containing updated channel data
    output_dir : str or path-like
        Directory where output files will be written
    channels_to_replace : sequence of str, optional
        List of channel names to replace. If None, replaces all channels
        found in update_files.

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> gwframe.replace_channels(
    ...     'base.gwf',
    ...     'updated.gwf',
    ...     'output/',
    ...     ['L1:STRAIN']
    ... )
    """
    if isinstance(base_files, str | PathLike):
        base_files = [base_files]
    if isinstance(update_files, str | PathLike):
        update_files = [update_files]

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    output_files = []

    for base_file in base_files:
        output_file = output_dir / Path(base_file).name
        output_files.append(str(output_file))

        base_frames = read_frames(base_file)

        with FrameWriter(str(output_file)) as writer:
            for frame in base_frames:
                t0 = frame.t0
                duration = frame.duration

                # Read update data from matching time range
                update_data: dict[str, TimeSeries] = {}
                for update_file in update_files:
                    try:
                        data: dict[str, TimeSeries] = read(
                            update_file, channel=None, start=t0, end=t0 + duration
                        )
                        update_data.update(data)
                    except (ValueError, FileNotFoundError):
                        continue

                # Determine which channels to replace
                if channels_to_replace is None:
                    channels_to_replace_set = set(update_data.keys())
                else:
                    channels_to_replace_set = set(channels_to_replace)

                # Replace specified channels with update data
                for channel_name in channels_to_replace_set:
                    if channel_name in update_data:
                        frame[channel_name] = update_data[channel_name]

                # Add any new channels from update data
                for channel_name, ts in update_data.items():
                    if channel_name not in frame:
                        frame[channel_name] = ts

                writer.write_frame(frame)

    return output_files

resize_frames

resize_frames(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, target_duration: float | None = None, *, in_place: bool = False) -> list[str]

Resize frames to a different duration (e.g., 64s frames to 4s frames).

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str or path-like

Directory where output files will be written. Required if in_place=False. Ignored if in_place=True.

None
target_duration float

Target frame duration in seconds

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Split 64-second frames into 4-second frames
>>> gwframe.resize_frames('input.gwf', 'output/', target_duration=4.0)
>>> # Split frames in place
>>> gwframe.resize_frames('input.gwf', target_duration=4.0, in_place=True)
Notes

When splitting frames (target_duration < source_duration), data is divided evenly. When target_duration > source_duration, the source frame is currently written unchanged; consecutive frames are not merged.

Source code in gwframe/operations.py
def resize_frames(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    target_duration: float | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Resize frames to a different duration (e.g., 64s frames to 4s frames).

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str or path-like, optional
        Directory where output files will be written.
        Required if in_place=False. Ignored if in_place=True.
    target_duration : float
        Target frame duration in seconds
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Split 64-second frames into 4-second frames
    >>> gwframe.resize_frames('input.gwf', 'output/', target_duration=4.0)

    >>> # Split frames in place
    >>> gwframe.resize_frames('input.gwf', target_duration=4.0, in_place=True)

    Notes
    -----
    When splitting frames (target_duration < source_duration), data is divided
    evenly. When merging frames (target_duration > source_duration), consecutive
    frames are combined.
    """
    if target_duration is None or target_duration <= 0:
        msg = "target_duration must be a positive number"
        raise ValueError(msg)

    def _resize_file(input_file, output_file):
        """Process a single file, resizing frames."""
        frames = read_frames(input_file)

        with FrameWriter(output_file) as writer:
            frame_number = 0

            for frame in frames:
                source_t0 = frame.t0
                source_duration = frame.duration

                # Calculate how many target frames fit in this source frame
                num_splits = int(source_duration / target_duration)

                if num_splits >= 1:
                    # Split into smaller frames
                    for split_idx in range(num_splits):
                        split_t0 = source_t0 + split_idx * target_duration

                        new_frame = Frame(
                            t0=split_t0,
                            duration=target_duration,
                            name=frame.name,
                            run=frame.run,
                            frame_number=frame_number,
                        )

                        # Slice data for this split
                        for channel_name, ts in frame.items():
                            start_sample = int(
                                split_idx * target_duration * ts.sample_rate
                            )
                            end_sample = int(
                                (split_idx + 1) * target_duration * ts.sample_rate
                            )
                            sliced_data = ts.array[start_sample:end_sample]

                            new_frame.add_channel(
                                channel_name,
                                sliced_data,
                                ts.sample_rate,
                                unit=ts.unit,
                                channel_type=ts.type,
                            )

                        writer.write_frame(new_frame)
                        frame_number += 1
                else:
                    # Keep original frame (target_duration >= source_duration)
                    frame.frame_number = frame_number
                    writer.write_frame(frame)
                    frame_number += 1

    return _process_files_with_operation(
        input_files, output_dir, _resize_file, in_place=in_place
    )