"""
DataSegmentGenerator - Segment data into continuous chunks for separate analysis.
This module provides functionality to segment time series data based on gap masks,
enabling separate analysis of continuous data segments rather than using tapering/windowing approaches.
"""
from typing import Any, Dict, Optional, Tuple
import numpy as np
from numpy.typing import NDArray
from .gap_window_generator import GapWindowGenerator
class DataSegmentGenerator:
    """
    Segment time series data into continuous chunks based on a gap mask.

    The mask marks valid samples with non-zero, non-NaN values and gaps with
    NaN or 0. Every maximal run of valid samples becomes one segment, for
    which the data, time stamps, mask values and original indices can be
    retrieved for separate analysis.

    Parameters
    ----------
    mask : NDArray
        One-dimensional mask where 1 indicates valid data and NaN/0
        indicates gaps. Any other non-zero, non-NaN value (e.g. a tapered
        window value) is also treated as valid.
    data : NDArray
        Time series data corresponding to the mask.
    dt : float
        Sampling interval (time step between samples).
    t0 : float, optional
        Start time for the time series. Default is 0.0.

    Attributes
    ----------
    mask, data : NDArray
        Copies of the inputs as NumPy arrays.
    binary_mask : NDArray
        Integer array holding 1 for valid samples and 0 for gaps.
    segment_indices : list of tuple of int
        ``(start, end)`` pairs (end exclusive) for every continuous segment,
        in order of appearance.

    Examples
    --------
    >>> import numpy as np
    >>> from lisagap import DataSegmentGenerator
    >>>
    >>> # Create sample data with gaps
    >>> data = np.random.randn(1000)
    >>> mask = np.ones_like(data)
    >>> mask[200:300] = np.nan  # Create a gap
    >>> mask[500:520] = np.nan  # Another gap
    >>>
    >>> # Create segmenter
    >>> segmenter = DataSegmentGenerator(mask=mask, data=data, dt=0.1, t0=0.0)
    >>>
    >>> # Get time domain segments
    >>> segments = segmenter.get_time_segments()
    >>>
    >>> # Get frequency domain information
    >>> freq_info = segmenter.get_freq_info_from_segments()
    """

    def __init__(self, mask: NDArray, data: NDArray, dt: float, t0: float = 0.0):
        """
        Initialize the DataSegmentGenerator.

        Parameters
        ----------
        mask : NDArray
            One-dimensional mask where 1 indicates valid data and NaN/0
            indicates gaps.
        data : NDArray
            Time series data corresponding to the mask.
        dt : float
            Sampling interval (time step between samples).
        t0 : float, optional
            Start time for the time series. Default is 0.0.

        Raises
        ------
        ValueError
            If the mask is not one-dimensional, if mask and data differ in
            length, or if ``dt`` is not strictly positive.
        """
        # np.array copies, so later changes to the caller's arrays do not
        # leak into this instance.
        self.mask = np.array(mask)
        self.data = np.array(data)
        self.dt = dt
        self.t0 = t0

        # Validate inputs. The run detection below only makes sense for a
        # one-dimensional mask, so reject anything else with a clear message.
        if self.mask.ndim != 1:
            raise ValueError("Mask must be one-dimensional")
        if self.data.ndim == 0 or len(self.mask) != len(self.data):
            raise ValueError("Mask and data must have the same length")
        if self.dt <= 0:
            raise ValueError("Sampling interval dt must be positive")

        # Collapse the mask to 0/1: NaN and exact zeros are gaps, everything
        # else counts as valid data.
        self.binary_mask = np.where(np.isnan(self.mask) | (self.mask == 0), 0, 1)

        # Find continuous segments
        self._find_segments()

    def _find_segments(self) -> None:
        """
        Find continuous segments of valid data in the mask.

        Populates ``self.segment_indices`` with ``(start, end)`` pairs, where
        ``start`` is the first valid sample of a run and ``end`` is one past
        its last valid sample.
        """
        # Padding with a zero on both sides guarantees that a run touching
        # either end of the array still produces a rising and a falling edge.
        edges = np.diff(np.concatenate(([0], self.binary_mask, [0])))
        starts = np.flatnonzero(edges == 1)
        ends = np.flatnonzero(edges == -1)

        # Rising and falling edges alternate, so they pair up one-to-one and
        # every run holds at least one sample. tolist() yields plain Python
        # ints rather than NumPy scalars.
        self.segment_indices = list(zip(starts.tolist(), ends.tolist()))

    @staticmethod
    def _cosine_ramp(n_samples: int) -> NDArray:
        """Return a raised-cosine ramp of ``n_samples`` values rising from 0 towards 1."""
        return 0.5 * (1 - np.cos(np.pi * np.arange(n_samples) / n_samples))

    def get_time_segments(
        self,
        apply_window: bool = False,
        left_edge_taper: Optional[int] = None,
        right_edge_taper: Optional[int] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Get time domain segments of the data.

        Parameters
        ----------
        apply_window : bool, optional
            If True, multiply each segment's data by its mask values and
            apply the requested edge tapers. Default is False.
        left_edge_taper : int, optional
            Number of samples to taper on the left edge of the first segment.
            Only applied when ``apply_window=True``. Ignored unless it is
            positive and smaller than the length of the first segment.
            Default is None (no edge tapering).
        right_edge_taper : int, optional
            Number of samples to taper on the right edge of the last segment.
            Only applied when ``apply_window=True``. Ignored unless it is
            positive and smaller than the length of the last segment.
            Default is None (no edge tapering).

        Returns
        -------
        Dict[str, Dict[str, Any]]
            Dictionary keyed ``"segment_1"``, ``"segment_2"``, ... in order
            of appearance. Each value is a dictionary with keys:

            - 'data': Data array for the segment (windowed if apply_window=True)
            - 'time': Time array for the segment
            - 'mask': Mask array for the segment (showing any tapering applied)
            - 'start_idx': Start index in original array
            - 'end_idx': End index (exclusive) in original array
        """
        segments: Dict[str, Dict[str, Any]] = {}
        last_index = len(self.segment_indices) - 1

        for i, (start, end) in enumerate(self.segment_indices):
            segment_length = end - start

            # Copies keep the returned arrays independent of the instance.
            segment_data = self.data[start:end].copy()
            segment_mask = self.mask[start:end].copy()

            if apply_window:
                # Apply whatever window values the mask already carries.
                segment_data = segment_data * segment_mask

                # Left edge of the first segment: ramp up from 0 to 1.
                if (
                    i == 0
                    and left_edge_taper is not None
                    and 0 < left_edge_taper < segment_length
                ):
                    left_taper = np.ones(segment_length)
                    left_taper[:left_edge_taper] = self._cosine_ramp(left_edge_taper)
                    segment_data = segment_data * left_taper
                    segment_mask = segment_mask * left_taper

                # Right edge of the last segment: the mirrored ramp, so the
                # very last sample is 0.
                if (
                    i == last_index
                    and right_edge_taper is not None
                    and 0 < right_edge_taper < segment_length
                ):
                    right_taper = np.ones(segment_length)
                    right_taper[segment_length - right_edge_taper:] = (
                        self._cosine_ramp(right_edge_taper)[::-1]
                    )
                    segment_data = segment_data * right_taper
                    segment_mask = segment_mask * right_taper

            # Time stamps are referenced to the original series, not to the
            # start of the segment.
            segment_time = self.t0 + (start + np.arange(segment_length)) * self.dt

            segments[f"segment_{i + 1}"] = {
                "data": segment_data,
                "time": segment_time,
                "mask": segment_mask,
                "start_idx": start,
                "end_idx": end,
            }

        return segments

    def get_freq_info_from_segments(self) -> Dict[str, Dict[str, Any]]:
        """
        Get frequency domain information for each segment.

        The real FFT is computed on the raw segment data; no mask values or
        tapers are applied.

        Returns
        -------
        Dict[str, Dict[str, Any]]
            Dictionary keyed ``"segment_1"``, ``"segment_2"``, ... in order
            of appearance. Each value is a dictionary with keys:

            - 'frequencies': Frequency bins for the segment
            - 'fft': FFT of the segment data
            - 'start_idx': Start index in original array
            - 'end_idx': End index (exclusive) in original array
        """
        freq_info: Dict[str, Dict[str, Any]] = {}

        for i, (start, end) in enumerate(self.segment_indices):
            segment_data = self.data[start:end]

            freq_info[f"segment_{i + 1}"] = {
                # Bin spacing depends on the segment length, so each segment
                # gets its own frequency axis.
                "frequencies": np.fft.rfftfreq(end - start, d=self.dt),
                "fft": np.fft.rfft(segment_data),
                "start_idx": start,
                "end_idx": end,
            }

        return freq_info

    @classmethod
    def from_gap_generator(
        cls,
        gap_window_generator: "GapWindowGenerator",
        data: NDArray,
        dt: float,
        t0: float = 0.0,
        apply_tapering: bool = False,
        taper_definitions: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
        **kwargs,
    ) -> Tuple["DataSegmentGenerator", NDArray]:
        """
        Create DataSegmentGenerator from a GapWindowGenerator.

        The mask is produced by calling ``generate_window`` on the provided
        generator; both the new instance and the mask are returned so the
        mask can be reused downstream.

        Parameters
        ----------
        gap_window_generator : GapWindowGenerator
            Configured GapWindowGenerator instance.
        data : NDArray
            Time series data to segment.
        dt : float
            Sampling interval.
        t0 : float, optional
            Start time. Default is 0.0.
        apply_tapering : bool, optional
            Forwarded to ``generate_window``. Default is False.
        taper_definitions : dict, optional
            Forwarded to ``generate_window``.
        **kwargs
            Additional arguments passed to generate_window().

        Returns
        -------
        Tuple[DataSegmentGenerator, NDArray]
            Tuple containing:

            - DataSegmentGenerator instance
            - The generated mask (for downstream reuse)
        """
        mask = gap_window_generator.generate_window(
            apply_tapering=apply_tapering, taper_definitions=taper_definitions, **kwargs
        )
        segmenter = cls(mask=mask, data=data, dt=dt, t0=t0)
        return segmenter, mask

    def summary(self) -> Dict[str, Any]:
        """
        Get summary information about the segmentation.

        Returns
        -------
        Dict[str, Any]
            Summary with keys ``total_segments``, ``total_data_length``,
            ``valid_data_length``, ``gap_data_length``,
            ``data_fraction_valid``, ``segment_lengths``,
            ``min_segment_length``, ``max_segment_length``,
            ``mean_segment_length``, ``sampling_interval`` and
            ``start_time``. Length statistics are 0 when there are no
            segments.
        """
        total_length = len(self.data)
        # Cast to a plain int so the summary is not polluted with NumPy
        # scalars (which, for example, json.dumps rejects).
        valid_length = int(np.sum(self.binary_mask))
        segment_lengths = [end - start for start, end in self.segment_indices]

        return {
            "total_segments": len(self.segment_indices),
            "total_data_length": total_length,
            "valid_data_length": valid_length,
            "gap_data_length": total_length - valid_length,
            "data_fraction_valid": (
                valid_length / total_length if total_length > 0 else 0.0
            ),
            "segment_lengths": segment_lengths,
            "min_segment_length": min(segment_lengths) if segment_lengths else 0,
            "max_segment_length": max(segment_lengths) if segment_lengths else 0,
            "mean_segment_length": (
                float(np.mean(segment_lengths)) if segment_lengths else 0.0
            ),
            "sampling_interval": self.dt,
            "start_time": self.t0,
        }