Skip to content

gwframe

ChannelNotFoundError

ChannelNotFoundError(channels: str | list[str], available_channels: list[str], source: str | None = None)

Bases: ValueError

Raised when one or more requested channels are not found in a GWF file.

Provides suggestions for similar channel names and lists available channels.

Attributes:

Name Type Description
channels list[str]

The channel name(s) that were not found

available_channels list[str]

List of all available channel names in the file

source str or None

Source file path or description

Source code in gwframe/errors.py
def __init__(
    self,
    channels: str | list[str],
    available_channels: list[str],
    source: str | None = None,
):
    # Normalize to list
    self.channels = [channels] if isinstance(channels, str) else list(channels)
    self.available_channels = available_channels
    self.source = source
    super().__init__(str(self))

__str__

__str__() -> str

Generate helpful error message with suggestions.

Source code in gwframe/errors.py
def __str__(self) -> str:
    """Generate helpful error message with suggestions."""
    num_missing = len(self.channels)

    # Build base error message
    if num_missing == 1:
        parts = [f"Channel '{self.channels[0]}' not found"]
    else:
        missing_str = ", ".join(self.channels)
        parts = [f"Channels not found ({num_missing}): {missing_str}"]

    if self.source:
        parts[0] += f" in '{self.source}'"
    parts[0] += "."

    # For single channel, try to find similar names
    if num_missing == 1:
        similar = get_close_matches(
            self.channels[0], self.available_channels, n=3, cutoff=0.6
        )
        if similar:
            if len(similar) == 1:
                parts.append(f"Did you mean '{similar[0]}'?")
            else:
                suggestions = "', '".join(similar)
                parts.append(f"Did you mean one of: '{suggestions}'?")

    # List available channels (limit to avoid overwhelming output)
    num_channels = len(self.available_channels)
    if num_channels == 0:
        parts.append("No channels found in file.")
    elif num_channels <= 10:
        channel_list = ", ".join(self.available_channels)
        parts.append(f"Available channels ({num_channels}): {channel_list}")
    else:
        # Show first 10 channels
        channel_list = ", ".join(self.available_channels[:10])
        parts.append(
            f"Available channels ({num_channels}, showing first 10): "
            f"{channel_list}, ..."
        )

    return "\n".join(parts)

ChannelType

Bases: str, Enum

Channel data types in GWF files.

  • PROC: Processed data (FrProcData)
  • ADC: Raw ADC data (FrAdcData)
  • SIM: Simulated data (FrSimData)

Compression

Bases: IntEnum

Compression schemes for GWF files.

See LIGO-T970130 for details on compression algorithms.

Standard modes: - RAW: No compression - GZIP: Standard GZIP compression - DIFF_GZIP: Differentiate data then apply GZIP - ZERO_SUPPRESS_WORD_2: Zero-suppress 2-byte (16-bit) words - ZERO_SUPPRESS_WORD_4: Zero-suppress 4-byte (32-bit) words - ZERO_SUPPRESS_WORD_8: Zero-suppress 8-byte (64-bit) words

Meta modes (adaptive): - ZERO_SUPPRESS_OTHERWISE_GZIP: Zero-suppress integers, GZIP floats (recommended) - BEST_COMPRESSION: Try all modes, use best compression ratio

Aliases: - ZERO_SUPPRESS_SHORT: Alias for ZERO_SUPPRESS_WORD_2 - ZERO_SUPPRESS_INT_FLOAT: Alias for ZERO_SUPPRESS_WORD_4

DetectorLocation

Bases: IntEnum

Detector location identifiers for GWF files.

These constants identify specific gravitational wave detectors and are used with the GetDetector function.

  • G1: GEO600 (Germany)
  • H1: LIGO Hanford 4km (USA)
  • H2: LIGO Hanford 2km (USA, decommissioned)
  • K1: KAGRA (Japan)
  • L1: LIGO Livingston 4km (USA)
  • T1: TAMA300 (Japan, decommissioned)
  • V1: Virgo (Italy)

FrProcDataSubType

Bases: IntEnum

Subtype classification for FrProcData structures.

Provides detailed information about the data processing or analysis type.

FrProcDataType

Bases: IntEnum

Type classification for FrProcData structures.

Indicates the dimensionality and structure of processed data.

FrVectType

Bases: IntEnum

Data types for FrVect arrays.

Provides human-readable aliases for frameCPP data type constants.

Frame

Frame(t0: float, duration: float, name: str = '', run: int = 0, frame_number: int = 0)

Bases: MutableMapping

High-level interface for creating and manipulating GWF frames.

This class provides a Pythonic interface to the underlying frameCPP FrameH class, with simplified methods for adding data and metadata.

Parameters:

Name Type Description Default
t0 float

GPS start time of the frame

required
duration float

Duration of the frame in seconds

required
name str

Frame name (e.g., 'L1' for LIGO Livingston)

''
run int

Run number (default: 0, negative for simulated data)

0
Notes

Detector information is automatically added to the frame based on channel names. When you add a channel with a name like 'L1:TEST', the detector information for L1 will be automatically included.

Examples:

>>> frame = gwframe.Frame(t0=1234567890.0, duration=1.0, name='L1', run=1)
>>> frame.add_channel('L1:TEST', data=np.random.randn(16384),
...                   dt=1.0/16384, unit='counts')
>>> frame.write('output.gwf')
Source code in gwframe/write.py
def __init__(
    self,
    t0: float,
    duration: float,
    name: str = "",
    run: int = 0,
    frame_number: int = 0,
):
    # Convert GPS time
    self._gps_time = _core.gpstime_from_float(t0)

    # Get leap seconds from GPS time (TAI-UTC offset)
    leap_seconds = self._gps_time.get_leap_seconds()

    # Use full constructor with frame_number and leap_seconds
    self._frame = _core.FrameH(
        name, run, frame_number, self._gps_time, leap_seconds, duration
    )

    # Store for convenience
    self._t0 = t0
    self.duration = duration
    self.name = name
    self.run = run
    self.frame_number = frame_number

    # Keep references to vects to prevent premature garbage collection
    # (C++ frame holds raw pointers, so Python must keep objects alive)
    self._vects: list[_core.FrVect] = []
    self._frdatas: list[_core.FrProcData | _core.FrSimData] = []

    # Track which detectors we've added to avoid duplicates
    self._detectors_added: set[str] = set()

    # Store channels for dict-like access
    self._channels: dict[str, TimeSeries] = {}

    # Track if channels were modified via dict interface
    self._channels_modified = False

t0 property writable

t0: float

GPS start time of the frame.

__delitem__

__delitem__(key: str) -> None

Delete channel by name. Frame will be rebuilt on write.

Source code in gwframe/write.py
def __delitem__(self, key: str) -> None:
    """Delete channel by name. Frame will be rebuilt on write."""
    del self._channels[key]
    self._channels_modified = True

__getitem__

__getitem__(key: str) -> TimeSeries

Get channel by name.

Source code in gwframe/write.py
def __getitem__(self, key: str) -> TimeSeries:
    """Get channel by name."""
    return self._channels[key]

__iter__

__iter__()

Iterate over channel names.

Source code in gwframe/write.py
def __iter__(self):
    """Iterate over channel names."""
    return iter(self._channels)

__len__

__len__() -> int

Return number of channels.

Source code in gwframe/write.py
def __len__(self) -> int:
    """Return number of channels."""
    return len(self._channels)

__setitem__

__setitem__(key: str, value: TimeSeries) -> None

Set/update channel with TimeSeries.

Source code in gwframe/write.py
def __setitem__(self, key: str, value: TimeSeries) -> None:
    """Set/update channel with TimeSeries."""
    self._channels[key] = value
    self._channels_modified = True

add_channel

add_channel(channel: str, data: NDArray, sample_rate: float, unit: str = '', comment: str = '', channel_type: str = 'proc')

Add a data channel to this frame.

Parameters:

Name Type Description Default
channel str

Channel name (e.g., 'L1:TEST-CHANNEL')

required
data ndarray

1D NumPy array containing the channel data

required
sample_rate float

Sample rate in Hz (samples per second)

required
unit str

Physical unit of the data (e.g., 'strain', 'counts')

''
comment str

Comment or description for this channel

''
channel_type str

Type of channel: 'proc' (processed, default) or 'sim' (simulated)

'proc'

Examples:

>>> frame.add_channel('L1:TEST', data=np.random.randn(16384),
...                   sample_rate=16384, unit='counts')
Notes

The data type (float64, float32, int32, etc.) is automatically determined from the NumPy array dtype.

Source code in gwframe/write.py
def add_channel(
    self,
    channel: str,
    data: npt.NDArray,
    sample_rate: float,
    unit: str = "",
    comment: str = "",
    channel_type: str = "proc",
):
    """
    Add a data channel to this frame.

    Parameters
    ----------
    channel : str
        Channel name (e.g., 'L1:TEST-CHANNEL')
    data : np.ndarray
        1D NumPy array containing the channel data
    sample_rate : float
        Sample rate in Hz (samples per second)
    unit : str, optional
        Physical unit of the data (e.g., 'strain', 'counts')
    comment : str, optional
        Comment or description for this channel
    channel_type : str, optional
        Type of channel: 'proc' (processed, default) or 'sim' (simulated)

    Examples
    --------
    >>> frame.add_channel('L1:TEST', data=np.random.randn(16384),
    ...                   sample_rate=16384, unit='counts')

    Notes
    -----
    The data type (float64, float32, int32, etc.) is automatically
    determined from the NumPy array dtype.
    """
    # Ensure data is a 1D numpy array
    if data.ndim != 1:
        msg = f"Data must be 1D array, got shape {data.shape}"
        raise ValueError(msg)

    # Extract detector prefix from channel name (e.g., 'L1:TEST' -> 'L1')
    # and add detector information if it's a known detector
    if ":" in channel:
        prefix = channel.split(":", 1)[0]
        if prefix not in self._detectors_added:
            try:
                # Check if this is a valid detector
                detector_loc = DetectorLocation[prefix]
                # Get detector info for this location and GPS time
                detector = _core.get_detector(detector_loc, self._gps_time)
                # Append to frame based on channel type
                if channel_type == "sim":
                    self._frame.append_fr_detector_sim(detector)
                else:  # 'proc' or 'adc'
                    self._frame.append_fr_detector_proc(detector)
                # Mark this detector as added
                self._detectors_added.add(prefix)
            except KeyError:
                # Not a known detector prefix, that's okay
                pass

    n_samples = len(data)

    # Convert sample_rate to dt (sample spacing)
    dt = 1.0 / sample_rate

    if data.dtype not in _DTYPE_TO_FRVECT:
        msg = (
            f"Unsupported data type: {data.dtype}. "
            f"Supported types: {list(_DTYPE_TO_FRVECT.keys())}"
        )
        raise ValueError(msg)

    frvect_type = _DTYPE_TO_FRVECT[data.dtype]

    # Create dimension
    dim = _core.Dimension(n_samples, dt, "s", 0.0)

    # Create FrVect and populate with data
    vect = _core.FrVect(channel, frvect_type, 1, dim, unit)
    # Direct C++ memcpy ~50% faster than get_data_array()[:] = data
    vect.set_data(data)

    # Create appropriate Fr*Data container and add to frame
    if channel_type == "proc":
        # Calculate Nyquist frequency (frange) from sample rate
        frange = sample_rate / 2.0  # Nyquist frequency

        # FrProcData: name, comment, type, subtype, time_offset, trange,
        # fshift, phase, frange, bandwidth
        frdata = _core.FrProcData(
            channel, comment, 1, 0, 0.0, self.duration, 0.0, 0.0, frange, 0.0
        )
        frdata.append_data(vect)
        self._frame.append_fr_proc_data(frdata)
    elif channel_type == "sim":
        # FrSimData: name, comment, sample_rate, time_offset, fshift, phase
        frdata = _core.FrSimData(channel, comment, sample_rate, 0.0, 0.0, 0.0)
        frdata.append_data(vect)
        self._frame.append_fr_sim_data(frdata)
    else:
        # FIXME: ADC channel support needs to be implemented
        # Requires proper GetRawData()/SetRawData() initialization in C++ bindings
        msg = (
            f"Unsupported channel_type: {channel_type}. "
            f"Supported types: 'proc', 'sim'"
        )
        raise ValueError(msg)

    # Keep references alive to prevent garbage collection
    # (C++ uses raw pointers with empty deleters, so Python must keep objects alive)
    self._vects.append(vect)
    self._frdatas.append(frdata)

    # Also store in channels dict for dict-like access
    self._channels[channel] = TimeSeries(
        array=data,
        name=channel,
        dtype=vect.get_type(),
        t0=self.t0,
        dt=dt,
        duration=len(data) * dt,
        sample_rate=sample_rate,
        unit=unit,
        type=channel_type,
    )

add_history

add_history(name: str, comment: str, time: int | None = None)

Add a history/metadata entry to the frame.

Parameters:

Name Type Description Default
name str

Name/key for this metadata entry

required
comment str

The metadata value/comment

required
time int

GPS time for this entry (default: frame start time)

None
Source code in gwframe/write.py
def add_history(self, name: str, comment: str, time: int | None = None):
    """
    Add a history/metadata entry to the frame.

    Parameters
    ----------
    name : str
        Name/key for this metadata entry
    comment : str
        The metadata value/comment
    time : int, optional
        GPS time for this entry (default: frame start time)
    """
    if time is None:
        time = int(self.t0)
    history = _core.FrHistory(name, time, comment)
    self._frame.append_frhistory(history)

write

write(filename: str | PathLike[str], compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6)

Write this frame to a GWF file.

Parameters:

Name Type Description Default
filename str or path - like

Output file path

required
compression int

Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP) Use Compression.RAW for no compression

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

Compression level 0-9 (default: 6, higher = more compression)

6

Examples:

>>> frame.write('output.gwf')
>>> frame.write('output_raw.gwf', compression=gwframe.Compression.RAW)
Source code in gwframe/write.py
def write(
    self,
    filename: str | PathLike[str],
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
):
    """
    Write this frame to a GWF file.

    Parameters
    ----------
    filename : str or path-like
        Output file path
    compression : int, optional
        Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP)
        Use Compression.RAW for no compression
    compression_level : int, optional
        Compression level 0-9 (default: 6, higher = more compression)

    Examples
    --------
    >>> frame.write('output.gwf')
    >>> frame.write('output_raw.gwf', compression=gwframe.Compression.RAW)
    """
    # Rebuild frame if channels were modified via dict interface
    if self._channels_modified:
        self._rebuild_frame()

    # Create output stream inline (ensures proper flushing)
    self._frame.write(
        _core.OFrameFStream(fspath(filename)), compression, compression_level
    )

write_bytes

write_bytes(compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6) -> bytes

Write this frame to bytes (in-memory GWF format).

Parameters:

Name Type Description Default
compression int

Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP)

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

Compression level 0-9 (default: 6)

6

Returns:

Type Description
bytes

GWF-formatted data as bytes

Examples:

>>> frame = gwframe.Frame(t0=1234567890.0, duration=1.0, name='L1')
>>> frame.add_channel('L1:TEST', data, dt=1.0/16384)
>>> gwf_bytes = frame.write_bytes()
>>> # Verify round-trip
>>> read_data = gwframe.read_bytes(gwf_bytes, 'L1:TEST')
Source code in gwframe/write.py
def write_bytes(
    self,
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
) -> bytes:
    """
    Write this frame to bytes (in-memory GWF format).

    Parameters
    ----------
    compression : int, optional
        Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP)
    compression_level : int, optional
        Compression level 0-9 (default: 6)

    Returns
    -------
    bytes
        GWF-formatted data as bytes

    Examples
    --------
    >>> frame = gwframe.Frame(t0=1234567890.0, duration=1.0, name='L1')
    >>> frame.add_channel('L1:TEST', data, dt=1.0/16384)
    >>> gwf_bytes = frame.write_bytes()
    >>> # Verify round-trip
    >>> read_data = gwframe.read_bytes(gwf_bytes, 'L1:TEST')
    """
    # Rebuild frame if channels were modified via dict interface
    if self._channels_modified:
        self._rebuild_frame()

    # Create memory buffer for output
    buffer = _core.MemoryBuffer(_core.IOS_OUT)

    # Write frame in a scope to ensure stream is destroyed (flushed) before reading
    # The stream destructor writes the TOC which is critical for reading
    stream = _core.OFrameMemStream(buffer)
    self._frame.write(stream, compression, compression_level)
    del stream  # Explicitly destroy stream to flush TOC

    # Extract bytes from buffer
    return buffer.get_bytes()

write_to_stream

write_to_stream(stream, compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6)

Write this frame to an output stream.

This is used internally by FrameWriter for writing multiple frames.

Parameters:

Name Type Description Default
stream OFrameFStream

Output stream to write to

required
compression int

Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP)

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

Compression level 0-9 (default: 6)

6
Source code in gwframe/write.py
def write_to_stream(
    self,
    stream,
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
):
    """
    Write this frame to an output stream.

    This is used internally by FrameWriter for writing multiple frames.

    Parameters
    ----------
    stream : OFrameFStream
        Output stream to write to
    compression : int, optional
        Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP)
    compression_level : int, optional
        Compression level 0-9 (default: 6)
    """
    # Rebuild frame if channels were modified via dict interface
    if self._channels_modified:
        self._rebuild_frame()

    self._frame.write(stream, compression, compression_level)

FrameFileInfo dataclass

FrameFileInfo(num_frames: int, channels: list[str], frames: list[FrameInfo], compression: int)

Complete metadata about a GWF file.

This dataclass holds file-level and frame-level metadata for a GWF file.

Attributes:

Name Type Description
num_frames int

Total number of frames in the file

channels list[str]

List of all channel names in the file

frames list[FrameInfo]

List of metadata for each frame

compression int

Compression scheme used for all channels in the file (e.g., Compression.GZIP)

Examples:

>>> info = gwframe.get_info('data.gwf')
>>> print(f"File contains {info.num_frames} frames")
>>> print(f"Channels: {', '.join(info.channels)}")
>>> print(f"Compression: {Compression(info.compression).name}")
>>> # Preserve compression when writing
>>> with FrameWriter('output.gwf', **info.compression_settings) as writer:
...     pass

compression_settings property

compression_settings: dict[str, int]

Return compression settings as kwargs for FrameWriter.

Returns:

Type Description
dict[str, int]

Compression settings suitable for FrameWriter constructor

Examples:

>>> info = gwframe.get_info('input.gwf')
>>> with gwframe.FrameWriter(
...     'output.gwf', **info.compression_settings
... ) as writer:
...     # Frames written with same compression as input file
...     pass

FrameIndexError

FrameIndexError(frame_index: int, num_frames: int, source: str | None = None)

Bases: IndexError

Raised when requested frame index is out of range.

Attributes:

Name Type Description
frame_index int

The requested frame index

num_frames int

Total number of frames in the file

source str or None

Source file path or description

Source code in gwframe/errors.py
def __init__(
    self,
    frame_index: int,
    num_frames: int,
    source: str | None = None,
):
    self.frame_index = frame_index
    self.num_frames = num_frames
    self.source = source
    super().__init__(str(self))

__str__

__str__() -> str

Generate helpful error message.

Source code in gwframe/errors.py
def __str__(self) -> str:
    """Generate helpful error message."""
    max_index = self.num_frames - 1
    msg = (
        f"Frame index {self.frame_index} is out of range. "
        f"File contains {self.num_frames} frame(s) (valid indices: 0-{max_index})"
    )
    if self.source:
        msg += f" in '{self.source}'"
    msg += "."

    return msg

FrameInfo dataclass

FrameInfo(index: int, t0: float, duration: float, name: str, run: int, frame_number: int)

Metadata about a single frame in a GWF file.

This dataclass holds metadata for a frame without the actual channel data.

Attributes:

Name Type Description
index int

Frame index in the file (0-based)

t0 float

Start time in GPS seconds

duration float

Frame duration in seconds

name str

Frame name (e.g., 'H1', 'L1')

run int

Run number (negative for simulated data)

frame_number int

Frame sequence number

Examples:

>>> info = gwframe.get_info('data.gwf')
>>> frame = info.frames[0]
>>> print(f"Frame {frame.index}: {frame.name} at GPS {frame.t0}")

FrameWriter

FrameWriter(destination: str | PathLike[str] | BytesIO, compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6, frame_number: int = 0)

Context manager for writing multiple frames to a GWF file or BytesIO buffer.

This is the recommended way to write multiple frames, as it keeps the output stream open and efficiently writes frames sequentially.

Parameters:

Name Type Description Default
destination str, path-like, or BytesIO

Output destination - either a file path or BytesIO object

required
compression int

Compression scheme (default: Compression.ZERO_SUPPRESS_OTHERWISE_GZIP)

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

Compression level 0-9 (default: 6)

6

Examples:

>>> # Write multiple 1-second frames to file
>>> with gwframe.FrameWriter('output.gwf') as writer:
...     for i in range(10):
...         t0 = 1234567890.0 + i
...         data = np.random.randn(16384)
...         writer.write(data, t0=t0, sample_rate=16384, name='L1:TEST')
>>> # Write to BytesIO
>>> from io import BytesIO
>>> buffer = BytesIO()
>>> with gwframe.FrameWriter(buffer) as writer:
...     for i in range(10):
...         data = np.random.randn(16384)
...         writer.write(data, t0=1234567890.0 + i,
...                      sample_rate=16384, name='L1:TEST')
>>> gwf_bytes = buffer.getvalue()
Source code in gwframe/write.py
def __init__(
    self,
    destination: str | PathLike[str] | BytesIO,
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
    frame_number: int = 0,
):
    self.compression = compression
    self.compression_level = compression_level
    self._frame_number = frame_number
    self._stream = None
    self._memory_buffer = None
    self._bytesio_dest = None

    # Determine if destination is file or BytesIO
    if isinstance(destination, BytesIO):
        self._bytesio_dest = destination
        self.filename = None
    else:
        self.filename = fspath(destination)
        self._bytesio_dest = None

write

write(channels: dict[str, NDArray] | NDArray, t0: float, sample_rate: float | dict[str, float], *, name: str = '', run: int = 0, unit: str | dict[str, str] = '', channel_type: str = 'proc')

Convenience method to write data directly without creating Frame object.

This creates a Frame internally and writes it immediately.

Parameters:

Name Type Description Default
channels dict or ndarray

Channel data. Either: - dict mapping channel names to 1D NumPy arrays - Single 1D NumPy array (requires channel name in name parameter)

required
t0 float

GPS start time of the frame

required
sample_rate float or dict

Sample rate in Hz. Either: - Single float value used for all channels - dict mapping channel names to sample rates

required
name str

Frame name (e.g., 'L1') or single channel name if channels is an array

''
run int

Run number (default: 0, negative for simulated data)

0
unit str or dict

Physical unit. Either: - Single string used for all channels (default: '') - dict mapping channel names to units

''
channel_type str

Type of channels: 'proc' (processed, default) or 'sim' (simulated)

'proc'

Examples:

>>> with gwframe.FrameWriter('output.gwf') as writer:
...     for i in range(10):
...         data = np.random.randn(16384)
...         writer.write(
...             data, t0=1234567890.0 + i, sample_rate=16384, name='L1:TEST'
...         )
Source code in gwframe/write.py
def write(
    self,
    channels: dict[str, npt.NDArray] | npt.NDArray,
    t0: float,
    sample_rate: float | dict[str, float],
    *,
    name: str = "",
    run: int = 0,
    unit: str | dict[str, str] = "",
    channel_type: str = "proc",
):
    """
    Convenience method to write data directly without creating Frame object.

    This creates a Frame internally and writes it immediately.

    Parameters
    ----------
    channels : dict or np.ndarray
        Channel data. Either:
        - dict mapping channel names to 1D NumPy arrays
        - Single 1D NumPy array (requires channel name in name parameter)
    t0 : float
        GPS start time of the frame
    sample_rate : float or dict
        Sample rate in Hz. Either:
        - Single float value used for all channels
        - dict mapping channel names to sample rates
    name : str, optional
        Frame name (e.g., 'L1') or single channel name if channels is an array
    run : int, optional
        Run number (default: 0, negative for simulated data)
    unit : str or dict, optional
        Physical unit. Either:
        - Single string used for all channels (default: '')
        - dict mapping channel names to units
    channel_type : str, optional
        Type of channels: 'proc' (processed, default) or 'sim' (simulated)

    Examples
    --------
    >>> with gwframe.FrameWriter('output.gwf') as writer:
    ...     for i in range(10):
    ...         data = np.random.randn(16384)
    ...         writer.write(
    ...             data, t0=1234567890.0 + i, sample_rate=16384, name='L1:TEST'
    ...         )
    """
    if self._stream is None:
        msg = "FrameWriter not opened (use 'with' statement)"
        raise RuntimeError(msg)

    # Handle single array case - convert to dict
    if isinstance(channels, np.ndarray):
        if not name:
            msg = "name parameter required when channels is a single array"
            raise ValueError(msg)
        channel_name = name
        channels = {channel_name: channels}
        frame_name = channel_name.split(":")[0] if ":" in channel_name else ""
    else:
        frame_name = name

    # Determine frame duration from first channel
    first_channel = next(iter(channels.keys()))
    first_data = channels[first_channel]
    first_rate = (
        sample_rate
        if isinstance(sample_rate, (int, float))
        else sample_rate[first_channel]
    )
    duration = len(first_data) / first_rate

    # Create frame with auto-incremented frame number
    frame = Frame(
        t0=t0,
        duration=duration,
        name=frame_name,
        run=run,
        frame_number=self._frame_number,
    )

    # Add all channels to the frame
    _populate_frame_with_channels(frame, channels, sample_rate, unit, channel_type)

    # Write the frame (this will auto-increment _frame_number)
    self.write_frame(frame)

write_frame

write_frame(frame: Frame)

Write a Frame object to the file.

Parameters:

Name Type Description Default
frame Frame

The frame to write

required

Examples:

>>> with gwframe.FrameWriter('output.gwf') as writer:
...     frame = gwframe.Frame(t0=1234567890.0, duration=1.0, name='L1')
...     frame.add_channel('L1:TEST', data, dt=1.0/16384)
...     writer.write_frame(frame)
Notes

If the frame was created with frame_number=0 (default), the writer will use its tracked frame_number. Otherwise, the frame's frame_number is used. Frame numbers auto-increment with each write.

Source code in gwframe/write.py
def write_frame(self, frame: Frame):
    """
    Write a Frame object to the file.

    Parameters
    ----------
    frame : Frame
        The frame to write

    Examples
    --------
    >>> with gwframe.FrameWriter('output.gwf') as writer:
    ...     frame = gwframe.Frame(t0=1234567890.0, duration=1.0, name='L1')
    ...     frame.add_channel('L1:TEST', data, dt=1.0/16384)
    ...     writer.write_frame(frame)

    Notes
    -----
    If the frame was created with frame_number=0 (default), the writer
    will use its tracked frame_number. Otherwise, the frame's frame_number
    is used. Frame numbers auto-increment with each write.
    """
    if self._stream is None:
        msg = "FrameWriter not opened (use 'with' statement)"
        raise RuntimeError(msg)

    frame.write_to_stream(self._stream, self.compression, self.compression_level)

    # Auto-increment for next frame
    self._frame_number += 1

InvalidTimeRangeError

InvalidTimeRangeError(start: float, end: float, file_start: float, file_end: float, source: str | None = None)

Bases: ValueError

Raised when requested time range does not overlap with file data.

Attributes:

Name Type Description
start float

Requested start time (GPS seconds)

end float

Requested end time (GPS seconds)

file_start float

Actual start time of data in file (GPS seconds)

file_end float

Actual end time of data in file (GPS seconds)

source str or None

Source file path or description

Source code in gwframe/errors.py
def __init__(
    self,
    start: float,
    end: float,
    file_start: float,
    file_end: float,
    source: str | None = None,
):
    self.start = start
    self.end = end
    self.file_start = file_start
    self.file_end = file_end
    self.source = source
    super().__init__(str(self))

__str__

__str__() -> str

Generate helpful error message.

Source code in gwframe/errors.py
def __str__(self) -> str:
    """Generate helpful error message."""
    msg = (
        f"Requested time range [{self.start}, {self.end}) does not overlap "
        f"with file range [{self.file_start}, {self.file_end})"
    )
    if self.source:
        msg += f" in '{self.source}'"
    msg += "."

    # Add specific guidance based on the issue
    if self.start >= self.file_end:
        msg += (
            f"\nRequested start time ({self.start}) is at or after "
            f"file end ({self.file_end})."
        )
    elif self.end <= self.file_start:
        msg += (
            f"\nRequested end time ({self.end}) is at or before "
            f"file start ({self.file_start})."
        )

    return msg

TimeSeries dataclass

TimeSeries(array: NDArray[floating], name: str, dtype: int, t0: float, dt: float, duration: float, sample_rate: float, unit: str, type: str)

Time series data from a GWF channel.

This dataclass holds the array data and metadata for a channel read from a GWF file.

Attributes:

Name Type Description
array ndarray

NumPy array containing the time series data

name str

Channel name (e.g., 'H1:LOSC-STRAIN')

dtype int

frameCPP data type code (e.g., FR_VECT_8R for double)

t0 float

Start time in GPS seconds

dt float

Sample spacing in seconds

duration float

Total duration in seconds

sample_rate float

Sampling rate in Hz (1/dt)

unit str

Physical unit of the data (e.g., 'strain')

type str

Channel type: 'proc' (processed), 'adc' (raw ADC), or 'sim' (simulated)

Examples:

>>> data = gwframe.read('data.gwf', 'H1:LOSC-STRAIN')
>>> print(f"Channel: {data.name}")
>>> print(f"Duration: {data.duration} s at {data.sample_rate} Hz")
>>> print(f"Data shape: {data.array.shape}")

combine_channels

combine_channels(input_sources: Sequence[str | PathLike[str]], output_dir: str | PathLike[str], keep_channels: Sequence[str] | None = None, drop_channels: Sequence[str] | None = None) -> list[str]

Combine channels from multiple frame sources into single files.

Takes N sources (all files or all directories) covering the same time ranges and combines their channels. Sources are matched by time range.

Parameters:

Name Type Description Default
input_sources sequence of str or path-like

List of N source files or N source directories to combine. All sources must be the same type (all files or all directories).

required
output_dir str or path - like

Directory where output files will be written

required
keep_channels sequence of str

If specified, only include these channels in the output. Mutually exclusive with drop_channels.

None
drop_channels sequence of str

If specified, exclude these channels from the output. Mutually exclusive with keep_channels.

None

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Combine 2 files covering the same time range
>>> gwframe.combine_channels(['file1.gwf', 'file2.gwf'], 'output/')
>>> # Combine and keep only specific channels
>>> gwframe.combine_channels(
...     ['file1.gwf', 'file2.gwf'], 'output/',
...     keep_channels=['L1:STRAIN', 'L1:LSC']
... )
>>> # Combine and drop specific channels
>>> gwframe.combine_channels(
...     ['dir1/', 'dir2/'], 'output/',
...     drop_channels=['L1:UNWANTED']
... )
Notes

All sources must have matching frame structures (same times and durations). Raises detailed error messages if frames don't align.

Source code in gwframe/operations.py
def combine_channels(
    input_sources: Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str],
    keep_channels: Sequence[str] | None = None,
    drop_channels: Sequence[str] | None = None,
) -> list[str]:
    """
    Combine channels from multiple frame sources into single files.

    Takes N sources (all files or all directories) covering the same time ranges
    and combines their channels. Sources are matched by time range.

    Parameters
    ----------
    input_sources : sequence of str or path-like
        List of N source files or N source directories to combine.
        All sources must be the same type (all files or all directories).
    output_dir : str or path-like
        Directory where output files will be written
    keep_channels : sequence of str, optional
        If specified, only include these channels in the output.
        Mutually exclusive with drop_channels.
    drop_channels : sequence of str, optional
        If specified, exclude these channels from the output.
        Mutually exclusive with keep_channels.

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Combine 2 files covering the same time range
    >>> gwframe.combine_channels(['file1.gwf', 'file2.gwf'], 'output/')

    >>> # Combine and keep only specific channels
    >>> gwframe.combine_channels(
    ...     ['file1.gwf', 'file2.gwf'], 'output/',
    ...     keep_channels=['L1:STRAIN', 'L1:LSC']
    ... )

    >>> # Combine and drop specific channels
    >>> gwframe.combine_channels(
    ...     ['dir1/', 'dir2/'], 'output/',
    ...     drop_channels=['L1:UNWANTED']
    ... )

    Notes
    -----
    All sources must have matching frame structures (same times and durations).
    Raises detailed error messages if frames don't align.
    """
    if len(input_sources) < 2:
        msg = "combine_channels requires at least 2 sources"
        raise ValueError(msg)

    if keep_channels is not None and drop_channels is not None:
        msg = "keep_channels and drop_channels are mutually exclusive"
        raise ValueError(msg)

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Check if sources are files or directories
    source_paths = [Path(s) for s in input_sources]
    are_files = [p.is_file() for p in source_paths]
    are_dirs = [p.is_dir() for p in source_paths]

    if not (all(are_files) or all(are_dirs)):
        msg = "All sources must be the same type (all files or all directories)"
        raise ValueError(msg)

    if all(are_files):
        return _combine_files(source_paths, output_dir, keep_channels, drop_channels)
    return _combine_directories(source_paths, output_dir, keep_channels, drop_channels)

drop_channels

drop_channels(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, channels_to_drop: Sequence[str] | None = None, *, in_place: bool = False) -> list[str]

Remove specified channels from frame files.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str, path-like, or None

Directory where output files will be written. Required if in_place=False. Mutually exclusive with in_place=True.

None
channels_to_drop sequence of str

List of channel names to remove

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> gwframe.drop_channels(
...     'input.gwf',
...     'output/',
...     ['L1:UNWANTED_CHANNEL']
... )
>>> # In place
>>> gwframe.drop_channels(
...     'input.gwf',
...     channels_to_drop=['L1:UNWANTED_CHANNEL'],
...     in_place=True
... )
Source code in gwframe/operations.py
def drop_channels(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    channels_to_drop: Sequence[str] | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Remove specified channels from frame files.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str, path-like, or None
        Directory where output files will be written.
        Required if in_place=False. Mutually exclusive with in_place=True.
    channels_to_drop : sequence of str
        List of channel names to remove
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> gwframe.drop_channels(
    ...     'input.gwf',
    ...     'output/',
    ...     ['L1:UNWANTED_CHANNEL']
    ... )

    >>> # In place
    >>> gwframe.drop_channels(
    ...     'input.gwf',
    ...     channels_to_drop=['L1:UNWANTED_CHANNEL'],
    ...     in_place=True
    ... )
    """
    if not channels_to_drop:
        msg = "channels_to_drop must be provided and non-empty"
        raise ValueError(msg)

    def _drop_file(input_file, output_file):
        """Process a single file, dropping specified channels."""
        frames = read_frames(input_file)
        channels_set = set(channels_to_drop)

        with FrameWriter(output_file) as writer:
            for frame in frames:
                # Drop channels from frame
                for channel_name in channels_set:
                    if channel_name in frame:
                        del frame[channel_name]
                writer.write_frame(frame)

    return _process_files_with_operation(
        input_files, output_dir, _drop_file, in_place=in_place
    )

get_channels

get_channels(filename: str | PathLike[str]) -> list[str]

Get a list of all channels in a GWF file.

Parameters:

Name Type Description Default
filename str or path - like

Path to the GWF file

required

Returns:

Name Type Description
channels list[str]

List of all channel names

Examples:

>>> channels = gwframe.get_channels('data.gwf')
>>> print(f"Found {len(channels)} channels")
>>> for channel in channels:
...     print(channel)
Source code in gwframe/inspect.py
def get_channels(filename: str | PathLike[str]) -> list[str]:
    """
    Get a list of all channels in a GWF file.

    Parameters
    ----------
    filename : str or path-like
        Path to the GWF file

    Returns
    -------
    channels : list[str]
        List of all channel names

    Examples
    --------
    >>> channels = gwframe.get_channels('data.gwf')
    >>> print(f"Found {len(channels)} channels")
    >>> for channel in channels:
    ...     print(channel)
    """
    stream = _core.IFrameFStream(fspath(filename))
    toc = stream.get_toc()
    return [*toc.get_adc(), *toc.get_proc(), *toc.get_sim()]

get_info

get_info(filename: str | PathLike[str]) -> FrameFileInfo

Get metadata about a GWF file.

Parameters:

Name Type Description Default
filename str or path - like

Path to the GWF file

required

Returns:

Name Type Description
info FrameFileInfo

Structured metadata containing: - num_frames: number of frames in file - channels: list of all channel names - frames: list of FrameInfo objects with complete frame metadata

Examples:

>>> info = gwframe.get_info('data.gwf')
>>> print(f"File contains {info.num_frames} frames")
>>> print(f"Frame 0: {info.frames[0].name} at GPS {info.frames[0].t0}")
>>> print(f"Channels: {', '.join(info.channels)}")
Source code in gwframe/inspect.py
def get_info(filename: str | PathLike[str]) -> FrameFileInfo:
    """
    Get metadata about a GWF file.

    Parameters
    ----------
    filename : str or path-like
        Path to the GWF file

    Returns
    -------
    info : FrameFileInfo
        Structured metadata containing:
        - num_frames: number of frames in file
        - channels: list of all channel names
        - frames: list of FrameInfo objects with complete frame metadata

    Examples
    --------
    >>> info = gwframe.get_info('data.gwf')
    >>> print(f"File contains {info.num_frames} frames")
    >>> print(f"Frame 0: {info.frames[0].name} at GPS {info.frames[0].t0}")
    >>> print(f"Channels: {', '.join(info.channels)}")
    """
    stream = _core.IFrameFStream(fspath(filename))
    num_frames = stream.get_number_of_frames()
    toc = stream.get_toc()

    # Get all channels
    channels = [*toc.get_adc(), *toc.get_proc(), *toc.get_sim()]

    # Detect compression from first available channel in first frame
    compression = _detect_compression(stream, toc)

    # Read each frame header to get complete metadata
    frames = []
    for i in range(num_frames):
        # Read frame header
        frame_h = stream.read_frame_n(i)

        # Extract metadata from frame header
        time = frame_h.get_gps_time()
        t0 = float(time.sec) + float(time.nsec) * 1e-9

        frames.append(
            FrameInfo(
                index=i,
                t0=t0,
                duration=frame_h.get_dt(),
                name=frame_h.get_name(),
                run=frame_h.get_run(),
                frame_number=frame_h.get_frame(),
            )
        )

    return FrameFileInfo(
        num_frames=num_frames,
        channels=channels,
        frames=frames,
        compression=compression,
    )

impute_missing_data

impute_missing_data(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, replace_value: float = nan, fill_value: float = 0.0, channels: Sequence[str] | None = None, *, in_place: bool = False) -> list[str]

Replace specific values in frame file channels with a fill value.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str, path-like, or None

Directory where output files will be written. Required if in_place=False. Mutually exclusive with in_place=True.

None
replace_value float

Value to replace (default: NaN). Can be NaN or any numeric value.

nan
fill_value float

Value to use for replacement (default: 0.0). Will be cast to appropriate dtype.

0.0
channels sequence of str

If specified, only impute these channels. Otherwise imputes all channels.

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Replace NaNs with 0 in all channels
>>> gwframe.impute_missing_data('input.gwf', 'output/')
>>> # In place
>>> gwframe.impute_missing_data('input.gwf', in_place=True)
>>> # Replace specific value in specific channels
>>> gwframe.impute_missing_data(
...     'input.gwf', 'output/',
...     replace_value=-999.0,
...     fill_value=0.0,
...     channels=['L1:STRAIN']
... )
Source code in gwframe/operations.py
def impute_missing_data(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    replace_value: float = np.nan,
    fill_value: float = 0.0,
    channels: Sequence[str] | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Replace specific values in frame file channels with a fill value.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str, path-like, or None
        Directory where output files will be written.
        Required if in_place=False. Mutually exclusive with in_place=True.
    replace_value : float, optional
        Value to replace (default: NaN). Can be NaN or any numeric value.
    fill_value : float, optional
        Value to use for replacement (default: 0.0). Will be cast to appropriate dtype.
    channels : sequence of str, optional
        If specified, only impute these channels. Otherwise imputes all channels.
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Replace NaNs with 0 in all channels
    >>> gwframe.impute_missing_data('input.gwf', 'output/')

    >>> # In place
    >>> gwframe.impute_missing_data('input.gwf', in_place=True)

    >>> # Replace specific value in specific channels
    >>> gwframe.impute_missing_data(
    ...     'input.gwf', 'output/',
    ...     replace_value=-999.0,
    ...     fill_value=0.0,
    ...     channels=['L1:STRAIN']
    ... )
    """
    channels_set = set(channels) if channels else None
    is_nan_replacement = np.isnan(replace_value)

    def _impute_file(input_file, output_file):
        """Process a single file, imputing missing data."""
        frames = read_frames(input_file)

        with FrameWriter(output_file) as writer:
            for frame in frames:
                # Determine which channels to impute
                if channels_set is not None:
                    channels_to_impute = channels_set & set(frame.keys())
                else:
                    channels_to_impute = set(frame.keys())

                # Process each channel
                for channel_name in channels_to_impute:
                    ts = frame[channel_name]
                    data = ts.array.copy()
                    # Cast fill_value to appropriate dtype and replace
                    fill = np.array(fill_value).astype(data.dtype)
                    if is_nan_replacement:
                        data = np.where(np.isnan(data), fill, data)
                    else:
                        data = np.where(data == replace_value, fill, data)

                    frame[channel_name] = TimeSeries(
                        array=data,
                        name=ts.name,
                        dtype=ts.dtype,
                        t0=ts.t0,
                        dt=ts.dt,
                        duration=ts.duration,
                        sample_rate=ts.sample_rate,
                        unit=ts.unit,
                        type=ts.type,
                    )

                writer.write_frame(frame)

    return _process_files_with_operation(
        input_files, output_dir, _impute_file, in_place=in_place
    )

read_bytes

read_bytes(data: bytes, channel: str, frame_index: int = 0, *, validate_checksum: bool = False, start: float | None = None, end: float | None = None) -> TimeSeries
read_bytes(data: bytes, channel: None, frame_index: int = 0, *, validate_checksum: bool = False, start: float | None = None, end: float | None = None) -> dict[str, TimeSeries]
read_bytes(data: bytes, channel: list[str], frame_index: int = 0, *, validate_checksum: bool = False, start: float | None = None, end: float | None = None) -> dict[str, TimeSeries]
read_bytes(data: bytes, channel: str | None | list[str] = None, frame_index: int = 0, *, validate_checksum: bool = False, start: float | None = None, end: float | None = None) -> TimeSeries | dict[str, TimeSeries]

Read channel data from GWF data in memory (bytes).

This allows reading GWF data without writing to disk first, which is useful when working with data from network streams, compressed archives, or in-memory buffers.

Parameters:

Name Type Description Default
data bytes

Raw GWF file data as bytes

required
channel str, None, or list[str]

Channel(s) to read: - str: Read single channel (e.g., 'L1:GWOSC-16KHZ_R1_STRAIN') - None: Read all channels from the frame (default) - list[str]: Read specific list of channels

None
frame_index int

Index of the frame to read from (default: 0)

0
validate_checksum bool

Validate frame file checksums before reading (default: False). When enabled, performs file-level checksum validation which requires reading the entire frame file. Disabled by default for performance.

False

Returns:

Name Type Description
data TimeSeries or dict[str, TimeSeries]
  • If channel is a str: returns TimeSeries for that channel
  • If channel is None or list[str]: returns dict mapping channel names to TimeSeries

Examples:

>>> with open('data.gwf', 'rb') as f:
...     gwf_bytes = f.read()
>>> data = gwframe.read_bytes(gwf_bytes, 'L1:GWOSC-16KHZ_R1_STRAIN')
>>> print(f"Read {len(data.array)} samples at {data.sample_rate} Hz")
>>> # Read all channels
>>> all_data = gwframe.read_bytes(gwf_bytes, channel=None)
>>> print(f"Found {len(all_data)} channels")
>>> import io
>>> from io import BytesIO
>>> data = gwframe.read_bytes(BytesIO(gwf_bytes).read(), 'L1:STRAIN')
Notes

This function uses frameCPP's MemoryBuffer internally to read from memory without writing to disk.

Source code in gwframe/read.py
def read_bytes(
    data: bytes,
    channel: str | None | list[str] = None,
    frame_index: int = 0,
    *,
    validate_checksum: bool = False,
    start: float | None = None,
    end: float | None = None,
) -> TimeSeries | dict[str, TimeSeries]:
    """
    Read channel data from GWF data in memory (bytes).

    This allows reading GWF data without writing to disk first,
    which is useful when working with data from network streams,
    compressed archives, or in-memory buffers.

    Parameters
    ----------
    data : bytes
        Raw GWF file data as bytes
    channel : str, None, or list[str], optional
        Channel(s) to read:
        - str: Read single channel (e.g., 'L1:GWOSC-16KHZ_R1_STRAIN')
        - None: Read all channels from the frame (default)
        - list[str]: Read specific list of channels
    frame_index : int, optional
        Index of the frame to read from (default: 0)
    validate_checksum : bool, optional
        Validate frame file checksums before reading (default: False).
        When enabled, performs file-level checksum validation which requires
        reading the entire frame file. Disabled by default for performance.

    Returns
    -------
    data : TimeSeries or dict[str, TimeSeries]
        - If channel is a str: returns TimeSeries for that channel
        - If channel is None or list[str]: returns dict mapping channel names
          to TimeSeries

    Examples
    --------
    >>> with open('data.gwf', 'rb') as f:
    ...     gwf_bytes = f.read()
    >>> data = gwframe.read_bytes(gwf_bytes, 'L1:GWOSC-16KHZ_R1_STRAIN')
    >>> print(f"Read {len(data.array)} samples at {data.sample_rate} Hz")

    >>> # Read all channels
    >>> all_data = gwframe.read_bytes(gwf_bytes, channel=None)
    >>> print(f"Found {len(all_data)} channels")

    >>> import io
    >>> from io import BytesIO
    >>> data = gwframe.read_bytes(BytesIO(gwf_bytes).read(), 'L1:STRAIN')

    Notes
    -----
    This function uses frameCPP's MemoryBuffer internally to read
    from memory without writing to disk.
    """
    # Verify input is bytes
    if not isinstance(data, bytes):
        msg = f"data must be bytes, got {type(data)}"
        raise TypeError(msg)

    # Validate time-based slicing parameters
    if (start is None) != (end is None):
        msg = "start and end must be specified together"
        raise ValueError(msg)

    if start is not None and frame_index != 0:
        msg = "start/end parameters are mutually exclusive with frame_index"
        raise ValueError(msg)

    # Handle time-based slicing
    if start is not None:
        # For multi-channel, recursively read each channel with time slicing
        if channel is None or isinstance(channel, list):
            if channel is None:
                proc_ch, adc_ch, sim_ch = _core.enumerate_channels_from_bytes(data, 0)
                channels_to_read = list(proc_ch) + list(adc_ch) + list(sim_ch)
            else:
                channels_to_read = channel

            result: dict[str, TimeSeries] = {}
            for ch in channels_to_read:
                try:
                    ts = read_bytes(
                        data,
                        ch,
                        0,
                        validate_checksum=validate_checksum,
                        start=start,
                        end=end,
                    )
                    assert isinstance(
                        ts, TimeSeries
                    )  # Single channel returns TimeSeries
                    result[ch] = ts
                except (ValueError, RuntimeError):
                    continue
            return result

        # Single channel case - must be a string
        if not isinstance(channel, str):
            msg = f"channel must be str, None, or list[str], got {type(channel)}"
            raise TypeError(msg)

        # Find frames overlapping [start, end) using frame times
        frame_times = _core.get_frame_times_from_bytes(data)

        # Find which frames overlap with [start, end)
        # A frame overlaps if: frame_start < end AND frame_end > start
        frame_indices = [
            i
            for i, (frame_start, frame_duration) in enumerate(frame_times)
            if frame_start < end and (frame_start + frame_duration) > start
        ]

        if not frame_indices:
            # Get file time range for helpful error message
            if frame_times:
                file_start = frame_times[0][0]
                file_end = frame_times[-1][0] + frame_times[-1][1]
            else:
                file_start = file_end = 0.0
            assert start is not None and end is not None
            raise InvalidTimeRangeError(start, end, file_start, file_end)

        # Read and concatenate frames
        timeseries_list: list[TimeSeries] = []
        for frame_idx in frame_indices:
            ts = read_bytes(
                data, channel, frame_idx, validate_checksum=validate_checksum
            )
            assert isinstance(ts, TimeSeries)  # Narrows type for mypy
            timeseries_list.append(ts)

        # Stitch and slice using shared helper
        assert start is not None and end is not None
        return _stitch_and_slice_timeseries(timeseries_list, start, end)

    # Handle multiple channel cases
    if channel is None or isinstance(channel, list):
        # Get list of channels to read
        if channel is None:
            # Read all channels - enumerate from frame
            proc_ch, adc_ch, sim_ch = _core.enumerate_channels_from_bytes(
                data, frame_index
            )
            channels_to_read = list(proc_ch) + list(adc_ch) + list(sim_ch)
        else:
            # channel is a list[str]
            channels_to_read = channel

        # Read each channel and return dict
        channel_dict: dict[str, TimeSeries] = {}
        for ch in channels_to_read:
            try:
                ts = read_bytes(
                    data, ch, frame_index, validate_checksum=validate_checksum
                )
                assert isinstance(ts, TimeSeries)  # Single channel returns TimeSeries
                channel_dict[ch] = ts
            except (ValueError, RuntimeError):
                # Skip channels that fail to read
                continue

        return channel_dict

    # Single channel case (channel is a str)
    if not isinstance(channel, str):
        msg = f"channel must be str, None, or list[str], got {type(channel)}"
        raise TypeError(msg)

    # Validate checksums if requested
    if validate_checksum:
        _core.validate_frame_checksums(data)

    # Try each channel type until one works
    fr_data = None
    channel_type = None

    readers = [
        ("proc", _core.read_proc_from_bytes),
        ("sim", _core.read_sim_from_bytes),
        ("adc", _core.read_adc_from_bytes),
    ]

    for reader_type, reader_func in readers:
        try:
            fr_data = reader_func(data, frame_index, channel)
            channel_type = reader_type
            break
        except (RuntimeError, ValueError):
            continue

    if fr_data is None:
        # Get available channels to provide helpful error
        proc_ch, adc_ch, sim_ch = _core.enumerate_channels_from_bytes(data, frame_index)
        available = list(proc_ch) + list(adc_ch) + list(sim_ch)
        raise ChannelNotFoundError(channel, available)

    # Get the FrVect data vector (usually only one)
    if fr_data.get_data_size() == 0:
        msg = f"No data vectors found for channel '{channel}'"
        raise ValueError(msg)

    vect = fr_data.get_data_vector(0)  # Get first (usually only) vector

    # Extract NumPy array
    array = vect.get_data_uncompressed()

    # Get timing information
    if vect.get_n_dim() > 0:
        dim = vect.get_dim(0)
        dt = dim.dx  # Sample spacing
    else:
        dt = 0.0

    # Get time offset
    time_offset = fr_data.get_time_offset()

    # Read frame header to get GPS start time
    time_s, time_ns = _core.read_frame_gps_time(data, frame_index)
    frame_t0 = float(time_s) + float(time_ns) * 1e-9

    # Calculate data start time (frame start + offset)
    data_t0 = frame_t0 + time_offset

    # Calculate duration and sample rate
    duration = dt * len(array) if dt > 0 else 0.0
    sample_rate = 1.0 / dt if dt > 0 else 0.0

    # Ensure channel_type is set (helps mypy type narrowing)
    assert channel_type is not None

    return TimeSeries(
        array=array,
        name=vect.get_name(),
        dtype=vect.get_type(),
        t0=data_t0,
        dt=dt,
        duration=duration,
        sample_rate=sample_rate,
        unit=vect.get_unit_y(),
        type=channel_type,
    )

read_frames

read_frames(filename: str | PathLike[str]) -> Generator[Frame, None, None]

Read frames from a GWF file, preserving complete metadata.

Yields Frame objects that can be written directly to disk with identical metadata (frame name, run number, frame number, etc.).

Parameters:

Name Type Description Default
filename str or path - like

Path to the GWF file

required

Yields:

Name Type Description
frame Frame

Frame object containing all channel data with correct sample rates, units, types, and original frame metadata

Examples:

>>> # Iterate over frames
>>> for frame in gwframe.read_frames('data.gwf'):
...     print(f"Frame {frame.name} at GPS {frame.t0}")
>>> # Process and write frames
>>> with gwframe.FrameWriter('output.gwf') as writer:
...     for frame in gwframe.read_frames('input.gwf'):
...         writer.write_frame(frame)
>>> # Collect all frames into a list
>>> frames = list(gwframe.read_frames('data.gwf'))
>>> print(f"Read {len(frames)} frames")
See Also

read : Read channel data from frames Frame : Frame object for creating and manipulating frames FrameWriter : Context manager for writing frames to files

Source code in gwframe/read.py
def read_frames(filename: str | PathLike[str]) -> Generator[Frame, None, None]:
    """
    Read frames from a GWF file, preserving complete metadata.

    Yields Frame objects that can be written directly to disk with identical
    metadata (frame name, run number, frame number, etc.).

    Parameters
    ----------
    filename : str or path-like
        Path to the GWF file

    Yields
    ------
    frame : Frame
        Frame object containing all channel data with correct sample rates,
        units, types, and original frame metadata

    Examples
    --------
    >>> # Iterate over frames
    >>> for frame in gwframe.read_frames('data.gwf'):
    ...     print(f"Frame {frame.name} at GPS {frame.t0}")

    >>> # Process and write frames
    >>> with gwframe.FrameWriter('output.gwf') as writer:
    ...     for frame in gwframe.read_frames('input.gwf'):
    ...         writer.write_frame(frame)

    >>> # Collect all frames into a list
    >>> frames = list(gwframe.read_frames('data.gwf'))
    >>> print(f"Read {len(frames)} frames")

    See Also
    --------
    read : Read channel data from frames
    Frame : Frame object for creating and manipulating frames
    FrameWriter : Context manager for writing frames to files
    """
    # Convert to string path
    filename_str = fspath(filename) if not isinstance(filename, str) else filename

    # Get file metadata
    file_info = get_info(filename_str)

    for frame_info in file_info.frames:
        # Read all channels for this frame
        channel_data = read(filename_str, channel=None, frame_index=frame_info.index)
        assert isinstance(channel_data, dict)  # channel=None returns dict

        # Create Frame with preserved metadata
        frame = Frame(
            t0=frame_info.t0,
            duration=frame_info.duration,
            name=frame_info.name,
            run=frame_info.run,
            frame_number=frame_info.frame_number,
        )

        # Add all channels with their metadata
        for channel_name, ts in channel_data.items():
            frame.add_channel(
                channel=channel_name,
                data=ts.array,
                sample_rate=ts.sample_rate,
                unit=ts.unit,
                channel_type=ts.type,
            )

        yield frame

recompress_frames

recompress_frames(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6, *, in_place: bool = False) -> list[str]

Rewrite frame files with different compression settings.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str or path - like

Directory where output files will be written. Required if in_place=False. Ignored if in_place=True.

None
compression int

Compression scheme (e.g., Compression.RAW, Compression.GZIP)

ZERO_SUPPRESS_OTHERWISE_GZIP
compression_level int

Compression level 0-9 (default: 6)

6
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Remove compression
>>> gwframe.recompress_frames('input.gwf', 'output/',
...                           compression=gwframe.Compression.RAW)
>>> # Maximum compression, in place
>>> gwframe.recompress_frames('input.gwf',
...                           compression=gwframe.Compression.GZIP,
...                           compression_level=9,
...                           in_place=True)
Source code in gwframe/operations.py
def recompress_frames(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Rewrite frame files with different compression settings.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str or path-like, optional
        Directory where output files will be written.
        Required if in_place=False. Ignored if in_place=True.
    compression : int
        Compression scheme (e.g., Compression.RAW, Compression.GZIP)
    compression_level : int, optional
        Compression level 0-9 (default: 6)
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Remove compression
    >>> gwframe.recompress_frames('input.gwf', 'output/',
    ...                           compression=gwframe.Compression.RAW)

    >>> # Maximum compression, in place
    >>> gwframe.recompress_frames('input.gwf',
    ...                           compression=gwframe.Compression.GZIP,
    ...                           compression_level=9,
    ...                           in_place=True)
    """

    def _recompress_file(input_file, output_file):
        """Process a single file, rewriting with different compression."""
        frames = read_frames(input_file)

        with FrameWriter(
            output_file, compression=compression, compression_level=compression_level
        ) as writer:
            for frame in frames:
                writer.write_frame(frame)

    return _process_files_with_operation(
        input_files, output_dir, _recompress_file, in_place=in_place
    )

rename_channels

rename_channels(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, channel_map: dict[str, str] | None = None, *, in_place: bool = False) -> list[str]

Rename channels in frame files.

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str, path-like, or None

Directory where output files will be written. Required if in_place=False, ignored if in_place=True.

None
channel_map dict

Mapping of old channel names to new channel names

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Write to output directory
>>> gwframe.rename_channels(
...     'input.gwf',
...     'output/',
...     {'L1:OLD_NAME': 'L1:NEW_NAME'}
... )
>>> # Modify in place
>>> gwframe.rename_channels(
...     'input.gwf',
...     channel_map={'L1:OLD_NAME': 'L1:NEW_NAME'},
...     in_place=True
... )
Source code in gwframe/operations.py
def rename_channels(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    channel_map: dict[str, str] | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Rename channels in frame files.

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str, path-like, or None
        Directory where output files will be written.
        Required if in_place=False, ignored if in_place=True.
    channel_map : dict
        Mapping of old channel names to new channel names
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Write to output directory
    >>> gwframe.rename_channels(
    ...     'input.gwf',
    ...     'output/',
    ...     {'L1:OLD_NAME': 'L1:NEW_NAME'}
    ... )

    >>> # Modify in place
    >>> gwframe.rename_channels(
    ...     'input.gwf',
    ...     channel_map={'L1:OLD_NAME': 'L1:NEW_NAME'},
    ...     in_place=True
    ... )
    """
    if not channel_map:
        msg = "channel_map must be provided and non-empty"
        raise ValueError(msg)

    def _rename_file(input_file, output_file):
        """Process a single file, renaming channels."""
        frames = read_frames(input_file)

        with FrameWriter(output_file) as writer:
            for frame in frames:
                # Rename channels in frame
                for old_name, new_name in channel_map.items():
                    if old_name in frame:
                        frame[new_name] = frame.pop(old_name)
                writer.write_frame(frame)

    return _process_files_with_operation(
        input_files, output_dir, _rename_file, in_place=in_place
    )

replace_channels

replace_channels(base_files: str | PathLike[str] | Sequence[str | PathLike[str]], update_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str], channels_to_replace: Sequence[str] | None = None) -> list[str]

Replace data in channels with updated versions from another frame file.

Parameters:

Name Type Description Default
base_files str, path-like, or sequence of str/path-like

Base GWF file(s) to process

required
update_files str, path-like, or sequence of str/path-like

GWF file(s) containing updated channel data

required
output_dir str or path - like

Directory where output files will be written

required
channels_to_replace sequence of str

List of channel names to replace. If None, replaces all channels found in update_files.

None

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> gwframe.replace_channels(
...     'base.gwf',
...     'updated.gwf',
...     'output/',
...     ['L1:STRAIN']
... )
Source code in gwframe/operations.py
def replace_channels(
    base_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    update_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str],
    channels_to_replace: Sequence[str] | None = None,
) -> list[str]:
    """
    Replace data in channels with updated versions from another frame file.

    Parameters
    ----------
    base_files : str, path-like, or sequence of str/path-like
        Base GWF file(s) to process
    update_files : str, path-like, or sequence of str/path-like
        GWF file(s) containing updated channel data
    output_dir : str or path-like
        Directory where output files will be written
    channels_to_replace : sequence of str, optional
        List of channel names to replace. If None, replaces all channels
        found in update_files.

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> gwframe.replace_channels(
    ...     'base.gwf',
    ...     'updated.gwf',
    ...     'output/',
    ...     ['L1:STRAIN']
    ... )
    """
    if isinstance(base_files, str | PathLike):
        base_files = [base_files]
    if isinstance(update_files, str | PathLike):
        update_files = [update_files]

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    output_files = []

    for base_file in base_files:
        output_file = output_dir / Path(base_file).name
        output_files.append(str(output_file))

        base_frames = read_frames(base_file)

        with FrameWriter(str(output_file)) as writer:
            for frame in base_frames:
                t0 = frame.t0
                duration = frame.duration

                # Read update data from matching time range
                update_data: dict[str, TimeSeries] = {}
                for update_file in update_files:
                    try:
                        data: dict[str, TimeSeries] = read(
                            update_file, channel=None, start=t0, end=t0 + duration
                        )
                        update_data.update(data)
                    except (ValueError, FileNotFoundError):
                        continue

                # Determine which channels to replace
                if channels_to_replace is None:
                    channels_to_replace_set = set(update_data.keys())
                else:
                    channels_to_replace_set = set(channels_to_replace)

                # Replace specified channels with update data
                for channel_name in channels_to_replace_set:
                    if channel_name in update_data:
                        frame[channel_name] = update_data[channel_name]

                # Add any new channels from update data
                for channel_name, ts in update_data.items():
                    if channel_name not in frame:
                        frame[channel_name] = ts

                writer.write_frame(frame)

    return output_files

resize_frames

resize_frames(input_files: str | PathLike[str] | Sequence[str | PathLike[str]], output_dir: str | PathLike[str] | None = None, target_duration: float | None = None, *, in_place: bool = False) -> list[str]

Resize frames to a different duration (e.g., 64s frames to 4s frames).

Parameters:

Name Type Description Default
input_files str, path-like, or sequence of str/path-like

Input GWF file(s) to process

required
output_dir str or path - like

Directory where output files will be written. Required if in_place=False. Ignored if in_place=True.

None
target_duration float

Target frame duration in seconds

None
in_place bool

If True, modify files in place (default: False)

False

Returns:

Name Type Description
output_files list[str]

List of output file paths created

Examples:

>>> # Split 64-second frames into 4-second frames
>>> gwframe.resize_frames('input.gwf', 'output/', target_duration=4.0)
>>> # Split frames in place
>>> gwframe.resize_frames('input.gwf', target_duration=4.0, in_place=True)
Notes

When splitting frames (target_duration < source_duration), data is divided evenly. When merging frames (target_duration > source_duration), consecutive frames are combined.

Source code in gwframe/operations.py
def resize_frames(
    input_files: str | PathLike[str] | Sequence[str | PathLike[str]],
    output_dir: str | PathLike[str] | None = None,
    target_duration: float | None = None,
    *,
    in_place: bool = False,
) -> list[str]:
    """
    Resize frames to a different duration (e.g., 64s frames to 4s frames).

    Parameters
    ----------
    input_files : str, path-like, or sequence of str/path-like
        Input GWF file(s) to process
    output_dir : str or path-like, optional
        Directory where output files will be written.
        Required if in_place=False. Ignored if in_place=True.
    target_duration : float
        Target frame duration in seconds
    in_place : bool, optional
        If True, modify files in place (default: False)

    Returns
    -------
    output_files : list[str]
        List of output file paths created

    Examples
    --------
    >>> # Split 64-second frames into 4-second frames
    >>> gwframe.resize_frames('input.gwf', 'output/', target_duration=4.0)

    >>> # Split frames in place
    >>> gwframe.resize_frames('input.gwf', target_duration=4.0, in_place=True)

    Notes
    -----
    When splitting frames (target_duration < source_duration), data is divided
    evenly. When merging frames (target_duration > source_duration), consecutive
    frames are combined.
    """
    if target_duration is None or target_duration <= 0:
        msg = "target_duration must be a positive number"
        raise ValueError(msg)

    def _resize_file(input_file, output_file):
        """Process a single file, resizing frames."""
        frames = read_frames(input_file)

        with FrameWriter(output_file) as writer:
            frame_number = 0

            for frame in frames:
                source_t0 = frame.t0
                source_duration = frame.duration

                # Calculate how many target frames fit in this source frame
                num_splits = int(source_duration / target_duration)

                if num_splits >= 1:
                    # Split into smaller frames
                    for split_idx in range(num_splits):
                        split_t0 = source_t0 + split_idx * target_duration

                        new_frame = Frame(
                            t0=split_t0,
                            duration=target_duration,
                            name=frame.name,
                            run=frame.run,
                            frame_number=frame_number,
                        )

                        # Slice data for this split
                        for channel_name, ts in frame.items():
                            start_sample = int(
                                split_idx * target_duration * ts.sample_rate
                            )
                            end_sample = int(
                                (split_idx + 1) * target_duration * ts.sample_rate
                            )
                            sliced_data = ts.array[start_sample:end_sample]

                            new_frame.add_channel(
                                channel_name,
                                sliced_data,
                                ts.sample_rate,
                                unit=ts.unit,
                                channel_type=ts.type,
                            )

                        writer.write_frame(new_frame)
                        frame_number += 1
                else:
                    # Keep original frame (target_duration >= source_duration)
                    frame.frame_number = frame_number
                    writer.write_frame(frame)
                    frame_number += 1

    return _process_files_with_operation(
        input_files, output_dir, _resize_file, in_place=in_place
    )

write_bytes

write_bytes(channels: dict[str, NDArray] | NDArray, t0: float, sample_rate: float | dict[str, float], *, name: str = '', run: int = 0, unit: str | dict[str, str] = '', channel_type: str = 'proc', compression: int = ZERO_SUPPRESS_OTHERWISE_GZIP, compression_level: int = 6) -> bytes

Write channel data to bytes (in-memory GWF format).

Parameters are identical to write() function.

Returns:

Type Description
bytes

GWF-formatted data as bytes

Examples:

>>> data = np.sin(np.linspace(0, 2*np.pi, 16384))
>>> gwf_bytes = gwframe.write_bytes(
...     data, t0=1234567890.0, sample_rate=16384, name='L1:TEST'
... )
>>> # Verify round-trip
>>> read_data = gwframe.read_bytes(gwf_bytes, 'L1:TEST')
See Also

write : Write channel data to a file Frame.write_bytes : Write a Frame object to bytes

Source code in gwframe/write.py
def write_bytes(
    channels: dict[str, npt.NDArray] | npt.NDArray,
    t0: float,
    sample_rate: float | dict[str, float],
    *,
    name: str = "",
    run: int = 0,
    unit: str | dict[str, str] = "",
    channel_type: str = "proc",
    compression: int = Compression.ZERO_SUPPRESS_OTHERWISE_GZIP,
    compression_level: int = 6,
) -> bytes:
    """
    Write channel data to bytes (in-memory GWF format).

    Parameters are identical to write() function.

    Returns
    -------
    bytes
        GWF-formatted data as bytes

    Examples
    --------
    >>> data = np.sin(np.linspace(0, 2*np.pi, 16384))
    >>> gwf_bytes = gwframe.write_bytes(
    ...     data, t0=1234567890.0, sample_rate=16384, name='L1:TEST'
    ... )
    >>> # Verify round-trip
    >>> read_data = gwframe.read_bytes(gwf_bytes, 'L1:TEST')

    See Also
    --------
    write : Write channel data to a file
    Frame.write_bytes : Write a Frame object to bytes
    """
    # Handle single array case - convert to dict
    if isinstance(channels, np.ndarray):
        if not name:
            msg = "name parameter required when channels is a single array"
            raise ValueError(msg)
        channel_name = name
        channels = {channel_name: channels}
        frame_name = channel_name.split(":")[0] if ":" in channel_name else ""
    else:
        frame_name = name

    # Determine frame duration from first channel
    first_channel = next(iter(channels.keys()))
    first_data = channels[first_channel]
    first_rate = (
        sample_rate
        if isinstance(sample_rate, (int, float))
        else sample_rate[first_channel]
    )
    duration = len(first_data) / first_rate

    # Create frame
    frame = Frame(t0=t0, duration=duration, name=frame_name, run=run)

    # Add all channels to the frame
    _populate_frame_with_channels(frame, channels, sample_rate, unit, channel_type)

    # Write to bytes
    return frame.write_bytes(compression, compression_level)