import numpy as np
from scipy.interpolate import interp1d
import logging
from ..io.image import ImageFile
logger = logging.getLogger(__name__)
[docs]
def scrunch_open_arc(args):
"""
Returns the filename of the arc file from args.
Corresponds to SCRUNCH_OPEN_ARC in Fortran.
Parameters
----------
args : dict
Arguments dictionary containing 'WAVEL_FILENAME'.
Returns
-------
str or None
The filename of the arc file, or None if not found/empty.
"""
fname = args.get('WAVEL_FILENAME')
if not fname:
# Check TDFIO_NOID logic: if empty, return None
return None
return fname
[docs]
def rebin_data(spectra, variance, input_wave, output_wave):
"""
Rebins spectra and variance from input_wave to output_wave using linear interpolation.
Parameters
----------
spectra : np.ndarray
Input spectra (NSPEC, NFIB)
variance : np.ndarray
Input variance (NSPEC, NFIB)
input_wave : np.ndarray
Input wavelength axis. Can be 1D (NSPEC,) or 2D (NSPEC, NFIB).
output_wave : np.ndarray
Output wavelength axis. Can be 1D (NSPEC_OUT,) or 2D (NSPEC_OUT, NFIB).
Returns
-------
tuple
(out_spec, out_var) - Rebinned spectra and variance
"""
nspec, nfib = spectra.shape
# Check dimensions
in_wave_2d = input_wave.ndim == 2
out_wave_2d = output_wave.ndim == 2
# Determine output shape
if out_wave_2d:
nout = output_wave.shape[0]
else:
nout = len(output_wave)
out_spec = np.zeros((nout, nfib), dtype=np.float32)
out_var = np.zeros((nout, nfib), dtype=np.float32)
for f in range(nfib):
# Get x_in for this fiber
if in_wave_2d:
x_in = input_wave[:, f]
else:
x_in = input_wave
# Get x_out for this fiber
if out_wave_2d:
x_out = output_wave[:, f]
else:
x_out = output_wave
# Interpolate
# Note: interp1d requires x_in to be strictly increasing?
# Usually arc solutions are increasing.
# If not, we might need to sort, but wavelength solutions should be monotonic.
f_flux = interp1d(x_in, spectra[:, f], kind='linear', bounds_error=False, fill_value=0.0)
out_spec[:, f] = f_flux(x_out)
if variance is not None:
f_var = interp1d(x_in, variance[:, f], kind='linear', bounds_error=False, fill_value=0.0)
out_var[:, f] = f_var(x_out)
return out_spec, out_var
[docs]
def scrunch_from_arc_id(obj_filename, arc_filename, args, reverse=False):
"""
Scrunches (rebins) the object file using wavelength solution from the arc file.
If reverse=True, performs the reverse operation:
Assumes the object file is currently on the linear grid defined by the arc,
and rebins it back to the original pixel grid of the arc.
Parameters
----------
obj_filename : str
Path to the object FITS file to scrunch (modified in place)
arc_filename : str
Path to the arc FITS file containing WAVELA extension.
args : dict
Arguments dictionary (unused in logic but kept for interface compatibility).
reverse : bool
If True, un-scrunch (Linear -> Pixel). Default False (Pixel -> Linear).
"""
logger.info(f"Scrunching {obj_filename} using {arc_filename} (Reverse={reverse})")
# Open Object File
with ImageFile(obj_filename, mode='UPDATE') as obj_file:
spectra = obj_file.read_image_data().T # (NSPEC, NFIB)
variance = obj_file.read_variance_data().T
nx, nf = obj_file.get_size()
# Open Arc File to get Wavelength Solution
arc_wave = None
if arc_filename:
with ImageFile(arc_filename, mode='READ') as arc_file:
# Read WAVELA from Arc
# read_wave_data returns (NFIB, NSPEC) usually based on IO module
# Let's verify shape by checking.
# In reduce_arc.py: red_file.write_wave_data(new_wave.T) where new_wave is (NPIX, NFIB).
# So write_wave_data takes (NFIB, NPIX).
# read_wave_data returns what write_wave_data wrote.
# So read_wave_data returns (NFIB, NPIX).
# We want (NPIX, NFIB) for our logic.
raw_wave = arc_file.read_wave_data()
if raw_wave is not None:
arc_wave = raw_wave.T
else:
logger.warning(f"No WAVELA found in Arc file {arc_filename}.")
if arc_wave is None:
# Unit scaling fallback: 0 to NX-1
logger.info("Using unit scaling for wavelength.")
arc_wave = np.zeros((nx, nf))
for f in range(nf):
arc_wave[:, f] = np.arange(nx)
# Determine Global Output Axis (Linear)
# Defined by min/max of the Arc Solution
min_wave = np.min(arc_wave)
max_wave = np.max(arc_wave)
# Calculate dispersion (approx) from center fiber to determine step size
center_fib = nf // 2
center_wave = arc_wave[:, center_fib]
# Robust dispersion estimate
disp = (center_wave[-1] - center_wave[0]) / (len(center_wave) - 1)
# Define linear output axis (used as the common scrunched wavelength grid)
out_axis = np.linspace(min_wave, max_wave, nx)
# Perform Scrunch
if not reverse:
# Forward: Input=Pixel-based Wavelengths (Arc Wave). Output=Linear Axis.
new_spectra, new_var = rebin_data(spectra, variance, arc_wave, out_axis)
# Update Object
obj_file.write_image_data(new_spectra.T)
obj_file.write_variance_data(new_var.T)
obj_file.set_header_value("SCRUNCH", True)
# Update WAVELA to store the scrunched wavelength solution.
# We write a 2D array (NFIB, NPIX) where each fiber shares the same
# 1D linear wavelength axis (out_axis). This matches how arc RED
# files store WAVELA (per-fiber rows, spectral axis along NAXIS1)
# and ensures downstream code (e.g. flux calibration) sees the
# correct wavelength grid for scrunched data.
try:
wave_2d = np.tile(out_axis, (nf, 1)).astype(np.float32)
obj_file.write_wave_data(wave_2d)
except ValueError:
# If no WAVELA HDU is present, skip silently but keep SCRUNCH flag.
logger.warning(
"WAVELA HDU not found in %s; scrunched wavelengths not written.",
obj_filename,
)
else:
# Reverse: Input=Linear Axis (out_axis). Output=Pixel-based Wavelengths (Arc Wave).
# We map FROM the linear grid TO the pixels.
new_spectra, new_var = rebin_data(spectra, variance, out_axis, arc_wave)
obj_file.write_image_data(new_spectra.T)
obj_file.write_variance_data(new_var.T)
# Typically we don't clear SCRUNCH flag here as this is usually an intermediate step,
# but if it was the final state, we might. REDUCE_FFLAT handles the final flag.