Source code for kspecdr.preproc.make_im

"""
Make IM (Image) Module

This module handles the preprocessing of raw astronomical data to create IM files.
It performs the following steps:
1. Copy raw file to IM file
2. Mark bad pixels
3. Subtract bias and process overscan
4. Subtract dark frame (optional)
5. Create and initialize variance HDU
6. Apply long-slit flat field (optional)
7. Remove cosmic rays (optional)

Based on the Fortran MAKE_IM routine from 2dfdr.
"""

import logging
import sys
import numpy as np
from pathlib import Path
from typing import Optional, Tuple, Dict, Any
from astropy.io import fits
from astropy.table import Table
import warnings

try:
    import astroscrappy

    HAS_ASTROSCRAPPY = True
except ImportError:
    HAS_ASTROSCRAPPY = False
    logger = logging.getLogger(__name__)
    logger.warning("astroscrappy not available. Cosmic ray removal will not work.")

from ..io.image import ImageFile

logger = logging.getLogger(__name__)


[docs] class MakeIM: """ Handles preprocessing of raw astronomical data to create IM files. This class implements the equivalent of the Fortran MAKE_IM routine, performing all necessary preprocessing steps to convert raw instrument data into calibrated image files with proper variance estimates. """ def __init__(self, verbose: bool = True): """ Initialize the MakeIM processor. Parameters ---------- verbose : bool, optional Whether to print verbose output during processing """ self.verbose = verbose self.saturation_value = 65535.0 # 16-bit saturation value self.saturation_tolerance = 0.99 * self.saturation_value
[docs] def process_raw_to_im( self, raw_filename: str, im_filename: Optional[str] = None, use_bias: bool = False, bias_filename: Optional[str] = None, use_dark: bool = False, dark_filename: Optional[str] = None, use_glow_pca: bool = False, glow_pca_filename: Optional[str] = None, dc_rate_filename: Optional[str] = None, glow_n_components: int = 5, glow_fit_rows: Optional[list] = None, use_lflat: bool = False, lflat_filename: Optional[str] = None, bad_pixel_mask: Optional[str] = None, bad_pixel_mask2: Optional[str] = None, mark_saturated: bool = True, cosmic_ray_method: str = "NONE", **kwargs, ) -> str: """ Process raw file to create IM file with all preprocessing steps. Parameters ---------- raw_filename : str Path to the raw input file im_filename : str, optional Path for the output IM file. If None, will be derived from raw_filename use_bias : bool, optional Whether to subtract bias frame use_dark : bool, optional Whether to subtract dark frame dark_filename : str, optional Path to dark frame file use_glow_pca : bool, optional Whether to subtract glow using PCA templates instead of (or in addition to) simple dark subtraction. glow_pca_filename : str, optional Path to the PCA glow cube FITS file (glow_pca_cube.fits). Must contain a PRIMARY HDU with the mean glow pattern and extensions named PC01…PCxx with individual PCA component images. dc_rate_filename : str, optional Path to a FITS file with the per-pixel dark-current rate [e-/s]. If None, the DC-current contribution is assumed to be zero and only the glow (mean + PCA) model is subtracted. glow_n_components : int, optional Number of PCA components to use in the glow fit (default: 5). glow_fit_rows : list of (int, int), optional Row ranges (axis=0) to include when fitting the glow model, e.g. ``[(0, 500), (800, 1300)]``. Pixels outside these ranges are excluded from the least-squares fit but the model is still evaluated and subtracted everywhere. If None, all finite pixels are used. use_lflat : bool, optional Whether to divide by long-slit flat field lflat_filename : str, optional Path to long-slit flat field file bad_pixel_mask : str, optional Path to bad pixel mask file bad_pixel_mask2 : str, optional Path to second bad pixel mask file (e.g., cosmic ray mask) mark_saturated : bool, optional Whether to mark saturated pixels as bad cosmic_ray_method : str, optional Method for cosmic ray removal ('NONE', 'LACOSMIC', 'BCLEAN', 'PYCOSMIC') **kwargs Additional keyword arguments Returns ------- str Path to the created IM file """ if self.verbose: logger.info("=" * 50) logger.info("Preprocessing image data contained in RAW frame") logger.info("=" * 50) logger.info(f"RAW file = {raw_filename}") # Determine output filename if im_filename is None: raw_path = Path(raw_filename) im_filename = str(raw_path.with_name(f"{raw_path.stem}_im.fits")) # Step 1: Create IM file from raw file with proper processing logger.info( "Creating IM file from raw data with TDFIO_CREATEBYCOPY functionality..." ) self._create_im_from_raw(raw_filename, im_filename) # Step 2: Process the IM file with ImageFile(im_filename, mode="UPDATE") as im_file: # Step 2: Mark bad pixels if self.verbose: logger.info("Marking bad pixels...") self._mark_bad_pixels( im_file, mark_saturated, bad_pixel_mask, bad_pixel_mask2 ) # Step 3: Process overscan and subtract bias if self.verbose: logger.info("Processing overscan and bias subtraction...") self._debias_image(im_file, use_bias, bias_filename, **kwargs) # Step 4: Subtract dark frame if use_dark and dark_filename: if self.verbose: logger.info("Subtracting dark frame...") self._subtract_dark(im_file, dark_filename) # Step 4b: Subtract glow using PCA templates if use_glow_pca and glow_pca_filename: if self.verbose: logger.info("Subtracting glow via PCA template fit...") self._subtract_glow_pca( im_file, glow_pca_filename=glow_pca_filename, dc_rate_filename=dc_rate_filename, n_components=glow_n_components, fit_rows=glow_fit_rows, ) # Step 5: Create and initialize variance HDU if self.verbose: logger.info("Creating variance HDU...") self._add_variance(im_file) # Step 6: Apply long-slit flat field if use_lflat and lflat_filename: if self.verbose: logger.info("Applying long-slit flat field...") self._divide_by_lflat(im_file, lflat_filename) # Step 7: Remove cosmic rays if cosmic_ray_method != "NONE": if self.verbose: logger.info(f"Removing cosmic rays using {cosmic_ray_method}...") self._remove_cosmic_rays(im_file, cosmic_ray_method, **kwargs) # Step 8: Save the updated IM file im_file.save_as(im_filename) if self.verbose: logger.info(f"Image data frame {im_filename} created.") return im_filename
def _mark_bad_pixels( self, im_file: ImageFile, mark_saturated: bool, bad_pixel_mask: Optional[str] = None, bad_pixel_mask2: Optional[str] = None, ) -> None: """ Mark bad pixels in the image. Parameters ---------- im_file : ImageFile The image file to process mark_saturated : bool Whether to mark saturated pixels as bad bad_pixel_mask : str, optional Path to bad pixel mask file bad_pixel_mask2 : str, optional Path to second bad pixel mask file """ # Read image data image_data = im_file.read_image_data() nx, ny = im_file.get_size() # Apply first bad pixel mask if provided if bad_pixel_mask and Path(bad_pixel_mask).exists(): logger.info(f"Using bad pixel mask: {bad_pixel_mask}") bad_mask = self._load_bad_pixel_mask(bad_pixel_mask, nx, ny) image_data[bad_mask > 0.99] = np.nan # Apply second bad pixel mask if provided if bad_pixel_mask2 and Path(bad_pixel_mask2).exists(): logger.info(f"Using second bad pixel mask: {bad_pixel_mask2}") bad_mask2 = self._load_bad_pixel_mask(bad_pixel_mask2, nx, ny) image_data[bad_mask2 > 0.99] = np.nan # Mark saturated pixels if requested if mark_saturated: saturated_pixels = image_data >= self.saturation_tolerance if np.any(saturated_pixels): percent_saturated = np.sum(saturated_pixels) * 100.0 / (nx * ny) logger.info(f"{percent_saturated:.2f}% of pixels were saturated") if percent_saturated > 1.0: logger.warning("Warning: Higher than expected levels of saturation") logger.warning("This may cause serious reduction problems!") # Mark saturated pixels and neighbors as bad image_data[saturated_pixels] = np.nan # Mark neighboring pixels for i, j in zip(*np.where(saturated_pixels)): if i > 0: image_data[i - 1, j] = np.nan if i < nx - 1: image_data[i + 1, j] = np.nan if j > 0: image_data[i, j - 1] = np.nan if j < ny - 1: image_data[i, j + 1] = np.nan # Write back the modified image data im_file.write_image_data(image_data) def _load_bad_pixel_mask(self, mask_filename: str, nx: int, ny: int) -> np.ndarray: """ Load bad pixel mask from file. Parameters ---------- mask_filename : str Path to the bad pixel mask file nx, ny : int Expected dimensions of the mask Returns ------- np.ndarray Bad pixel mask array """ with fits.open(mask_filename) as hdul: mask_data = hdul[0].data if mask_data.shape != (ny, nx): # FITS is (y, x) order raise ValueError( f"Bad pixel mask dimensions {mask_data.shape} " f"don't match image dimensions ({ny}, {nx})" ) return mask_data.T # Transpose to match our (x, y) convention def _debias_image( self, im_file: ImageFile, use_bias: bool, bias_filename: Optional[str] = None, **kwargs, ) -> None: """ Process overscan region and subtract bias. Parameters ---------- im_file : ImageFile The image file to process use_bias : bool Whether bias subtraction is being used bias_filename : str, optional Path to bias frame file **kwargs Additional keyword arguments for bias processing """ # Read image data image_data = im_file.read_image_data() nx, ny = im_file.get_size() # Optional overscan region (in image_data coordinates: (x1, x2, y1, y2)) overscan_region = kwargs.get("overscan_region", None) if use_bias: # Subtract bias frame if provided if bias_filename is not None: # Read bias frame with ImageFile(bias_filename, mode="READ") as bias_file: bias_data = bias_file.read_image_data() # Get bias subtraction method from header or use defaults bias_method = bias_file.get_header_value("BIASTYPE", "MEDIAN") if bias_method == "MEDIAN": bias_level = np.nanmedian(bias_data) elif bias_method == "MASTER": bias_level = bias_data else: raise ValueError( f"Unknown bias subtraction method: {bias_method}" ) logger.info(f"Bias subtraction method: {bias_method}") image_data -= bias_level bias_record = np.nanmedian(bias_level) logger.info(f"Subtracted bias level (from bias frame): {bias_record:.2f}") # If overscan information is available for the target image, # measure the residual level in the overscan region after # bias-frame subtraction and subtract that residual so that # the final overscan is close to zero. if overscan_region is not None: overscan_residual = self._calculate_bias_level( image_data, overscan_region ) image_data -= overscan_residual bias_record += overscan_residual logger.info("Refined bias subtraction using overscan region") logger.info( f"Additional overscan level subtracted: {overscan_residual:.2f}" ) else: logger.info( "No overscan region information provided; " "skipping overscan-based refinement of bias subtraction" ) else: # Get overscan region from header or use defaults if overscan_region is not None: # Process overscan region bias_level = self._calculate_bias_level(image_data, overscan_region) image_data -= bias_level bias_record = bias_level logger.info("Bias subtraction method: OVERSCAN") logger.info(f"Subtracted bias level: {bias_record:.2f}") else: logger.warning("No overscan region found in header") # Write back the processed image data im_file.write_image_data(image_data) im_file.add_history(f"Subtracted bias level: {bias_record:.2f}") else: logger.info("No bias subtraction performed") bias_record = 0.0 im_file.add_history("No bias subtraction performed") def _calculate_bias_level( self, image_data: np.ndarray, overscan_region: Tuple[int, int, int, int] ) -> float: """ Calculate bias level from overscan region. Parameters ---------- image_data : np.ndarray Image data array overscan_region : tuple Overscan region as (x1, x2, y1, y2) Returns ------- float Calculated bias level """ x1, x2, y1, y2 = overscan_region overscan_data = image_data[x1:x2, y1:y2] # TODO: add option for fitting a polynomial to the overscan region # Use median to avoid outliers bias_level = np.nanmedian(overscan_data) return bias_level def _subtract_dark(self, im_file: ImageFile, dark_filename: str) -> None: """ Subtract dark frame from image. Parameters ---------- im_file : ImageFile The image file to process dark_filename : str Path to dark frame file """ # Read image data image_data = im_file.read_image_data() nx, ny = im_file.get_size() # Read dark frame with ImageFile(dark_filename, mode="READ") as dark_file: dark_data = dark_file.read_image_data() # Get exposure times im_exp = im_file.get_header_value("EXPOSED", 1.0) dark_exp = dark_file.get_header_value("EXPOSED", 1.0) logger.info(f"Object exposure time: {im_exp}") logger.info(f"Dark exposure time: {dark_exp}") # Scale dark frame if dark_exp > 0: scale = im_exp / dark_exp logger.info(f"Dark scaled by: {scale}") dark_data *= scale else: logger.warning("ERROR: No scaling of dark") # Subtract dark image_data -= dark_data # Write back the processed image data im_file.write_image_data(image_data) # Add history record im_file.add_history(f"Subtracted dark frame {dark_filename}") def _subtract_glow_pca( self, im_file: ImageFile, glow_pca_filename: str, dc_rate_filename: Optional[str] = None, n_components: int = 5, fit_rows: Optional[list] = None, ) -> None: """ Subtract glow from an image using a PCA template fit. The glow model is: glow_model = alpha_0 * glow_mean + sum_i(alpha_i * PC_i) If ``dc_rate_filename`` is provided, a DC-current model ``dc_rate * exptime`` is subtracted first, and the PCA fit is applied to the residual. Without it, the PCA fit is applied to the raw (bias-subtracted) image directly. Parameters ---------- im_file : ImageFile The image file to process (opened in UPDATE mode). glow_pca_filename : str Path to glow_pca_cube.fits (PRIMARY = mean, PC01…PCxx extensions). dc_rate_filename : str, optional Path to a FITS file with the per-pixel DC-current rate [e-/s]. n_components : int Number of PCA components to include in the fit. fit_rows : list of (int, int), optional Row (axis=0) ranges used for fitting, e.g. ``[(0, 500), (800, 1300)]``. If None, all finite pixels are used. """ from astropy.stats import sigma_clip as astro_sigma_clip image_data = im_file.read_image_data().astype(np.float64) exptime = float(im_file.get_header_value("EXPOSED", 1.0)) # --- Load PCA templates --- with fits.open(glow_pca_filename) as cube: glow_mean = cube[0].data.astype(np.float64) pc_extnames = [h.name for h in cube if h.name.upper().startswith("PC")] pca_comps = [ cube[name].data.astype(np.float64) for name in pc_extnames[:n_components] ] if len(pca_comps) < n_components: logger.warning( "Requested %d PCA components but cube only has %d — using %d", n_components, len(pca_comps), len(pca_comps), ) # --- Handle overscan rows: template may be shorter than the image --- # If the image has extra rows at the top (overscan), trim them before # fitting and only apply the correction to the science region. tmpl_rows = glow_mean.shape[0] img_rows = image_data.shape[0] n_overscan = img_rows - tmpl_rows if n_overscan < 0: logger.warning( "Glow template (%d rows) is taller than image (%d rows) — skipping glow subtraction", tmpl_rows, img_rows, ) return if n_overscan > 0: logger.info( "Image has %d extra rows vs. template — treating first %d rows as overscan", n_overscan, n_overscan, ) science = image_data[n_overscan:, :] # view into the science region # --- DC-current model --- if dc_rate_filename: with fits.open(dc_rate_filename) as f: dc_rate = f[0].data.astype(np.float64) dc_model = dc_rate * exptime else: dc_model = np.zeros_like(science) residual = science - dc_model # --- Build fitting mask (coordinates relative to science region) --- fit_mask_2d = np.ones(science.shape, dtype=bool) if fit_rows is not None: fit_mask_2d[:] = False for r0, r1 in fit_rows: fit_mask_2d[r0:r1, :] = True # --- Design matrix: [glow_mean, PC1, PC2, …] --- templates = [glow_mean] + list(pca_comps) A = np.column_stack([t.ravel() for t in templates]) # (Npix, 1+n_comp) b = residual.ravel() pixel_ok = np.isfinite(b) & np.all(np.isfinite(A), axis=1) & fit_mask_2d.ravel() active = pixel_ok.copy() coeffs = None for _ in range(5): if active.sum() < A.shape[1]: logger.warning("Too few pixels for glow PCA fit — falling back to no glow subtraction") return coeffs, _, _, _ = np.linalg.lstsq(A[active], b[active], rcond=None) resid_fit = b[active] - A[active] @ coeffs clipped = astro_sigma_clip(resid_fit, sigma=3.0, masked=True) new_active = active.copy() new_active[active] = ~clipped.mask if np.array_equal(new_active, active): break active = new_active glow_model = (A @ coeffs).reshape(science.shape) corrected = image_data.copy() corrected[n_overscan:, :] = science - dc_model - glow_model im_file.write_image_data(corrected.astype(np.float32)) im_file.add_history( f"Subtracted PCA glow model ({len(pca_comps)} components, " f"exptime={exptime:.1f}s, dc_rate={'yes' if dc_rate_filename else 'no'})" ) logger.info( "Glow PCA subtraction done: exptime=%.1fs, n_comp=%d, n_overscan=%d, coeffs=%s", exptime, len(pca_comps), n_overscan, np.array2string(coeffs, precision=3), ) def _add_variance(self, im_file: ImageFile) -> None: """ Create and initialize variance HDU. Parameters ---------- im_file : ImageFile The image file to process """ # Read image data image_data = im_file.read_image_data() nx, ny = im_file.get_size() # Get noise and gain information from header noise, gain = self._get_noise_gain_info(im_file) # Calculate variance variance_data = self._calculate_variance(image_data, noise, gain) # Write variance data im_file.write_variance_data(variance_data) logger.info("Variance HDU created and initialized") def _get_noise_gain_info(self, im_file: ImageFile) -> Tuple[np.ndarray, np.ndarray]: """ Get noise and gain information from FITS header. Parameters ---------- im_file : ImageFile The image file Returns ------- tuple (noise, gain) """ # (noise_array, gain_array) for each amplifier # Try to get amplifier-specific noise and gain # noise = np.array([im_file.get_header_value('RDNOISE1', 5.0), # im_file.get_header_value('RDNOISE2', 5.0), # im_file.get_header_value('RDNOISE3', 5.0), # im_file.get_header_value('RDNOISE4', 5.0)]) # gain = np.array([im_file.get_header_value('GAIN1', 1.0), # im_file.get_header_value('GAIN2', 1.0), # im_file.get_header_value('GAIN3', 1.0), # im_file.get_header_value('GAIN4', 1.0)]) noise = im_file.get_header_value( "RO_NOISE", 7.0 ) # TODO: fix silent replacement gain = im_file.get_header_value("RO_GAIN", 2.0) # TODO: fix silent replacement return noise, gain def _calculate_variance( self, image_data: np.ndarray, noise: np.ndarray, gain: np.ndarray ) -> np.ndarray: """ Calculate variance for each pixel. Parameters ---------- image_data : np.ndarray Image data array noise : float Readout noise gain : float Gain Returns ------- np.ndarray Variance array """ nx, ny = image_data.shape # variance = np.zeros_like(image_data) variance = noise**2 + np.maximum(image_data, 0) / gain # logger.info(f"noise: {noise}, gain: {gain}, zero variance? {np.any(variance == 0)}") # # Determine amplifier layout # if len(noise) == 1 or np.all(noise == noise[0]): # # Single amplifier # variance = noise[0]**2 + np.maximum(image_data, 0) / gain[0] # elif len(noise) == 2: # # Two amplifiers (left/right) # mid_x = nx // 2 # # Left half # variance[:mid_x, :] = noise[0]**2 + np.maximum(image_data[:mid_x, :], 0) / gain[0] # # Right half # variance[mid_x:, :] = noise[1]**2 + np.maximum(image_data[mid_x:, :], 0) / gain[1] # elif len(noise) == 4: # # Four amplifiers (quadrants) # mid_x, mid_y = nx // 2, ny // 2 # # Bottom-left # variance[:mid_x, :mid_y] = noise[0]**2 + np.maximum(image_data[:mid_x, :mid_y], 0) / gain[0] # # Bottom-right # variance[mid_x:, :mid_y] = noise[1]**2 + np.maximum(image_data[mid_x:, :mid_y], 0) / gain[1] # # Top-right # variance[mid_x:, mid_y:] = noise[2]**2 + np.maximum(image_data[mid_x:, mid_y:], 0) / gain[2] # # Top-left # variance[:mid_x, mid_y:] = noise[3]**2 + np.maximum(image_data[:mid_x, mid_y:], 0) / gain[3] return variance def _divide_by_lflat(self, im_file: ImageFile, lflat_filename: str) -> None: """ Divide image by long-slit flat field. Parameters ---------- im_file : ImageFile The image file to process lflat_filename : str Path to long-slit flat field file """ # Read image data and variance nx, ny = im_file.get_size() image_data = im_file.read_image_data() variance_data = im_file.read_variance_data() # Read long-slit flat field with ImageFile(lflat_filename, mode="READ") as lflat_file: lflat_data = lflat_file.read_image_data() lflat_variance = lflat_file.read_variance_data() # Check flat field class lflat_class = lflat_file.get_header_value("CLASS", "") if not lflat_class.startswith("LFLATCAL"): logger.warning("Flat field file may not have correct class") # Perform division with proper error propagation with warnings.catch_warnings(): warnings.simplefilter("ignore") # Avoid division by zero lflat_data = np.where(lflat_data > 0, lflat_data, np.nan) # Divide image by flat image_data /= lflat_data # Propagate errors variance_data = variance_data / lflat_data**2 + ( image_data**2 * lflat_variance / lflat_data**2 ) # Write back the processed data im_file.write_image_data(image_data) im_file.write_variance_data(variance_data) im_file.add_history(f"Divided by long-slit flat field {lflat_filename}") def _remove_cosmic_rays(self, im_file: ImageFile, method: str, **kwargs) -> None: """ Remove cosmic rays from image. Parameters ---------- im_file : ImageFile The image file to process method : str Method for cosmic ray removal **kwargs Additional keyword arguments for cosmic ray removal """ # Read image data nx, ny = im_file.get_size() image_data = im_file.read_image_data() variance_data = im_file.read_variance_data() if method == "LACOSMIC": # Use LACosmic algorithm cleaned_data = self._lacosmic_clean( image_data, variance_data, im_file=im_file, **kwargs ) elif method == "BCLEAN": # Use BCLEAN algorithm cleaned_data = self._bclean_clean(image_data, **kwargs) elif method == "PYCOSMIC": # Use Python cosmic ray removal cleaned_data = self._pycosmic_clean(image_data, **kwargs) else: logger.warning(f"Unknown cosmic ray removal method: {method}") return # Write back the cleaned data im_file.write_image_data(cleaned_data) # Add history record im_file.add_history(f"Removed cosmic rays using {method}") def _lacosmic_clean( self, image_data: np.ndarray, variance_data: np.ndarray, im_file: Optional[ImageFile] = None, **kwargs, ) -> np.ndarray: """ Clean cosmic rays using LACosmic algorithm via astroscrappy. Parameters ---------- image_data : np.ndarray Image data to clean variance_data : np.ndarray Variance data to clean im_file : ImageFile, optional The image file object to read header keywords (for gain/readnoise) **kwargs Additional parameters for LACosmic: - sigclip: sigma clipping threshold (default: 4.5) - sigfrac: fraction of sigma clipping (default: 0.3) - objlim: object limit (default: 5.0) - gain: gain value (default: None, auto-detect from header) - readnoise: read noise (default: None, auto-detect from header) - satlevel: saturation level (default: None) - verbose: verbose output (default: False) Returns ------- np.ndarray Cleaned image data """ if not HAS_ASTROSCRAPPY: raise ImportError( "astroscrappy is required for LACosmic cosmic ray removal. " "Install it with: pip install astroscrappy" ) # Default parameters for astroscrappy # TODO: add gain, readnoise, satlevel default_params = { "sigclip": 4.5, "sigfrac": 0.3, "objlim": 5.0, "gain": None, "readnoise": None, "satlevel": self.saturation_value, # "satlevel": np.inf, "verbose": False, } # Update with any provided kwargs params = default_params.copy() params.update(kwargs) # If gain/readnoise not set, try to read from header (RO_GAIN, RO_NOISE) if im_file is not None: readnoise, gain = self._get_noise_gain_info(im_file) if params["gain"] is None: if gain is not None: params["gain"] = gain logger.info(f"Using gain from header (RO_GAIN): {gain}") if params["readnoise"] is None: if readnoise is not None: params["readnoise"] = readnoise logger.info(f"Using readnoise from header (RO_NOISE): {readnoise}") logger.info("Running LACosmic cosmic ray removal with astroscrappy") logger.info(f"Parameters: {params}") try: # Run astroscrappy.detect_cosmics # This returns (mask, cleaned_data) mask, cleaned_data = astroscrappy.detect_cosmics( image_data, invar=variance_data, sigclip=params["sigclip"], sigfrac=params["sigfrac"], objlim=params["objlim"], gain=params["gain"], readnoise=params["readnoise"], satlevel=params["satlevel"], verbose=params["verbose"], sepmed=True, ) # Count cosmic rays detected n_cosmic = np.sum(mask) logger.info(f"Detected and removed {n_cosmic} cosmic ray pixels") return cleaned_data except Exception as e: logger.error(f"Error in LACosmic cosmic ray removal: {e}") raise RuntimeError(f"LACosmic cosmic ray removal failed: {e}") def _bclean_clean(self, image_data: np.ndarray, **kwargs) -> np.ndarray: """ Clean cosmic rays using BCLEAN-like algorithm via astroscrappy. Parameters ---------- image_data : np.ndarray Image data to clean **kwargs Additional parameters for BCLEAN Returns ------- np.ndarray Cleaned image data """ raise NotImplementedError( "BCLEAN cosmic ray removal not yet implemented. " "This should implement the BCLEAN algorithm for cosmic ray detection and removal." ) def _pycosmic_clean(self, image_data: np.ndarray, **kwargs) -> np.ndarray: """ Clean cosmic rays using Python implementation. Parameters ---------- image_data : np.ndarray Image data to clean **kwargs Additional parameters for Python cosmic ray removal Returns ------- np.ndarray Cleaned image data """ raise NotImplementedError( "Python cosmic ray removal not yet implemented. " "This should implement a Python-based algorithm for cosmic ray detection and removal." ) def _create_im_from_raw(self, raw_filename: str, im_filename: str) -> None: """ Create IM file from raw file with proper processing. This method implements the functionality of TDFIO_CREATEBYCOPY, including: - Raw file to IM file conversion - BITPIX conversion (16-bit integer to 32-bit float) - Variance HDU creation - Instrument-specific processing (6DF, KOALA, TAIPAN) - TAIPAN pre-subtraction - Fiber table handling Parameters ---------- raw_filename : str Path to raw input file im_filename : str Path to output IM file """ logger.info(f"Creating IM file from raw file: {raw_filename} -> {im_filename}") # Open raw file and get basic information with ImageFile(raw_filename, mode="READ") as raw_file: # Get image dimensions nx, ny = raw_file.get_size() # Read image data (TDFIO_IMAGE_READ handles integer to float conversion) image_data = raw_file.read_image_data() # Get instrument information instrument = raw_file.get_header_value("INSTRUME", "UNKNOWN") spectid = raw_file.get_header_value("SPECTID", "") class_type = raw_file.get_header_value("CLASS", "") bitpix = raw_file.get_header_value("BITPIX", 16) logger.info( f"Instrument: {instrument}, SPECTID: {spectid}, CLASS: {class_type}, BITPIX: {bitpix}" ) # Check if this is raw data (16-bit integer) is_raw_16bit = bitpix == 16 # Instrument-specific processing is_6df = instrument.upper().startswith("6DF") and is_raw_16bit is_koala = instrument.upper().startswith("AAOMEGA-KOALA") and is_raw_16bit is_taipan = instrument.upper().startswith("TAIPAN") and is_raw_16bit # TAIPAN specific checks taipan_is_blue = spectid.upper().startswith("BL") taipan_is_red = spectid.upper().startswith("RD") is_taipan_science = is_taipan and class_type.startswith("MFOBJECT") is_taipan_flat = is_taipan and class_type.startswith("MFFFF") # Check if variance exists (definitive proof of raw file) has_variance = raw_file.has_variance() no_variance_in_target = not has_variance # TAIPAN pre-subtraction (scattered light removal) if is_taipan_science: logger.info( "Raw frame is Taipan Science. Looking for pre-subtract image frame." ) # Determine pre-subtract filename based on color if taipan_is_red: sub_filename = "RD_SCI_PRESUB.fits" elif taipan_is_blue: sub_filename = "BL_SCI_PRESUB.fits" else: sub_filename = None if sub_filename and Path(sub_filename).exists(): logger.info(f"Pre-subtracting data from {sub_filename}") # Read pre-subtract file with ImageFile(sub_filename, mode="READ") as sub_file: sub_data = sub_file.read_image_data() # Calculate scale from exposure time ratios src_time = raw_file.get_header_value("EXPOSED", 1.0) sub_time = sub_file.get_header_value("EXPOSED", 1.0) scale = src_time / sub_time # Subtract scaled pre-subtract data image_data = image_data - scale * sub_data logger.info(f"Pre-subtracted with scale factor: {scale:.3f}") else: logger.info( f"Cannot find file {sub_filename}. Not pre-subtracting." ) # Instrument-specific image transformations # TODO: fix these # if is_6df: # logger.info("Processing 6DF raw data: transposing and reversing axes") # # Transpose and reverse spectral axis # image_data = np.flipud(image_data.T) # nx, ny = ny, nx # Update dimensions # elif is_koala: # logger.info("Processing KOALA raw data: flipping spatial axis") # # Flip spatial axis (fiber number indexing from top to bottom) # image_data = np.fliplr(image_data) # elif is_taipan: # logger.info("Processing TAIPAN raw data: transposing axes") # # Transpose axes # if taipan_is_blue: # image_data = image_data.T # elif taipan_is_red: # image_data = np.flipud(image_data.T) # else: # image_data = image_data.T # nx, ny = ny, nx # Update dimensions # Create IM file with proper structure self._create_im_file_structure( im_filename, image_data, nx, ny, raw_filename, instrument, class_type, no_variance_in_target, ) logger.info(f"Successfully created IM file: {im_filename}") def _create_im_file_structure( self, im_filename: str, image_data: np.ndarray, nx: int, ny: int, raw_filename: str, instrument: str, class_type: str, no_variance_in_target: bool, ) -> None: """ Create IM file with proper FITS structure. Parameters ---------- im_filename : str Output IM filename image_data : np.ndarray Image data nx, ny : int Image dimensions raw_filename : str Original raw filename instrument : str Instrument name class_type : str Frame class no_variance_in_target : bool Whether to create variance HDU """ from astropy.io import fits # Open source file for header and fiber table source_file = ImageFile(raw_filename, mode="READ") source_file.open() header = source_file.hdul[source_file.hdul.index_of("PRIMARY")].header # Create primary HDU with image data primary_hdu = fits.PrimaryHDU(image_data, header=header) # Set BITPIX to -32 (32-bit float) for IM files primary_hdu.header["BITPIX"] = -32 # Remove BSCALE and BZERO keywords (not used with float data) if "BSCALE" in primary_hdu.header: del primary_hdu.header["BSCALE"] if "BZERO" in primary_hdu.header: del primary_hdu.header["BZERO"] # Remove AVGVALUE keyword (can cause problems) if "AVGVALUE" in primary_hdu.header: del primary_hdu.header["AVGVALUE"] # Add processing history primary_hdu.header["HISTORY"] = f"Created IM file from {raw_filename}" primary_hdu.header["HISTORY"] = f"Instrument: {instrument}" primary_hdu.header["HISTORY"] = f"Class: {class_type}" # Create HDU list hdul = fits.HDUList([primary_hdu]) # Add variance HDU if needed (raw files don't have variance) if no_variance_in_target: logger.info("Creating variance HDU for raw file") # Create variance HDU at correct extension index variance_hdu = fits.ImageHDU(name="VARIANCE") variance_hdu.header["EXTNAME"] = "VARIANCE" # Initialize variance with zeros (will be calculated later) variance_data = np.zeros((ny, nx), dtype=np.float32) variance_hdu.data = variance_data hdul.append(variance_hdu) # Set class in header primary_hdu.header["CLASS"] = class_type # Write the file hdul.writeto(im_filename, overwrite=True) hdul.close() # Copy fiber table if present and appropriate if class_type not in ["BIAS", "DARK"]: # Open source file to copy fiber table if source_file.has_fiber_table(): logger.info("Copying fiber table from source file") # Open destination file for writing with ImageFile(im_filename, mode="UPDATE") as dest_file: dest_file.copy_fiber_table_from(source_file) # Handle TAIPAN fiber table (remove fibers beyond 150) if instrument.upper().startswith("TAIPAN"): logger.info( "Processing TAIPAN fiber table (limiting to 150 fibers)" ) dest_file.remove_fibers_beyond(150) else: logger.info("No fiber table found in source file") # close source file source_file.close()
[docs] def make_im( raw_filename: str, im_filename: Optional[str] = None, use_bias: bool = False, use_dark: bool = False, dark_filename: Optional[str] = None, use_glow_pca: bool = False, glow_pca_filename: Optional[str] = None, dc_rate_filename: Optional[str] = None, glow_n_components: int = 5, glow_fit_rows: Optional[list] = None, use_lflat: bool = False, lflat_filename: Optional[str] = None, bad_pixel_mask: Optional[str] = None, bad_pixel_mask2: Optional[str] = None, mark_saturated: bool = True, cosmic_ray_method: str = "NONE", verbose: bool = True, **kwargs, ) -> str: """ Convenience function to process raw file to IM file. Parameters ---------- raw_filename : str Path to the raw input file im_filename : str, optional Path for the output IM file. If None, will be derived from raw_filename use_bias : bool, optional Whether to subtract bias frame use_dark : bool, optional Whether to subtract dark frame (simple exposure-time-scaled subtraction) dark_filename : str, optional Path to dark frame file use_glow_pca : bool, optional Whether to subtract glow using PCA templates. Can be used instead of or in addition to ``use_dark``. glow_pca_filename : str, optional Path to the PCA glow cube FITS file (PRIMARY = mean glow, extensions PC01…PCxx = PCA component images). dc_rate_filename : str, optional Path to a FITS file with the per-pixel dark-current rate [e-/s]. If None, no DC-current term is included in the glow model. glow_n_components : int, optional Number of PCA components to use in the glow fit (default: 5). glow_fit_rows : list of (int, int), optional Row (axis=0) ranges used for the glow fitting, e.g. ``[(0, 500), (800, 1300)]``. Pixels outside these ranges are excluded from the fit but the model is subtracted everywhere. None → all pixels. use_lflat : bool, optional Whether to divide by long-slit flat field lflat_filename : str, optional Path to long-slit flat field file bad_pixel_mask : str, optional Path to bad pixel mask file bad_pixel_mask2 : str, optional Path to second bad pixel mask file mark_saturated : bool, optional Whether to mark saturated pixels as bad cosmic_ray_method : str, optional Method for cosmic ray removal verbose : bool, optional Whether to print verbose output **kwargs Additional keyword arguments Returns ------- str Path to the created IM file """ processor = MakeIM(verbose=verbose) return processor.process_raw_to_im( raw_filename=raw_filename, im_filename=im_filename, use_bias=use_bias, use_dark=use_dark, dark_filename=dark_filename, use_glow_pca=use_glow_pca, glow_pca_filename=glow_pca_filename, dc_rate_filename=dc_rate_filename, glow_n_components=glow_n_components, glow_fit_rows=glow_fit_rows, use_lflat=use_lflat, lflat_filename=lflat_filename, bad_pixel_mask=bad_pixel_mask, bad_pixel_mask2=bad_pixel_mask2, mark_saturated=mark_saturated, cosmic_ray_method=cosmic_ray_method, **kwargs, )