# Source code for lisagap.gap_window_generator

"""
GapWindowGenerator - Advanced gap mask processing with tapering capabilities.

This module provides enhanced gap mask functionality built on top of lisaglitch.GapMaskGenerator,
focusing specifically on smooth tapering and window transitions.
"""

from collections.abc import Mapping
from typing import Any, Dict, Optional

import numpy as np
from lisaglitch import GapMaskGenerator
from numpy.typing import NDArray
from scipy.signal.windows import tukey


class GapWindowGenerator:
    """
    Advanced gap mask generator with smooth tapering capabilities.

    This class wraps an existing lisaglitch.GapMaskGenerator instance and adds
    tapering and windowing functionality for smooth transitions around gap
    edges. Any attribute not defined here is delegated to the wrapped
    generator (see ``__getattr__``).

    Parameters
    ----------
    gap_mask_generator : GapMaskGenerator
        An instantiated GapMaskGenerator object from lisaglitch.

    Examples
    --------
    >>> from lisaglitch import GapMaskGenerator
    >>> from lisagap import GapWindowGenerator
    >>>
    >>> # Create the core gap generator
    >>> gap_gen = GapMaskGenerator(sim_t=sim_t, gap_definitions=gap_defs)
    >>>
    >>> # Wrap it with windowing capabilities
    >>> window = GapWindowGenerator(gap_gen)
    >>>
    >>> # Generate masks with optional tapering
    >>> mask, stats = window.generate_window(
    ...     apply_tapering=True, taper_definitions=taper_defs
    ... )
    """

    def __init__(self, gap_mask_generator: "GapMaskGenerator"):
        """
        Initialize a GapWindowGenerator from an existing GapMaskGenerator.

        Parameters
        ----------
        gap_mask_generator : GapMaskGenerator
            An instantiated GapMaskGenerator object from lisaglitch.
        """
        # Store the core generator
        self.gap_mask_generator = gap_mask_generator

        # Mirror key attributes from the core generator
        self.sim_t = gap_mask_generator.sim_t
        self.dt = gap_mask_generator.dt
        self.gap_definitions = gap_mask_generator.gap_definitions
        self.treat_as_nan = gap_mask_generator.treat_as_nan
        self.n_data = gap_mask_generator.n_data

        # Gap labels are needed to validate taper definitions
        self.planned_labels = list(self.gap_definitions.get("planned", {}).keys())
        self.unplanned_labels = list(
            self.gap_definitions.get("unplanned", {}).keys()
        )

    @staticmethod
    def _find_segments(flags: NDArray) -> tuple[NDArray, NDArray]:
        """Return (starts, ends) of each run of True in a 1-D boolean array.

        ``ends`` are exclusive, so run ``i`` is ``flags[starts[i]:ends[i]]``.
        """
        # Padding with False on both sides makes runs touching either boundary
        # produce a rising and a falling edge like any interior run.
        edges = np.diff(np.concatenate(([False], flags, [False])).astype(int))
        return np.flatnonzero(edges == 1), np.flatnonzero(edges == -1)

    def generate_window(
        self,
        include_planned: bool = True,
        include_unplanned: bool = True,
        apply_tapering: bool = False,
        taper_definitions: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None,
        merge_close_gaps: bool = False,
        min_freq_resolution_hz: float = 1 / (3 * 3600),
    ) -> tuple[NDArray, Optional[Dict[str, Any]]]:
        """
        Generate gap mask with optional tapering and gap merging.

        The base mask comes from the underlying GapMaskGenerator. Short data
        segments are then optionally merged into gaps, and finally smooth
        tapers are optionally applied around the gaps.

        Parameters
        ----------
        include_planned : bool, optional
            Include planned gaps in the mask. Default is True.
        include_unplanned : bool, optional
            Include unplanned gaps in the mask. Default is True.
        apply_tapering : bool, optional
            Whether to apply smooth tapering around gaps. Default is False.
        taper_definitions : dict, optional
            Tapering parameters for each gap type. Required if
            apply_tapering=True. Expected structure::

                {
                    "planned": {"gap_name": {"lobe_lengths_hr": float}},
                    "unplanned": {"gap_name": {"lobe_lengths_hr": float}},
                }
        merge_close_gaps : bool, optional
            If True, merge gaps that are close together to avoid short
            segments with poor frequency resolution. Default is False.
        min_freq_resolution_hz : float, optional
            Minimum frequency resolution for data segments. Segments shorter
            than 1/min_freq_resolution_hz will be merged with adjacent gaps.
            Default is 1/(3*3600) Hz ≈ 9.26e-5 Hz (3 hour minimum segments).

        Returns
        -------
        mask : np.ndarray
            Gap mask with 1.0 for good data and 0.0/NaN for gaps. If tapering
            is applied, values between 0 and 1 indicate the tapering
            transition regions.
        merge_stats : dict or None
            Dictionary containing merge statistics if merge_close_gaps=True,
            else None. Contains: 'segments_merged', 'original_duty_cycle',
            'merged_duty_cycle', 'additional_data_lost_hr',
            'min_segment_duration_hr'.

        Raises
        ------
        ValueError
            If apply_tapering=True and taper_definitions is None, or if the
            taper definitions are semantically invalid.
        TypeError
            If the taper definitions are structurally invalid.

        Examples
        --------
        >>> # Basic mask generation
        >>> mask, stats = window.generate_window()
        >>>
        >>> # With gap merging
        >>> mask, stats = window.generate_window(
        ...     merge_close_gaps=True,
        ...     min_freq_resolution_hz=1/(2*3600)  # 2 hour minimum segments
        ... )
        >>> print(f"Merged {stats['segments_merged']} segments")
        >>>
        >>> # With tapering and merging
        >>> taper_defs = {
        ...     "planned": {"maintenance": {"lobe_lengths_hr": 2.0}}
        ... }
        >>> mask, stats = window.generate_window(
        ...     apply_tapering=True,
        ...     taper_definitions=taper_defs,
        ...     merge_close_gaps=True
        ... )
        """
        # Fail fast: reject bad taper input before the core generator is asked
        # to build a mask.
        if apply_tapering:
            if taper_definitions is None:
                raise ValueError(
                    "taper_definitions must be provided when apply_tapering=True"
                )
            self._validate_taper_definitions(taper_definitions)

        # Generate the base mask using the underlying GapMaskGenerator
        mask = self.gap_mask_generator.generate_mask(
            include_planned=include_planned, include_unplanned=include_unplanned
        )

        merge_stats = None
        if merge_close_gaps:
            mask, merge_stats = self._merge_close_gaps(mask, min_freq_resolution_hz)

        # Tapering runs after merging so that tapers surround the merged gaps
        if apply_tapering:
            mask = self.apply_smooth_taper_to_mask(mask, taper_definitions)

        return mask, merge_stats

    def _merge_close_gaps(
        self,
        mask: NDArray,
        min_freq_resolution_hz: float,
    ) -> tuple[NDArray, Dict[str, Any]]:
        """
        Merge gaps that are too close together to avoid short segments.

        Continuous data segments shorter than the minimum duration required
        for the target frequency resolution are converted to gaps (set to
        0.0, or NaN when ``self.treat_as_nan`` is true).

        Parameters
        ----------
        mask : np.ndarray
            Binary mask (1 = good data, 0/NaN = gap).
        min_freq_resolution_hz : float
            Minimum frequency resolution. Segments shorter than
            1/min_freq_resolution_hz will be converted to gaps.

        Returns
        -------
        merged_mask : np.ndarray
            Float copy of the mask with short segments converted to gaps.
        stats : dict
            Statistics about the merging operation: 'segments_merged',
            'original_duty_cycle', 'merged_duty_cycle',
            'additional_data_lost_hr', 'min_segment_duration_hr'.
        """
        # Minimum segment duration, in seconds and in (truncated) samples
        min_segment_duration_sec = 1.0 / min_freq_resolution_hz
        min_segment_samples = int(min_segment_duration_sec / self.dt)

        # astype() copies, so the caller's mask is never modified
        merged_mask = mask.astype(float)

        if self.treat_as_nan:
            gap_mask = np.isnan(mask)
            gap_fill = np.nan
        else:
            gap_mask = mask == 0.0
            gap_fill = 0.0

        n_total = len(mask)
        original_valid = np.sum(~gap_mask)
        original_duty_cycle = original_valid / n_total

        # Continuous data segments are the runs where gap_mask is False
        segment_starts, segment_ends = self._find_segments(~gap_mask)

        segments_merged = 0
        for start, end in zip(segment_starts, segment_ends):
            if end - start < min_segment_samples:
                # Segment is too short to be useful: turn it into a gap
                merged_mask[start:end] = gap_fill
                segments_merged += 1

        if self.treat_as_nan:
            merged_valid = np.sum(~np.isnan(merged_mask))
        else:
            merged_valid = np.sum(merged_mask != 0)
        merged_duty_cycle = merged_valid / n_total

        additional_data_lost_samples = original_valid - merged_valid
        additional_data_lost_hr = (additional_data_lost_samples * self.dt) / 3600.0

        stats = {
            "segments_merged": segments_merged,
            "original_duty_cycle": float(original_duty_cycle),
            "merged_duty_cycle": float(merged_duty_cycle),
            "additional_data_lost_hr": float(additional_data_lost_hr),
            "min_segment_duration_hr": min_segment_duration_sec / 3600.0,
        }
        return merged_mask, stats

    def apply_smooth_taper_to_mask(
        self,
        mask: NDArray,
        taper_gap_definitions: Optional[Dict[str, Dict[str, Dict[str, Any]]]],
    ):
        """
        Apply Tukey taper smoothing to an existing gap mask.

        Each gap in the mask is widened by a smooth roll-off built from a
        Tukey window, which helps reduce spectral artifacts in frequency
        domain analysis.

        Parameters
        ----------
        mask : np.ndarray
            Original binary or NaN mask (1 = good, 0/NaN = gap).
        taper_gap_definitions : dict
            Dictionary containing taper parameters per gap type. Expected
            structure::

                {
                    "planned": {"gap_name": {"lobe_lengths_hr": float}},
                    "unplanned": {"gap_name": {"lobe_lengths_hr": float}},
                }

        Returns
        -------
        np.ndarray
            A smoothed float copy of the mask with tapering applied around
            each gap. If ``taper_gap_definitions`` is None the input mask is
            returned unchanged (same object).

        Raises
        ------
        TypeError, ValueError
            If ``taper_gap_definitions`` fails validation.

        Notes
        -----
        The mask carries no information about which gap definition produced
        which gap, so every positive lobe length found in the definitions is
        applied to every gap and the results are combined with an
        element-wise minimum.
        """
        self._validate_taper_definitions(taper_gap_definitions)

        if taper_gap_definitions is None:
            return mask  # No tapering applied

        # astype() copies, so the caller's mask is never modified
        smoothed_mask = mask.astype(float)

        # The lobe sizes do not depend on the gap, so compute them once.
        # Duplicates are dropped: applying the same taper twice through
        # np.minimum changes nothing.
        lobe_sample_counts = []
        for kind, labels in (
            ("planned", self.planned_labels),
            ("unplanned", self.unplanned_labels),
        ):
            kind_definitions = taper_gap_definitions.get(kind, {})
            for label in labels:
                lobe_hr = kind_definitions.get(label, {}).get("lobe_lengths_hr", 0.0)
                if lobe_hr > 0:
                    # Convert hours to samples
                    lobe_samples = int(lobe_hr * 3600 / self.dt)
                    if lobe_samples not in lobe_sample_counts:
                        lobe_sample_counts.append(lobe_samples)

        if not lobe_sample_counts:
            return smoothed_mask

        # Identify gap regions
        gap_mask = np.isnan(mask) if self.treat_as_nan else mask == 0.0
        gap_starts, gap_ends = self._find_segments(gap_mask)

        n_samples = len(mask)
        for start, end in zip(gap_starts, gap_ends):
            for lobe_samples in lobe_sample_counts:
                # Extend the gap by one lobe on each side, clipped to the data
                win_start = max(0, start - lobe_samples)
                win_end = min(n_samples, end + lobe_samples)
                actual_win_len = win_end - win_start

                if actual_win_len > 2:  # Need at least 3 points for tapering
                    # alpha is the tapered fraction of the window, capped at 1
                    alpha = min(2 * lobe_samples / actual_win_len, 1.0)
                    # 1 - tukey is 1 at the window edges and 0 in the middle
                    taper = 1 - tukey(actual_win_len, alpha)
                    # The minimum keeps existing gaps and earlier tapers intact
                    smoothed_mask[win_start:win_end] = np.minimum(
                        smoothed_mask[win_start:win_end], taper
                    )

        return smoothed_mask

    def _validate_taper_definitions(
        self, taper_gap_definitions: Optional[Dict[str, Dict[str, Dict[str, Any]]]]
    ) -> None:
        """
        Validate the structure and contents of taper_gap_definitions.

        Parameters
        ----------
        taper_gap_definitions : dict or None
            The taper definitions to validate. None is accepted as valid.

        Raises
        ------
        TypeError
            If the definitions, a per-kind entry, or a per-gap entry is not a
            mapping.
        ValueError
            If a gap kind is unknown, a gap name is not present in the gap
            definitions, 'lobe_lengths_hr' is missing, or its value is not a
            non-negative number.
        """
        if taper_gap_definitions is None:
            return  # None is valid

        if not isinstance(taper_gap_definitions, Mapping):
            raise TypeError("taper_gap_definitions must be a dictionary.")

        for kind, kind_definitions in taper_gap_definitions.items():
            if kind not in ("planned", "unplanned"):
                raise ValueError(
                    f"Invalid gap kind '{kind}'. Must be 'planned' or 'unplanned'."
                )

            if not isinstance(kind_definitions, Mapping):
                raise TypeError(
                    f"taper_gap_definitions['{kind}'] must be a dictionary."
                )

            # Every gap name in the taper definitions must be a known gap
            available_labels = (
                self.planned_labels if kind == "planned" else self.unplanned_labels
            )

            for gap_name, taper_params in kind_definitions.items():
                if gap_name not in available_labels:
                    raise ValueError(
                        f"Gap '{gap_name}' in taper_gap_definitions['{kind}'] "
                        f"not found in gap_definitions. Available: {available_labels}"
                    )

                if not isinstance(taper_params, Mapping):
                    raise TypeError(
                        f"Taper parameters for '{gap_name}' must be a dictionary."
                    )

                if "lobe_lengths_hr" not in taper_params:
                    raise ValueError(
                        f"Missing 'lobe_lengths_hr' in taper parameters for '{gap_name}'."
                    )

                lobe_hr = taper_params["lobe_lengths_hr"]
                # NumPy scalars are accepted alongside Python numbers.
                # `not (x >= 0)` also rejects NaN, which `x < 0` lets through.
                is_number = isinstance(
                    lobe_hr, (int, float, np.integer, np.floating)
                )
                if not is_number or not (lobe_hr >= 0):
                    raise ValueError(
                        f"'lobe_lengths_hr' for '{gap_name}' must be a non-negative number."
                    )

    def summary(self, mask: Optional[NDArray[np.float64]] = None) -> Dict[str, Any]:
        """
        Return a summary of the gap configuration and mask statistics.

        Delegates to the wrapped generator's ``summary``.

        Parameters
        ----------
        mask : np.ndarray, optional
            If provided, includes statistics about this specific mask.

        Returns
        -------
        dict
            Summary dictionary with configuration and statistics.
        """
        return self.gap_mask_generator.summary(mask)

    def save_to_hdf5(self, mask: NDArray, filename: str = "gap_mask_data.h5", **kwargs):
        """Save a gap mask to an HDF5 file (delegates to the wrapped generator)."""
        return self.gap_mask_generator.save_to_hdf5(mask, filename, **kwargs)

    def construct_planned_gap_mask(self, **kwargs):
        """Generate only planned gaps mask (delegates to the wrapped generator)."""
        return self.gap_mask_generator.construct_planned_gap_mask(**kwargs)

    def construct_unplanned_gap_mask(self, **kwargs):
        """Generate only unplanned gaps mask (delegates to the wrapped generator)."""
        return self.gap_mask_generator.construct_unplanned_gap_mask(**kwargs)

    @property
    def planned_rates(self):
        """Get planned gap rates."""
        return self.gap_mask_generator.planned_rates

    @property
    def unplanned_rates(self):
        """Get unplanned gap rates."""
        return self.gap_mask_generator.unplanned_rates

    @property
    def planned_durations(self):
        """Get planned gap durations."""
        return self.gap_mask_generator.planned_durations

    @property
    def unplanned_durations(self):
        """Get unplanned gap durations."""
        return self.gap_mask_generator.unplanned_durations

    @staticmethod
    def apply_proportional_tapering(
        mask_data: NDArray,
        dt: float = 1.0,
        short_taper_fraction: float = 0.25,
        medium_taper_fraction: float = 0.05,
        long_taper_fraction: float = 0.05,
        min_gap_points: int = 5,
        short_gap_threshold_minutes: float = 10.0,
        long_gap_threshold_hours: float = 10.0,
    ) -> NDArray:
        """
        Apply proportional tapering to gaps in a mask loaded from .npy array.

        Gaps (runs of NaN or 0) are detected in the input mask and a Tukey
        window taper proportional to each gap's length is applied around it.
        The taper fraction depends on the gap length category.

        Parameters
        ----------
        mask_data : np.ndarray
            Input mask data from .npy file. Can contain NaN or 0 for gaps.
        dt : float
            Time step in seconds between samples.
        short_taper_fraction : float, optional
            Fraction of gap duration to taper on each side for short gaps.
            Default is 0.25 (25% each side = 50% total taper).
        medium_taper_fraction : float, optional
            Fraction of gap duration to taper on each side for medium gaps.
            Default is 0.05 (5% each side = 10% total taper).
        long_taper_fraction : float, optional
            Fraction of gap duration to taper on each side for long gaps.
            Default is 0.05 (5% each side = 10% total taper).
        min_gap_points : int, optional
            Minimum number of consecutive gap points to apply tapering.
            Gaps shorter than this are left unchanged. Default is 5.
        short_gap_threshold_minutes : float, optional
            Threshold in minutes to distinguish short from medium gaps.
            Default is 10.0 minutes.
        long_gap_threshold_hours : float, optional
            Threshold in hours to distinguish medium from long gaps.
            Default is 10.0 hours.

        Returns
        -------
        np.ndarray
            Float copy of the mask with NaN gaps replaced by 0.0 and smooth
            transitions around gap edges.

        Examples
        --------
        >>> # Load mask from .npy file
        >>> mask = np.load('gap_mask.npy')
        >>>
        >>> # Apply proportional tapering
        >>> tapered_mask = GapWindowGenerator.apply_proportional_tapering(
        ...     mask, dt=1.0
        ... )
        >>>
        >>> # Custom tapering for different gap categories
        >>> tapered_mask = GapWindowGenerator.apply_proportional_tapering(
        ...     mask,
        ...     dt=0.25,
        ...     short_taper_fraction=0.3,   # 30% each side for short gaps
        ...     medium_taper_fraction=0.1,  # 10% each side for medium gaps
        ...     long_taper_fraction=0.02    # 2% each side for long gaps
        ... )
        """
        # Convert thresholds to samples
        short_threshold_samples = int(short_gap_threshold_minutes * 60 / dt)
        long_threshold_samples = int(long_gap_threshold_hours * 3600 / dt)

        # astype() copies, so the caller's mask is never modified
        tapered_mask = mask_data.astype(float)

        # Gaps are NaN or zero; NaNs become zeros in the output
        is_nan_gap = np.isnan(mask_data)
        tapered_mask[is_nan_gap] = 0.0
        gap_mask = (mask_data == 0.0) | is_nan_gap

        gap_starts, gap_ends = GapWindowGenerator._find_segments(gap_mask)

        n_samples = len(mask_data)
        for start, end in zip(gap_starts, gap_ends):
            gap_length = end - start

            # Skip very short gaps
            if gap_length < min_gap_points:
                continue

            # Select the taper fraction from the gap length category
            if gap_length < short_threshold_samples:
                taper_fraction = short_taper_fraction
            elif gap_length < long_threshold_samples:
                taper_fraction = medium_taper_fraction
            else:
                taper_fraction = long_taper_fraction

            taper_samples = int(taper_fraction * gap_length)
            if taper_samples < 1:
                # Always taper by at least one sample
                taper_samples = 1
            elif 2 * taper_samples >= gap_length:
                # Cap each taper at half the gap length
                taper_samples = gap_length // 2

            # Extended window: the gap plus a taper region on each side
            win_start = max(0, start - taper_samples)
            win_end = min(n_samples, end + taper_samples)
            window_length = win_end - win_start

            if window_length < 3:
                continue

            # alpha is the tapered fraction of the window, capped at 1
            alpha = min(2 * taper_samples / window_length, 1.0)

            # 1 - tukey is 1 at the window edges and 0 in the middle
            taper_window = 1 - tukey(window_length, alpha)

            # The minimum keeps existing gaps and earlier tapers intact
            tapered_mask[win_start:win_end] = np.minimum(
                tapered_mask[win_start:win_end], taper_window
            )

        return tapered_mask

    # Delegate other methods to the underlying gap_mask_generator
    def __getattr__(self, name):
        """Delegate unknown attributes to the underlying GapMaskGenerator.

        Raises
        ------
        AttributeError
            If the wrapped generator has not been set yet, or does not have
            the requested attribute.
        """
        # __getattr__ only runs when normal lookup fails, so getting here with
        # this name means the wrapped generator is not set (e.g. during
        # copy/unpickle or after __new__). Without this guard the lookup below
        # would re-enter __getattr__ until RecursionError.
        if name == "gap_mask_generator":
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return getattr(self.gap_mask_generator, name)