Source code for kspecdr.inst.isoplane
"""
Header conversion utilities for KSPEC data.
This module provides functions to convert raw headers from various instruments
(e.g., PIXIS/Isoplane backup CCD) to the standard format required by kspecdr.
"""
import logging
from pathlib import Path
from typing import List, Optional
from astropy.io import fits
from astropy.io.fits.verify import VerifyError
from astropy.time import Time
from astropy.table import Table
import numpy as np
import re
logger = logging.getLogger(__name__)
[docs]
def get_isoplane_readout_settings(header: fits.Header) -> dict:
"""
Determine readout gain/noise from raw header.
Returns
-------
dict
Mapping with keys ``gain``, ``noise``, ``speed_mhz``,
``gain_mode``, and ``noise_mode``.
"""
readout_modes = {
"LOW": {
"2MHZ": {
"LOW": {"gain": 4.09, "noise": 16.03},
"MEDIUM": {"gain": 2.10, "noise": 15.88},
"HIGH": {"gain": 1.12, "noise": 15.42},
},
"100KHZ": {
"LOW": None,
"MEDIUM": None,
"HIGH": {"gain": 1.12, "noise": 4.48},
},
},
"HIGH": {
"2MHZ": {
"LOW": {"gain": 15.37, "noise": 41.34},
"MEDIUM": {"gain": 7.73, "noise": 37.57},
"HIGH": {"gain": 3.74, "noise": 35.08},
},
"100KHZ": {
"LOW": None,
"MEDIUM": None,
"HIGH": None,
},
},
}
default_key = ("LOW", "2MHZ", "LOW")
gain_mode = None
noise_mode = None
speed_mhz = float(header.get("PI CAMERA ADC SPEED", None))
gain_val = header.get("PI CAMERA ADC ANALOGGAIN", None)
if gain_val:
g = str(gain_val).strip().upper()
if "HIGH" in g:
gain_mode = "HIGH"
elif "MEDIUM" in g:
gain_mode = "MEDIUM"
elif "LOW" in g:
gain_mode = "LOW"
noise_val = header.get("PI CAMERA ADC QUALITY", None)
if noise_val:
n = str(noise_val).strip().upper()
if "LOWNOISE" in n:
noise_mode = "LOW"
elif "HIGHCAPACITY" in n:
noise_mode = "HIGH"
alien_speed = False
if speed_mhz is None:
speed_key = default_key[1]
elif speed_mhz == 0.1:
speed_key = "100KHZ"
elif speed_mhz == 2:
speed_key = "2MHZ"
else:
speed_key = default_key[1]
alien_speed = True
noise_key = noise_mode or default_key[0]
gain_key = gain_mode or default_key[2]
if speed_mhz is None or gain_mode is None or noise_mode is None:
logger.warning(
"Readout settings not fully detected; using default ADC_SPEED/RO_GAIN/RO_NOISE."
)
selected = readout_modes[default_key[0]][default_key[1]][default_key[2]]
else:
if alien_speed:
logger.warning(
f"Unsupported ADC speed in header: {speed_mhz} MHz;"
"using default speed 2MHz"
)
selected = readout_modes.get(noise_key, {}).get(speed_key, {}).get(gain_key)
if selected is None:
raise ValueError(
"Readout mode not yet calibrated: "
f"noise={noise_key}, speed={speed_key}, gain={gain_key}"
)
logger.info(
"Readout settings: speed=%.0f MHz, gain=%s, noise=%s",
speed_mhz,
gain_key,
noise_key,
)
return {
"gain": selected["gain"],
"noise": selected["noise"],
"speed_mhz": speed_mhz,
"gain_mode": gain_mode,
"noise_mode": noise_mode,
}
[docs]
def add_fiber_table(hdul: fits.HDUList, n_fibers: int, fiber_table: Table = None) -> None:
"""
Add a dummy fiber table to the HDUList.
Parameters
----------
hdul : fits.HDUList
The FITS HDUList to modify in place.
n_fibers : int
Number of fibers to include in the table.
fiber_table : Table, optional
Fiber table to add to the HDUList. If None, a dummy fiber table will be created.
"""
logger.info(f"Adding fiber table with {n_fibers} fibers")
if fiber_table is None:
# Create columns
# Standard 2dfdr fiber table columns:
# TYPE (1A), NAME (80A), MAGNITUDE (E), RA (D), DEC (D), X (E), Y (E), etc.
# For dummy/isoplane, we primarily need TYPE='P' (Program) or 'S' (Sky)
# to ensure extraction works.
# Create arrays
# All Program fibers for now
types = np.array(["P"] * n_fibers)
names = np.array([f"Fiber {i+1}" for i in range(n_fibers)])
# Create columns
c1 = fits.Column(name="TYPE", format="1A", array=types)
c2 = fits.Column(name="NAME", format="20A", array=names)
# Create Binary Table HDU
fib_hdu = fits.BinTableHDU.from_columns([c1, c2], name="FIBRES")
else:
if len(fiber_table) != n_fibers:
logger.warning(f"Fiber table has {len(fiber_table)} rows, but n_fibers={n_fibers} were given. Ignoring n_fibers argument.")
fib_hdu = fits.table_to_hdu(fiber_table, name="FIBRES")
# Append to HDUList
hdul.append(fib_hdu)
[docs]
def sanitize_header_drop_unparsable(
hdr: fits.Header, max_passes: int = 50, verbose: bool = True
) -> fits.Header:
"""
Return a cleaned copy of hdr by removing any cards that raise VerifyError (or ValueError)
when converted to a FITS card image (str(card)). Also removes associated CONTINUE chains.
This is meant for headers that contain broken OGIP long-string CONTINUE blocks.
Parameters
----------
hdr : fits.Header
max_passes : int
Safety limit to avoid infinite loops.
verbose : bool
Print which cards were dropped.
Returns
-------
fits.Header
Cleaned header.
"""
new = hdr.copy()
def safe_card_repr(card):
# triggers the same logic as printing a header, but per-card
return str(card) # may raise VerifyError
passes = 0
while passes < max_passes:
passes += 1
changed = False
i = 0
while i < len(new.cards):
card = new.cards[i]
try:
_ = safe_card_repr(card)
i += 1
continue
except (VerifyError, ValueError) as e:
# This card is problematic. Decide how much to delete.
kw = card.keyword
if verbose:
# card may not stringify, so print minimal info
print(
f"[sanitize] dropping card at idx={i} keyword={kw!r} due to {type(e).__name__}: {e}"
)
# If we're in a CONTINUE chain, we should remove the whole chain.
# Strategy:
# - find the start of the chain (go backwards while previous are CONTINUE)
# - if the immediate previous card is NOT CONTINUE, include that previous card too
# (because it's likely the long-string starter with '&')
start = i
while (
start > 0
and new.cards[start].keyword == "CONTINUE"
and new.cards[start - 1].keyword == "CONTINUE"
):
start -= 1
# If the problematic card is CONTINUE and there is a preceding non-CONTINUE card,
# that preceding card is very likely the parent long-string keyword.
if (
new.cards[i].keyword == "CONTINUE"
and i > 0
and new.cards[i - 1].keyword != "CONTINUE"
):
start = i - 1
# Now delete forward: start card + subsequent CONTINUEs
end = start + 1
while end < len(new.cards) and new.cards[end].keyword == "CONTINUE":
end += 1
if verbose:
print(
f"[sanitize] removing cards [{start}:{end}) ({end-start} cards)"
)
# Delete from end to start
for j in range(end - 1, start - 1, -1):
del new[j]
changed = True
# After deletion, continue at same index (start), since list shrank
i = start
except Exception as e:
raise e
if not changed:
break
if passes >= max_passes and verbose:
print("[sanitize] reached max_passes; header may still contain issues.")
return new
# TODO: review this - time stamping per frame, add gain/rdnoise settings for different setups
[docs]
def convert_isoplane_header(header: fits.Header, ndfclass: str) -> fits.Header:
"""
Convert a PIXIS/Isoplane raw header to the standard kspecdr format.
Parameters
----------
header : fits.Header
The original raw header.
ndfclass : str
The NDFCLASS of the data (e.g., "MFOBJECT", "MFARC", "MFFFF", "BIAS", "DARK").
Returns
-------
fits.Header
The converted header with standardized keywords.
"""
# Create a copy to avoid modifying the original
new_header = header.copy()
# Drop non-parsable cards
# TODO: add options for verbose and max_passes?
new_header = sanitize_header_drop_unparsable(new_header, verbose=False)
# 1. Instrument Name
new_header["INSTRUME"] = ("ISOPLANE", "KSPEC Backup Spectrograph")
# 2. Gain and Noise (Measured values per readout mode)
readout = get_isoplane_readout_settings(header)
new_header["RO_GAIN"] = (
readout["gain"],
"Readout Amplifer gain (e-/ADU)",
)
new_header["RO_NOISE"] = (readout["noise"], "Readout noise (electrons)")
# 3. Exposure Time
# Raw header seems to have EXPOSURETIME in milliseconds in HIERARCH keywords
# Try to find HIERARCH PI CAMERA SHUTTERTIMING EXPOSURETIME
# Note: astropy handles HIERARCH keywords, but looking up by full string is safer
exp_ms = None
for card in header.cards:
if "SHUTTERTIMING EXPOSURETIME" in card.keyword:
try:
exp_ms = float(card.value)
except (ValueError, TypeError):
pass
break
if exp_ms is not None:
exposed = exp_ms / 1000.0
else:
# Fallback to EXPTIME if available, assuming it might be seconds or needs checking
# The user provided example shows EXPTIME = '1000 ', which matched 1000ms
exposed = header.get("EXPTIME", 0.0)
# Heuristic: if > 100, assume it's ms? Or trust the HIERARCH one primarily.
try:
exposed = float(exposed)
# If it matches the HIERARCH one (which was ms), it's probably ms.
# But standard FITS is seconds.
# Given the sample: EXPTIME='1000 ', HIERARCH...='1000' /milliseconds
# It is safer to divide by 1000 if it's large and matches the ms value.
if exposed > 100:
exposed = exposed / 1000.0
except:
exposed = 0.0
new_header["EXPOSED"] = (exposed, "Exposure Time (seconds)")
new_header["TOTALEXP"] = (exposed, "Total Exposure (seconds)")
new_header["ELAPSED"] = (exposed, "Elapsed Time (seconds)")
# 4. Dates and Times
# Input: DATE-OBS= '2024-08-24T13:47:39'
date_obs = header.get("DATE-OBS", "")
if date_obs:
try:
t = Time(date_obs, format="isot", scale="utc")
new_header["UTDATE"] = (t.strftime("%Y:%m:%d"), "UT Date")
new_header["UTSTART"] = (t.strftime("%H:%M:%S"), "UT Start")
new_header["UTMJD"] = (t.mjd, "UT MJD at start of exposure")
# UTEND would be UTSTART + EXPOSED
t_end = t + (exposed / 86400.0) # exposed is seconds
new_header["UTEND"] = (t_end.strftime("%H:%M:%S"), "UT End")
new_header["EPOCH"] = (t.jyear, "Current Epoch, Years A.D.")
except Exception as e:
logger.warning(f"Could not parse DATE-OBS: {e}")
# 5. Detector Info
# HIERARCH PI CAMERA SENSOR INFORMATION SENSORNAME
sensor_name = "UNKNOWN"
for card in header.cards:
if "SENSOR INFORMATION SENSORNAME" in card.keyword:
sensor_name = card.value
break
new_header["DETECTOR"] = (sensor_name, "Detector name")
# Orientation of the detector readout
ori_uncorr = header["PI ACQUISITION ORIENTATION LIGHTPATHORIENTATIONUNCORRECTED"]
ori_result = header["PI CAMERA EXPERIMENT ACQUISITION ORIENTATION RESULT"]
logger.debug(f"ori_uncorr: {ori_uncorr}")
logger.debug(f"ori_result: {ori_result}")
# Horizontal orientation of the detector readout
i_ori_uncorr_horizontal = -1 if "FlippedHorizontally" in ori_uncorr else 1
i_ori_result_horizontal = -1 if "FlippedHorizontally" in ori_result else 1
flip_horizontal = i_ori_uncorr_horizontal * i_ori_result_horizontal
new_header["FLIPHORI"] = (flip_horizontal, "Flip horizontal orientation")
logger.debug(f"flip_horizontal: {flip_horizontal}")
logger.debug(f"FLIPHORI: {new_header['FLIPHORI']}")
# Vertical orientation of the detector readout
i_ori_uncorr_vertical = -1 if "FlippedVertically" in ori_uncorr else 1
i_ori_result_vertical = -1 if "FlippedVertically" in ori_result else 1
flip_vertical = i_ori_uncorr_vertical * i_ori_result_vertical
new_header["FLIPVERT"] = (flip_vertical, "Flip vertical orientation")
# 6. Grating Info
# HIERARCH PI SPECTROMETER GRATING SELECTED = '[500nm,150][2][0]'
# Need to parse: 150 lines/mm, 500nm center?
grating_str = ""
for card in header.cards:
if "SPECTROMETER GRATING SELECTED" in card.keyword:
grating_str = card.value
break
# Attempt to parse [Center, Lines]
# Example: [500nm,150]
match = re.search(r"\[(.*?),(\d+)\]", grating_str)
if match:
# center_val = match.group(1) # e.g. 500nm
lines_per_mm = int(match.group(2))
new_header["GRATLPMM"] = (lines_per_mm, "Grating Lines per mm")
# Maybe use lines/mm as ID if no other ID?
new_header["GRATID"] = (new_header.get("GRATLPMM"), "Grating ID")
# 7. Central Wavelength
# HIERARCH PI SPECTROMETER GRATING CENTERWAVELENGTH = '600' / nanometers
center_wl_nm = None
for card in header.cards:
if "SPECTROMETER GRATING CENTERWAVELENGTH" in card.keyword:
try:
center_wl_nm = float(card.value)
except:
pass
break
if center_wl_nm is not None:
lambdac_ang = center_wl_nm * 10.0
new_header["LAMBDAC"] = (lambdac_ang, "Central wavelength in Angstrom")
new_header["LAMBDAB"] = (lambdac_ang, "Compatibility keyword")
# 8. Dispersion
if match:
reciprocal_linear_dispersion = 23.0 * (1200 / lines_per_mm) # Angstrom / mm, from the isoplane datasheet
pixel_width = float(new_header["PI CAMERA SENSOR INFORMATION PIXEL WIDTH"]) * 1e-3 # micron to mm # TODO: check - height or width? they are the same for PIXIS1300BX
dispersion = reciprocal_linear_dispersion * pixel_width # Angstrom / pixel
new_header["DISPERS"] = (dispersion, "Central dispersion (Angstrom/pixel)") # negative because the dispersion is in the opposite direction of the wavelength
# new_header["DISPERS"] = (-3.95, "Central dispersion (Angstrom/pixel)") # negative because the dispersion is in the opposite direction of the wavelength
# 9. Spectrograph ID
# Not strictly needed but good for completeness
if "SPECTID" not in new_header:
new_header["SPECTID"] = ("UNKNOWN", "Spectrograph ID")
# 10. Observation Type
# Try to derive from object name or other fields if possible
# For now, default to OBJECT if unknown
if "OBSTYPE" not in new_header:
new_header["OBSTYPE"] = ("OBJECT", "Observation type")
new_header["NDFCLASS"] = (ndfclass, "Data Reduction class name (NDFCLASS)")
# 11. Dispersion Axis
# 1 = Horizontal (L-R), 2 = Vertical (T-B)
new_header["DISPAXIS"] = (1, "Dispersion axis (1=Horizontal, 2=Vertical)")
return new_header
[docs]
def write_isoplane_converted_image(
fpath: str,
output_fpath: str,
ndfclass: str,
n_fibers: int = None,
fiber_table: Table = None,
split_frames: bool = False,
) -> Optional[List[str]]:
"""
Convert a PIXIS/Isoplane raw FITS file into a kspecdr-compatible file.
This reads the input header, applies standard keyword conversions, attaches
a fiber table, normalizes the data to a single 2D frame, applies requested
orientation flips, and writes the result to a new FITS file.
Parameters
----------
fpath : str
Input raw FITS filename.
output_fpath : str
Output converted FITS filename.
ndfclass : str
NDFCLASS to stamp into the converted header.
n_fibers : int, optional
Number of fibers to create when using a dummy fiber table.
fiber_table : astropy.table.Table, optional
Fiber table to attach; if provided, this is used instead of a dummy table.
split_frames : bool, optional
If True and the input is a 3D cube with multiple frames, write each frame
to a separate file by appending a 4-digit index to output_fpath.
Returns
-------
list of str or None
List of output paths when split_frames is True and multiple frames exist;
otherwise None.
"""
if n_fibers is None and fiber_table is None:
raise ValueError("Either n_fibers or fiber_table must be provided")
with fits.open(fpath) as hdul:
hdr = hdul[0].header
new_hdr = convert_isoplane_header(hdr, ndfclass=ndfclass)
# add fiber table
add_fiber_table(hdul, n_fibers=n_fibers, fiber_table=fiber_table)
# just use the first frame for now
if hdul[0].data.ndim == 3:
if hdul[0].data.shape[0] > 1:
if split_frames:
output_paths: List[str] = []
output_path = Path(output_fpath)
base = output_path.with_suffix("")
ext = output_path.suffix
for idx, frame in enumerate(hdul[0].data, start=1):
frame_hdr = new_hdr.copy()
frame_hdr["NAXIS"] = 2
if "NAXIS3" in frame_hdr:
frame_hdr.remove("NAXIS3")
frame_data = frame
if frame_hdr["FLIPHORI"] > 0:
frame_data = np.flip(frame_data, axis=1)
frame_hdr["FLIPHORI"] = -1
logger.info("Flipped horizontal orientation")
if frame_hdr["FLIPVERT"] > 0:
frame_data = np.flip(frame_data, axis=0)
frame_hdr["FLIPVERT"] = -1
logger.info("Flipped vertical orientation")
frame_hdul = fits.HDUList([hdu.copy() for hdu in hdul])
frame_hdul[0].data = frame_data
frame_hdul[0].header = frame_hdr
output_path = f"{base}{idx:04d}{ext}"
frame_hdul.writeto(str(output_path), overwrite=True)
frame_hdul.close()
output_paths.append(str(output_path))
return output_paths
raise ValueError(
"More than one frame in the input file. Use combine_image to combine frames."
)
else:
hdul[0].data = hdul[0].data[0]
# make new fits file with new header and fiber table
new_hdr["NAXIS"] = 2
if "NAXIS3" in new_hdr:
new_hdr.remove("NAXIS3")
elif hdul[0].data.ndim == 2:
pass
else:
raise ValueError(f"Input data has {hdul[0].data.ndim} dimensions. Expected 2 or 3.")
if new_hdr["FLIPHORI"] > 0:
hdul[0].data = np.flip(hdul[0].data, axis=1)
new_hdr["FLIPHORI"] = -1
logger.info("Flipped horizontal orientation")
if new_hdr["FLIPVERT"] > 0:
hdul[0].data = np.flip(hdul[0].data, axis=0)
new_hdr["FLIPVERT"] = -1
logger.info("Flipped vertical orientation")
hdul[0].header = new_hdr
hdul.writeto(output_fpath, overwrite=True)
hdul.close()
return None