Source code for kspecdr.fluxcal.templates

"""
BOSZ 2024 stellar template library for kspecdr flux calibration.

Provides:
- :func:`parse_bosz_filename` — extract stellar parameters from filename
- :class:`TemplateLibrary` — indexed access to the downloaded BOSZ subgrid
- :func:`prepare_template` — convolve to instrument resolution and resample
- :func:`resample_spectrum` — general-purpose 1-D resampling

BOSZ file format (``_resam.txt.gz``)
------------------------------------
Two-column ASCII (whitespace-separated, no header), one row per wavelength
point. The shared wavelength grid is in a separate file
(``bosz2024_wave_r10000.txt``).

- Column 0: ``H`` (erg/s/cm²/Å/steradian)
- Column 1: ``C`` (continuum; erg/s/cm²/Å/steradian)

Surface flux: ``F = 4π × H``.  Normalised flux: ``F/C = H/C``.
"""

from __future__ import annotations

import csv
import gzip
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Sequence, Tuple

import numpy as np
from scipy.interpolate import interp1d
from scipy.ndimage import gaussian_filter1d

from .containers import Spectrum1D, StellarTemplate

logger = logging.getLogger(__name__)

__all__ = [
    "parse_bosz_filename",
    "TemplateLibrary",
    "prepare_template",
    "resample_spectrum",
]

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------

_REPO_ROOT = Path(__file__).parent.parent.parent.parent
_DEFAULT_LIBRARY_DIR = _REPO_ROOT / "data" / "templates" / "bosz2024"

# ---------------------------------------------------------------------------
# Filename parser
# ---------------------------------------------------------------------------

_BOSZ_RE = re.compile(
    r"bosz2024_"
    r"(?P<atmos>[a-z]{2})_"
    r"t(?P<teff>\d+)_"
    r"g\+(?P<logg>[\d.]+)_"
    r"m(?P<feh>[+-][\d.]+)_"
    r"a(?P<alpha>[+-][\d.]+)_"
    r"c(?P<carbon>[+-][\d.]+)_"
    r"v(?P<vmicro>\d+)_"
    r"(?P<resolution>r\d+)_resam\.txt\.gz$"
)


[docs] def parse_bosz_filename(filename: str) -> Optional[Dict]: """Parse a BOSZ filename into a parameter dictionary. Parameters ---------- filename : str Filename (basename only) of a BOSZ ``_resam.txt.gz`` file. Returns ------- dict or None ``{atmos, teff, logg, feh, alpha_m, carbon_m, vmicro, resolution}`` with numeric types, or *None* if the filename does not match. """ m = _BOSZ_RE.match(Path(filename).name) if m is None: return None return { "atmos": m.group("atmos"), "teff": int(m.group("teff")), "logg": float(m.group("logg")), "feh": float(m.group("feh")), "alpha_m": float(m.group("alpha")), "carbon_m": float(m.group("carbon")), "vmicro": int(m.group("vmicro")), "resolution": m.group("resolution"), }
# --------------------------------------------------------------------------- # TemplateLibrary # ---------------------------------------------------------------------------
[docs] class TemplateLibrary: """Indexed library of BOSZ 2024 stellar template spectra. On first instantiation the library directory is scanned and an index CSV is written to ``<library_dir>/index_<resolution>.csv`` for fast subsequent starts. Parameters ---------- library_dir : str or Path, optional Root of the BOSZ subgrid. Defaults to ``data/templates/bosz2024/``. resolution : str Resolution subdirectory. Default ``"r10000"``. wave_range : (float, float), optional If given, trim the shared wavelength grid (and all loaded spectra) to this range in Angstrom, reducing memory. Default ``(3000, 11000)``. """ def __init__( self, library_dir: str | Path | None = None, resolution: str = "r10000", wave_range: Tuple[float, float] = (3000.0, 11000.0), ) -> None: self.library_dir = Path(library_dir) if library_dir else _DEFAULT_LIBRARY_DIR self.resolution = resolution self.wave_range = wave_range # Shared wavelength grid (trimmed to wave_range) wave_file = self.library_dir / f"bosz2024_wave_{resolution}.txt" if not wave_file.exists(): raise FileNotFoundError( f"Wavelength grid not found: {wave_file}\n" "Run: python -m kspecdr.fluxcal.download_bosz" ) full_wave = np.loadtxt(wave_file) if wave_range is not None: sel = (full_wave >= wave_range[0]) & (full_wave <= wave_range[1]) self._wave_slice = sel else: self._wave_slice = np.ones(len(full_wave), dtype=bool) self.wavelength = full_wave[self._wave_slice] # Build or load index self._index_file = self.library_dir / f"index_{resolution}.csv" self._index: List[Dict] = [] self._build_or_load_index() logger.info( "TemplateLibrary: %d templates, wave %.0f%.0f Å (%d pixels)", len(self._index), self.wavelength[0], self.wavelength[-1], len(self.wavelength), ) # ------------------------------------------------------------------ # Index management # ------------------------------------------------------------------ def _build_or_load_index(self) -> None: if self._index_file.exists(): self._load_index() return self._scan_and_write_index() def _scan_and_write_index(self) -> None: spec_dir = self.library_dir / self.resolution if not spec_dir.exists(): raise FileNotFoundError( f"Template spectra directory not found: {spec_dir}" ) entries = [] for gz_path in sorted(spec_dir.rglob("*.txt.gz")): params = parse_bosz_filename(gz_path.name) if params is None: continue params["filepath"] = str(gz_path.relative_to(self.library_dir)) entries.append(params) if not entries: raise RuntimeError(f"No BOSZ template files found in {spec_dir}") # Write CSV index fields = [ "filepath", "atmos", "teff", "logg", "feh", "alpha_m", "carbon_m", "vmicro", "resolution", ] with open(self._index_file, "w", newline="") as fh: writer = csv.DictWriter(fh, fieldnames=fields) writer.writeheader() writer.writerows(entries) self._index = entries logger.info( "Built template index: %d entries → %s", len(entries), self._index_file, ) def _load_index(self) -> None: entries = [] with open(self._index_file, newline="") as fh: reader = csv.DictReader(fh) for row in reader: row["teff"] = int(row["teff"]) row["logg"] = float(row["logg"]) row["feh"] = float(row["feh"]) row["alpha_m"] = float(row["alpha_m"]) row["carbon_m"] = float(row["carbon_m"]) row["vmicro"] = int(row["vmicro"]) entries.append(row) self._index = entries # ------------------------------------------------------------------ # Public queries # ------------------------------------------------------------------ @property def grid_params(self) -> np.ndarray: """Parameter array, shape ``(N, 4)`` → [Teff, logg, [M/H], [α/M]].""" return np.array( [[e["teff"], e["logg"], e["feh"], e["alpha_m"]] for e in self._index] )
[docs] def query( self, teff_range: Optional[Tuple[float, float]] = None, logg_range: Optional[Tuple[float, float]] = None, feh_range: Optional[Tuple[float, float]] = None, ) -> List[Dict]: """Return index entries within the given parameter box. Entries are *metadata dicts* (cheap); call :meth:`load_template` to read the actual spectral data. """ result = [] for e in self._index: if teff_range and not (teff_range[0] <= e["teff"] <= teff_range[1]): continue if logg_range and not (logg_range[0] <= e["logg"] <= logg_range[1]): continue if feh_range and not (feh_range[0] <= e["feh"] <= feh_range[1]): continue result.append(e) return result
[docs] def get_template( self, teff: float, logg: float, feh: float, alpha_m: Optional[float] = None, ) -> StellarTemplate: """Nearest-grid-point lookup and load. Parameters ---------- teff, logg, feh : float Requested stellar parameters. alpha_m : float, optional If None, defaults to 0.00 (or 0.25 for [M/H] ≤ −0.50). Returns ------- StellarTemplate """ if alpha_m is None: alpha_m = 0.25 if feh <= -0.50 else 0.00 target = np.array([teff, logg, feh, alpha_m]) params = self.grid_params # Weighted distance (Teff dominates unless normalised) scale = np.array([250.0, 0.5, 0.25, 0.25]) # grid step sizes dist = np.sum(((params - target) / scale) ** 2, axis=1) best_idx = int(np.argmin(dist)) return self.load_template(self._index[best_idx])
[docs] def load_template(self, entry: Dict) -> StellarTemplate: """Read a template spectrum from disk. Parameters ---------- entry : dict One element from :attr:`_index` (must have ``"filepath"`` key). Returns ------- StellarTemplate """ filepath = self.library_dir / entry["filepath"] with gzip.open(filepath, "rt") as fh: data = np.loadtxt(fh) # Columns: 0 = H (Eddington flux), 1 = C (continuum) h_all = data[:, 0] c_all = data[:, 1] # Trim to wave_range h = h_all[self._wave_slice] c = c_all[self._wave_slice] # Convert Eddington flux to surface flux: F = 4π H flux = 4.0 * np.pi * h cont = 4.0 * np.pi * c return StellarTemplate( wavelength=self.wavelength.copy(), flux=flux, continuum=cont, teff=entry["teff"], logg=entry["logg"], feh=entry["feh"], alpha_m=entry["alpha_m"], carbon_m=entry["carbon_m"], vmicro=float(entry["vmicro"]), atmos_model=entry["atmos"], source=Path(entry["filepath"]).name, )
def __len__(self) -> int: return len(self._index) def __repr__(self) -> str: return ( f"TemplateLibrary({len(self)} templates, " f"wave {self.wavelength[0]:.0f}{self.wavelength[-1]:.0f} Å)" )
# --------------------------------------------------------------------------- # Template preparation # ---------------------------------------------------------------------------
[docs] def resample_spectrum( wave_in: np.ndarray, flux_in: np.ndarray, wave_out: np.ndarray, fill_value: float = 0.0, ) -> np.ndarray: """Resample a 1-D spectrum onto a new wavelength grid. Uses linear interpolation. Out-of-range pixels are filled with *fill_value*. Parameters ---------- wave_in : ndarray, shape (M,) flux_in : ndarray, shape (M,) wave_out : ndarray, shape (N,) fill_value : float Returns ------- ndarray, shape (N,) """ f = interp1d( wave_in, flux_in, kind="linear", bounds_error=False, fill_value=fill_value, ) return f(wave_out)
[docs] def prepare_template( template: StellarTemplate, target_wavelength: np.ndarray, instrument_fwhm_angstrom: float, fwhm_poly_coeffs: Optional[Sequence[float]] = None, ) -> Spectrum1D: """Convolve a template to instrument resolution and resample. The BOSZ templates are provided at R = 10,000, which is higher than the typical KSPEC instrument resolution (R ~ 3,000–5,000). This function: 1. Computes the *additional* broadening needed (quadrature subtraction of the BOSZ native width from the requested instrument width). 2. Applies a Gaussian convolution (constant or wavelength-dependent). 3. Resamples onto the observed wavelength grid. 4. Carries the BOSZ continuum through (same operations applied). Parameters ---------- template : StellarTemplate A template loaded from :class:`TemplateLibrary`. target_wavelength : ndarray, shape (N,) Observed wavelength grid in Angstrom. instrument_fwhm_angstrom : float Instrument spectral FWHM in Angstrom (``SPECFWHM`` header keyword). Used when *fwhm_poly_coeffs* is None. fwhm_poly_coeffs : sequence of float, optional Polynomial coefficients giving FWHM(λ) in Angstrom as ``FWHM(λ) = c0 + c1*λ + c2*λ² + ...`` (from wavecal ``FWHMPCn`` header keywords). If provided, a wavelength-dependent convolution is applied. Returns ------- Spectrum1D Template on the observed grid. ``variance = 0``, ``mask = True`` everywhere. ``meta['continuum']`` contains the resampled continuum. """ tw = template.wavelength tf = template.flux.copy() tc = template.continuum.copy() # Step 1: compute the additional broadening # BOSZ at R = 10,000: native FWHM_bosz ≈ λ / R at each pixel # We need the quadrature difference if fwhm_poly_coeffs is not None: tf, tc = _convolve_variable(tw, tf, tc, fwhm_poly_coeffs, r_bosz=10_000) else: tf, tc = _convolve_constant(tw, tf, tc, instrument_fwhm_angstrom, r_bosz=10_000) # Step 2: resample flux_out = resample_spectrum(tw, tf, target_wavelength) cont_out = resample_spectrum(tw, tc, target_wavelength) n = len(target_wavelength) return Spectrum1D( wavelength=target_wavelength.copy(), flux=flux_out, variance=np.zeros(n, dtype=float), mask=np.ones(n, dtype=bool), meta={ "teff": template.teff, "logg": template.logg, "feh": template.feh, "alpha_m": template.alpha_m, "source": template.source, "continuum": cont_out, }, )
# --------------------------------------------------------------------------- # Convolution internals # --------------------------------------------------------------------------- _FWHM_TO_SIGMA = 1.0 / (2.0 * np.sqrt(2.0 * np.log(2.0))) def _convolve_constant( wave: np.ndarray, flux: np.ndarray, cont: np.ndarray, instrument_fwhm: float, r_bosz: int, ) -> Tuple[np.ndarray, np.ndarray]: """Apply constant-width Gaussian broadening.""" # Representative wavelength for the middle of the range lam_mid = 0.5 * (wave[0] + wave[-1]) fwhm_bosz = lam_mid / r_bosz fwhm_add = np.sqrt(max(instrument_fwhm ** 2 - fwhm_bosz ** 2, 0.0)) if fwhm_add <= 0: return flux, cont # Convert to pixel sigma dlam = np.median(np.diff(wave)) sigma_pix = (fwhm_add * _FWHM_TO_SIGMA) / dlam return gaussian_filter1d(flux, sigma_pix), gaussian_filter1d(cont, sigma_pix) def _convolve_variable( wave: np.ndarray, flux: np.ndarray, cont: np.ndarray, fwhm_coeffs: Sequence[float], r_bosz: int, ) -> Tuple[np.ndarray, np.ndarray]: """Apply wavelength-dependent Gaussian broadening. For each pixel, the local broadening kernel is computed from the polynomial FWHM(λ) minus the native BOSZ resolution in quadrature. This is implemented as a loop over wavelength chunks for efficiency. """ fwhm_inst = np.polyval(fwhm_coeffs[::-1], wave) # polyval wants high→low fwhm_bosz = wave / r_bosz fwhm_add = np.sqrt(np.maximum(fwhm_inst ** 2 - fwhm_bosz ** 2, 0.0)) dlam = np.gradient(wave) sigma_pix = (fwhm_add * _FWHM_TO_SIGMA) / dlam # Variable-sigma convolution: approximate by chunked constant-sigma n = len(wave) n_chunks = max(1, n // 500) flux_out = np.empty_like(flux) cont_out = np.empty_like(cont) chunk_edges = np.linspace(0, n, n_chunks + 1, dtype=int) for i in range(n_chunks): s, e = chunk_edges[i], chunk_edges[i + 1] # Use wider context to avoid edge artefacts ctx_s = max(0, s - 50) ctx_e = min(n, e + 50) sig = np.median(sigma_pix[s:e]) if sig > 0.1: chunk_f = gaussian_filter1d(flux[ctx_s:ctx_e], sig) chunk_c = gaussian_filter1d(cont[ctx_s:ctx_e], sig) flux_out[s:e] = chunk_f[s - ctx_s : e - ctx_s] cont_out[s:e] = chunk_c[s - ctx_s : e - ctx_s] else: flux_out[s:e] = flux[s:e] cont_out[s:e] = cont[s:e] return flux_out, cont_out