Source code for kspecdr.fluxcal.masks

"""
Wavelength mask utilities for kspecdr flux calibration.

Masks are lists of ``(lam_lo, lam_hi)`` pairs in Angstrom that mark regions
to *exclude* from calibration fits, continuum normalisation, and template
matching (e.g. telluric absorption bands, detector bad regions).

Mask file format
----------------
Two-column ASCII, whitespace-separated.  Lines starting with ``#`` are
comments and are ignored::

    6860.0    6960.0    # O2 B-band
    7150.0    7340.0    # H2O

The default mask is loaded from ``data/masks/telluric_default.dat``.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import List, Optional

import numpy as np

from .containers import Spectrum1D

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

# src/kspecdr/fluxcal/masks.py  →  repo root (4 × .parent)
_REPO_ROOT = Path(__file__).parent.parent.parent.parent
_MASK_DIR  = _REPO_ROOT / "data" / "masks"

# In-memory cache for loaded mask files
_mask_cache: dict[str, List[tuple[float, float]]] = {}

# ---------------------------------------------------------------------------
# I/O
# ---------------------------------------------------------------------------

[docs] def load_mask_regions( mask_name: str = "telluric_default", ) -> List[tuple[float, float]]: """Load wavelength exclusion regions from ``data/masks/{mask_name}.dat``. Results are cached after the first load. Parameters ---------- mask_name : str Base name of the mask file (without ``.dat``). Default: ``"telluric_default"``. Returns ------- list of (lam_lo, lam_hi) Wavelength intervals in Angstrom to be masked (excluded). Raises ------ FileNotFoundError If the mask file does not exist. """ if mask_name in _mask_cache: return _mask_cache[mask_name] path = _MASK_DIR / f"{mask_name}.dat" if not path.exists(): raise FileNotFoundError( f"Mask file not found: {path}\n" "Available masks: " + ", ".join(p.stem for p in _MASK_DIR.glob("*.dat")) ) regions: List[tuple[float, float]] = [] with open(path) as fh: for line in fh: stripped = line.split("#")[0].strip() if not stripped: continue parts = stripped.split() if len(parts) < 2: continue lo, hi = float(parts[0]), float(parts[1]) if lo >= hi: logger.warning( "Skipping degenerate mask region in '%s': %.1f >= %.1f", mask_name, lo, hi, ) continue regions.append((lo, hi)) logger.debug( "Loaded %d mask regions from '%s'", len(regions), mask_name ) _mask_cache[mask_name] = regions return regions
[docs] def load_mask_array( wavelength: np.ndarray, mask_name: str = "telluric_default", ) -> np.ndarray: """Return a boolean mask array for a given wavelength axis. Parameters ---------- wavelength : ndarray, shape (N,) Wavelength axis in Angstrom. mask_name : str Mask file to load (see :func:`load_mask_regions`). Returns ------- ndarray of bool, shape (N,) True for *good* (unmasked) pixels; False inside any exclusion region. """ regions = load_mask_regions(mask_name) return _regions_to_array(wavelength, regions)
# --------------------------------------------------------------------------- # Application to spectra # ---------------------------------------------------------------------------
[docs] def apply_mask_regions( spectrum: Spectrum1D, regions: List[tuple[float, float]], ) -> Spectrum1D: """Set ``mask=False`` for all pixels inside any exclusion region. Returns a *new* :class:`~.containers.Spectrum1D` with an updated mask; the original is not modified. Parameters ---------- spectrum : Spectrum1D regions : list of (lam_lo, lam_hi) Wavelength intervals to exclude (Angstrom). Returns ------- Spectrum1D New spectrum with pixels inside *regions* masked out. """ new_mask = spectrum.mask.copy() bad = _regions_to_array(spectrum.wavelength, regions) new_mask &= bad # bad is True for GOOD pixels; AND with existing mask return Spectrum1D( wavelength=spectrum.wavelength.copy(), flux=spectrum.flux.copy(), variance=spectrum.variance.copy(), mask=new_mask, meta=dict(spectrum.meta), )
[docs] def apply_named_mask( spectrum: Spectrum1D, mask_name: str = "telluric_default", ) -> Spectrum1D: """Convenience wrapper: load mask by name and apply to *spectrum*. Parameters ---------- spectrum : Spectrum1D mask_name : str Mask file base name (see :func:`load_mask_regions`). Returns ------- Spectrum1D """ regions = load_mask_regions(mask_name) return apply_mask_regions(spectrum, regions)
# --------------------------------------------------------------------------- # Utility helpers # --------------------------------------------------------------------------- def _regions_to_array( wavelength: np.ndarray, regions: List[tuple[float, float]], ) -> np.ndarray: """Build a boolean array: True = pixel is *outside* all exclusion regions. Parameters ---------- wavelength : ndarray, shape (N,) regions : list of (lam_lo, lam_hi) Returns ------- ndarray of bool, shape (N,) """ good = np.ones(len(wavelength), dtype=bool) for lo, hi in regions: good &= ~((wavelength >= lo) & (wavelength <= hi)) return good
[docs] def mask_coverage_fraction( wavelength: np.ndarray, mask_name: str = "telluric_default", ) -> float: """Return the fraction of wavelength pixels inside the named mask. Useful for QC: large fractions (> 0.3) may indicate a wavelength unit mismatch. Parameters ---------- wavelength : ndarray In Angstrom. mask_name : str Returns ------- float Fraction in [0, 1]. """ good = load_mask_array(wavelength, mask_name) return float((~good).sum() / len(wavelength))
[docs] def combine_regions( *region_lists: List[tuple[float, float]], ) -> List[tuple[float, float]]: """Merge multiple region lists and sort by start wavelength. Overlapping or adjacent regions are *not* merged — they are stacked and applied independently, which produces the same result. Parameters ---------- *region_lists : list of (lam_lo, lam_hi) Returns ------- list of (lam_lo, lam_hi) """ combined = [] for rl in region_lists: combined.extend(rl) return sorted(combined, key=lambda r: r[0])