"""
Preprocessing and Reduction Routines for KSPEC.
This module implements the high-level reduction functions similar to 2dfdr:
- combine_image: Combine multiple images
- reduce_bias: Process bias frames
- reduce_dark: Process dark frames
- reduce_lflat: Process long-slit flat frames
- reduce_fflat: Process fiber flat frames
- reduce_arc: Process arc frames
"""
import os
import logging
import numpy as np
from pathlib import Path
from typing import List, Optional, Union, Tuple
from astropy.io import fits
from astropy.stats import sigma_clip
from scipy.ndimage import median_filter
from ..io.image import ImageFile
from .make_im import make_im
logger = logging.getLogger(__name__)
[docs]
def combine_image(
input_files: List[str],
output_file: str,
method: str = 'MEDIAN',
adjust_levels: bool = True,
level_method: str = 'SCALE',
sigma: float = 5.0,
**kwargs
) -> str:
"""
Combine multiple images into a single image.
Parameters
----------
input_files : List[str]
List of input IM filenames
output_file : str
Output combined filename
method : str
Combination method ('MEDIAN', 'MEAN', 'SIGMA_CLIP'). Default is 'MEDIAN'.
adjust_levels : bool
Whether to adjust levels (normalize) before combining. Default is True.
level_method : str
Method for level adjustment when ``adjust_levels`` is True.
'SCALE' (default): multiplicative scaling using median of central region.
'OFFSET': additive offset using median of central region.
sigma : float
Sigma value for sigma clipping (used only if method='SIGMA_CLIP'). Default is 5.0.
**kwargs
Additional arguments
Returns
-------
str
Path to the created combined file
"""
logger.info(f"Combining {len(input_files)} images into {output_file} using {method}")
if not input_files:
raise ValueError("No input files provided for combination")
if len(input_files) == 1:
logger.warning("Only one file provided. Copying input to output.")
import shutil
shutil.copy2(input_files[0], output_file)
return output_file
# Read the first file to get dimensions and header
with ImageFile(input_files[0]) as first_file:
nx, ny = first_file.get_size()
ref_header = first_file.hdul[0].header
instrument_code = first_file.get_instrument_code()
# Initialize arrays
n_files = len(input_files)
# Using (ny, nx) to match read_image_data output shape (rows, cols)
stack_data = np.zeros((n_files, ny, nx), dtype=np.float32)
stack_var = np.zeros((n_files, ny, nx), dtype=np.float32)
levels = np.ones(n_files, dtype=float)
# Read all files
for i, fname in enumerate(input_files):
logger.info(f"Reading {fname} ({i+1}/{n_files})")
with ImageFile(fname) as im_file:
data = im_file.read_image_data()
var = im_file.read_variance_data()
stack_data[i, :, :] = data
stack_var[i, :, :] = var
# Calculate scaling if requested (using median of central region)
if adjust_levels:
# Use central 50% of image for stats
# data shape is (rows, cols) -> (ny, nx)
# So we slice [y1:y2, x1:x2]
row1, row2 = ny // 4, 3 * ny // 4
col1, col2 = nx // 4, 3 * nx // 4
med_val = np.nanmedian(data[row1:row2, col1:col2])
if med_val > 0:
levels[i] = med_val
else:
levels[i] = 1.0
# Normalize scales
if adjust_levels:
lm = level_method.upper()
if lm == 'SCALE':
# Multiplicative scaling so that central-region medians match
scales = levels.copy()
scales /= np.median(scales)
logger.info(f"Relative flux scalings: {scales}")
for i in range(n_files):
stack_data[i, :, :] /= scales[i]
stack_var[i, :, :] /= (scales[i] ** 2)
elif lm == 'OFFSET':
# Additive offsets so that central-region medians match
ref_level = np.median(levels)
offsets = levels - ref_level
logger.info(f"Relative flux offsets: {offsets}")
for i in range(n_files):
stack_data[i, :, :] -= offsets[i]
# Variance is unchanged by constant offsets
else:
raise ValueError(f"Unknown level_method: {level_method}")
# Combine
if method.upper() == 'MEDIAN':
combined_data = np.nanmedian(stack_data, axis=0)
# Approximate variance for median: Var_med approx (pi/2) * Var_mean
# Var_mean = sum(Var_i) / N^2
mean_var = np.nansum(stack_var, axis=0) / (n_files**2)
combined_var = mean_var * (np.pi / 2.0)
# More robust variance estimate from data scatter could be:
# combined_var = np.nanvar(stack_data, axis=0) / n_files
elif method.upper() == 'MEAN':
combined_data = np.nanmean(stack_data, axis=0)
combined_var = np.nansum(stack_var, axis=0) / (n_files**2)
elif method.upper() == 'SIGMA_CLIP':
# Use astropy sigma_clip
logger.info(f"Using sigma clipping with sigma={sigma}")
# Mask invalid values before sigma clipping
masked_data = np.ma.masked_invalid(stack_data)
# Perform sigma clipping along the file axis (axis 0)
# This returns a masked array
clipped_data = sigma_clip(masked_data, sigma=sigma, axis=0, maxiters=5)
# Calculate mean of clipped data
combined_data = np.ma.mean(clipped_data, axis=0).filled(np.nan)
# Calculate variance of clipped data (std dev / sqrt(N))
# Or propagate input variances...
# For simplicity and robustness, we propagate the input variances of the surviving pixels
# Var_mean = Sum(Var_i) / N_good^2
# Create mask of kept pixels
mask = np.ma.getmaskarray(clipped_data)
valid_count = np.sum(~mask, axis=0)
# Sum variances of valid pixels
# masked_invalid handles NaNs in input variance
masked_var = np.ma.masked_array(stack_var, mask=mask)
sum_var = np.ma.sum(masked_var, axis=0).filled(np.nan)
# Avoid division by zero
with np.errstate(divide='ignore', invalid='ignore'):
combined_var = sum_var / (valid_count**2)
combined_var[valid_count == 0] = np.nan
else:
raise ValueError(f"Unknown combination method: {method}")
# Create output file
# We create a new file based on the header of the first file
hdu = fits.PrimaryHDU(combined_data, header=ref_header)
# Update header
hdu.header['HISTORY'] = f"Combined {n_files} images using {method}"
if adjust_levels:
hdu.header['HISTORY'] = f"Flux levels adjusted before combination (mode={level_method.upper()})"
# Create variance HDU
var_hdu = fits.ImageHDU(combined_var, name='VARIANCE')
hdul = fits.HDUList([hdu, var_hdu])
# Handle fiber table if present in first file
with ImageFile(input_files[0]) as first_file:
if first_file.has_fiber_table():
fiber_data = first_file.read_fiber_table()
if fiber_data is not None:
table_name = first_file.get_fiber_table_name() or 'FIBRES'
fib_hdu = fits.BinTableHDU(fiber_data, name=table_name)
hdul.append(fib_hdu)
hdul.writeto(output_file, overwrite=True)
hdul.close()
logger.info(f"Created combined file: {output_file}")
return output_file
[docs]
def reduce_bias(
raw_files: List[str],
output_file: str = "BIAScombined.fits",
bias_type: str = "MASTER",
method: Optional[str] = 'SIGMA_CLIP',
sigma: Optional[float] = 4.0,
adjust_levels: bool = True,
level_method: str = "OFFSET",
**kwargs
) -> str:
"""
Reduce bias frames: Make IM files and combine them.
Parameters
----------
raw_files : List[str]
List of raw bias filenames
output_file : str
Output master bias filename
bias_type : str
Type of bias frame ('MASTER', 'OVERSCAN', 'OTHER')
method : str, optional
Combination method ('MEDIAN', 'MEAN', 'SIGMA_CLIP'). Default is 'SIGMA_CLIP'.
sigma : float, optional
Sigma value for sigma clipping (used only if method='SIGMA_CLIP'). Default is 4.0.
adjust_levels : bool, optional
Whether to adjust levels (normalize) before combining. Default is True.
level_method : str, optional
Method for level adjustment when ``adjust_levels`` is True.
'SCALE' (default): multiplicative scaling using median of central region.
'OFFSET': additive offset using median of central region.
Returns
-------
str
Path to the master bias file
"""
logger.info(f"Reducing {len(raw_files)} bias frames")
im_files = []
for raw_file in raw_files:
# Create IM file
im_file = make_im(
raw_filename=raw_file,
use_bias=False, # Bias frames don't get bias subtracted
use_dark=False,
mark_saturated=False,
**kwargs
)
im_files.append(im_file)
# Combine bias frames
# Use SIGMA_CLIP for bias frames as per 2dfdr standard
combined_file = combine_image(
im_files,
output_file,
method=method,
sigma=sigma,
adjust_levels=adjust_levels,
level_method=level_method,
)
with ImageFile(combined_file, mode='UPDATE') as im:
im.set_class('BIAS')
im.set_header_value('BIASTYPE', bias_type)
return combined_file
[docs]
def reduce_dark(
raw_files: List[str],
output_file: str = "DARKcombined.fits",
bias_filename: Optional[str] = None,
method: Optional[str] = 'MEDIAN',
sigma: Optional[float] = None,
**kwargs
) -> Union[str, List[str]]:
"""
Reduce dark frames: Make IM files (subtract bias) and combine them.
Parameters
----------
raw_files : List[str]
List of raw dark filenames
output_file : str
Output master dark filename
bias_filename : str, optional
Master bias file to subtract
method : str, optional
Combination method ('MEDIAN', 'MEAN', 'SIGMA_CLIP'). Default is 'MEDIAN'.
sigma : float, optional
Sigma value for sigma clipping (used only if method='SIGMA_CLIP'). Default is None.
Returns
-------
str or List[str]
Path to the master dark file(s). If multiple exposure times are present,
returns a list of filenames.
"""
logger.info(f"Reducing {len(raw_files)} dark frames")
im_files = []
for raw_file in raw_files:
# Create IM file with bias subtraction if provided
im_file = make_im(
raw_filename=raw_file,
use_bias=(bias_filename is not None),
bias_filename=bias_filename,
use_dark=False,
**kwargs
)
im_files.append(im_file)
# Group files by exposure time (as per 2dfdr standard)
files_by_et = {}
for im_f in im_files:
with ImageFile(im_f, mode='READ') as img:
# Get exposure time, defaulting to 0 if not present
exptime = float(img.get_header_value('EXPOSED', 0.0))
# Use int for grouping if close to integer, to avoid float precision issues
et_int = int(round(exptime))
if et_int not in files_by_et:
files_by_et[et_int] = []
files_by_et[et_int].append(im_f)
created_files = []
# Combine each group
if len(files_by_et) == 1:
# Single group case - use original output filename
combined_file = combine_image(
im_files,
output_file,
method=method,
sigma=sigma,
adjust_levels=False
)
with ImageFile(combined_file, mode='UPDATE') as im:
im.set_class('DARK')
return combined_file
else:
logger.info(f"Found {len(files_by_et)} different exposure times. Splitting combination.")
base_path = Path(output_file)
stem = base_path.stem
suffix = base_path.suffix
parent = base_path.parent
for et, group_files in files_by_et.items():
# Construct new filename: stem_ETs.fits
# e.g. DARKcombined_1200s.fits
new_filename = parent / f"{stem}_{et:04d}s{suffix}"
new_filename_str = str(new_filename)
logger.info(f"Combining {len(group_files)} frames with exposure {et}s into {new_filename_str}")
combined_file = combine_image(
group_files,
new_filename_str,
method=method,
sigma=sigma,
adjust_levels=False
)
with ImageFile(combined_file, mode='UPDATE') as im:
im.set_class('DARK')
created_files.append(combined_file)
return created_files
[docs]
def reduce_lflat(
raw_files: List[str],
output_file: str = "LFLATcombined.fits",
bias_filename: Optional[str] = None,
dark_filename: Optional[str] = None,
**kwargs
) -> str:
"""
Reduce long-slit flat frames: Make IM files (sub bias/dark) and combine.
Parameters
----------
raw_files : List[str]
List of raw lflat filenames
output_file : str
Output master lflat filename
bias_filename : str, optional
Master bias file
dark_filename : str, optional
Master dark file
Returns
-------
str
Path to the master lflat file
"""
logger.info(f"Reducing {len(raw_files)} LFLAT frames")
im_files = []
for raw_file in raw_files:
im_file = make_im(
raw_filename=raw_file,
use_bias=(bias_filename is not None),
bias_filename=bias_filename,
use_dark=(dark_filename is not None),
dark_filename=dark_filename,
**kwargs
)
im_files.append(im_file)
# Combine LFLATs
# We might want to normalize levels for flats
combined_file = combine_image(
im_files,
output_file,
method='MEDIAN',
adjust_levels=True
)
# Post-processing: LFLATs are often spatially smoothed
# TODO: Add spatial smoothing (e.g. median filter 5x5) if needed
with ImageFile(combined_file, mode='UPDATE') as im:
im.set_class('LFLATCAL')
return combined_file
[docs]
def reduce_fflat(
raw_files: List[str],
output_file: str = "FFLATcombined.fits",
bias_filename: Optional[str] = None,
dark_filename: Optional[str] = None,
lflat_filename: Optional[str] = None,
**kwargs
) -> str:
"""
Reduce fiber flat frames: Make IM files, combine, and prepare for extraction.
Note: Full reduction of fiber flats typically involves extraction (Tramline Map)
and normalization, which may be handled by subsequent steps or modules.
This function currently performs the image-level preparation and combination.
Parameters
----------
raw_files : List[str]
List of raw fflat filenames
output_file : str
Output combined fflat filename
bias_filename : str, optional
Master bias file
dark_filename : str, optional
Master dark file
lflat_filename : str, optional
Master lflat file
Returns
-------
str
Path to the combined fflat file
"""
logger.info(f"Reducing {len(raw_files)} FFLAT frames")
im_files = []
for raw_file in raw_files:
im_file = make_im(
raw_filename=raw_file,
use_bias=(bias_filename is not None),
bias_filename=bias_filename,
use_dark=(dark_filename is not None),
dark_filename=dark_filename,
use_lflat=(lflat_filename is not None),
lflat_filename=lflat_filename,
**kwargs
)
im_files.append(im_file)
# Combine FFLATs
combined_file = combine_image(
im_files,
output_file,
method='MEDIAN',
adjust_levels=True
)
with ImageFile(combined_file, mode='UPDATE') as im:
im.set_class('MFFFF') # Master Fiber Flat Field Frame? Or just FFLAT
return combined_file
[docs]
def reduce_arc(
raw_files: List[str],
output_file: str = "ARCcombined.fits",
bias_filename: Optional[str] = None,
dark_filename: Optional[str] = None,
lflat_filename: Optional[str] = None,
**kwargs
) -> str:
"""
Reduce arc frames: Make IM files, combine, and prepare for extraction.
Parameters
----------
raw_files : List[str]
List of raw arc filenames
output_file : str
Output combined arc filename
bias_filename : str, optional
Master bias file
dark_filename : str, optional
Master dark file
lflat_filename : str, optional
Master lflat file
Returns
-------
str
Path to the combined arc file
"""
logger.info(f"Reducing {len(raw_files)} ARC frames")
im_files = []
for raw_file in raw_files:
im_file = make_im(
raw_filename=raw_file,
use_bias=(bias_filename is not None),
bias_filename=bias_filename,
use_dark=(dark_filename is not None),
dark_filename=dark_filename,
use_lflat=(lflat_filename is not None),
lflat_filename=lflat_filename,
**kwargs
)
im_files.append(im_file)
# Combine Arcs
# Arcs shouldn't be flux scaled usually as lines might vary?
# But often they are stable. Safe to use MEDIAN without adjust_levels for arcs
# to preserve relative line intensities if exposure times are same.
combined_file = combine_image(
im_files,
output_file,
method='MEDIAN',
adjust_levels=False
)
with ImageFile(combined_file, mode='UPDATE') as im:
im.set_class('MFARC')
return combined_file