Source code for yieldplotlib.load.ayo.ayo_input

"""Node for handling input .ayo files."""

import json
from pathlib import Path

import astropy.units as u
import numpy as np
import pyparsing as pp
from lod_unit import lod
from yippy.coronagraph import Coronagraph

from yieldplotlib.core.file_nodes import FileNode
from yieldplotlib.logger import logger

# Define 'read' as a dimensionless unit
read = u.def_unit("read")
u.add_enabled_units([read])

# Define zodis as a dimensionless unit
zodis = u.def_unit("zodis")
u.add_enabled_units([zodis])


[docs] class AYOInputFile(FileNode): """Node for handling AYO input files using PyParsing.""" def __init__(self, file_path: Path): """Initialize the AYOInputFile node with the file path.""" super().__init__(file_path) self.data = {} self.parse() self.is_input = True
[docs] def load(self): """Load the text file into memory.""" with open(self.file_path, encoding="utf-8") as f: self.raw_data = f.read() logger.info(f"Loaded AYO input file: {self.file_path}")
[docs] def _get(self, key: str, **kwargs): """Return the data associated with the key.""" return self.data.get(key, None)
[docs] def parse(self): """Parse the AYO input file and populate the self.data dictionary.""" # Define a unit as a word composed of alphabetic characters and underscores, # possibly combined with slashes for compound units simple_unit = pp.Word(pp.alphas, pp.alphanums + "_") compound_unit = pp.Combine(simple_unit + pp.ZeroOrMore("/" + simple_unit)) # Unit can be either a compound unit or a simple unit unit = compound_unit("unit") # Define an optional exponent, e.g., ^-1, ^2 exponent = pp.Suppress("^") + pp.Regex(r"[+-]?\d+").setParseAction( lambda t: int(t[0]) ) # Define a unit with an optional exponent unit_with_exp = pp.Group(unit + pp.Optional(exponent, default=1)("degree")) # Define grammar for a single key-value pair with type identifier identifier = pp.Word(pp.alphas + "_", pp.alphanums + "_").setName("identifier") # Define number (integer or float) number = pp.common.number().setName("number") # Define string (single or double quoted) string = pp.QuotedString("'", escChar="\\") | pp.QuotedString('"', escChar="\\") # Define array array = ( pp.Group( pp.Suppress("[") + pp.Optional(pp.delimitedList(number | string, delim=",")) + pp.Suppress("]") ) .setName("array") .setParseAction(self._convert_array) ) # Define parameter reference (used in expressions) param_ref = identifier.copy().setName("param_ref") # Define expression parts - handle addition, subtraction, # multiplication, division expr_term = number | param_ref # Simple expressions (value op value) simple_expr = (expr_term + pp.oneOf("+ - * /") + expr_term).setName( "simple_expr" ) # Define a post-processing function for mathematical expressions def process_expression(tokens): """Process a mathematical expression with parameter references.""" if len(tokens) == 3: # It's an expression like a + b or a - b left, op, right = tokens # If the right side is a parameter reference, look it up if isinstance(right, str) and right in self.data: right_val = self.data[right] else: right_val = right # If the left side is a parameter reference, look it up if isinstance(left, str) and left in self.data: left_val = self.data[left] else: left_val = left # Perform the operation if op == "+": return left_val + right_val elif op == "-": return left_val - right_val elif op == "*": return left_val * right_val elif op == "/": return left_val / right_val return tokens[0] # Not an expression we recognize # Define the expression and set its parse action expression = simple_expr.setParseAction(process_expression) # Define type identifier in curly braces type_literal = ( pp.Suppress("{") + pp.Word(pp.alphas).setResultsName("type") + pp.SkipTo(pp.Suppress("}")) ) # Unit identifier in parentheses unit_literal = ( pp.Suppress("(") + pp.OneOrMore(unit_with_exp) .setResultsName("units") .setParseAction(self._convert_unit) + pp.Suppress(") ") ) # Define all possible value types with priorities (try more specific # patterns first) value_types = expression | array | string | number | param_ref # Define key-value pair with type, optional unit, and optional comments key_value = ( identifier.setResultsName("key") + pp.Suppress("=") + value_types.setResultsName("value") + pp.Optional(pp.Suppress(";")) # Semicolon is optional + pp.Optional(unit_literal) + pp.Optional(pp.SkipTo("{").setResultsName("comment_before_type")) + pp.Optional(type_literal) + pp.Optional(pp.SkipTo(pp.lineEnd()).setResultsName("comment_after_type")) ) # Split the raw data into individual lines input_file_lines = self.raw_data.split("\n") for line_number, line in enumerate(input_file_lines, start=1): line = line.strip() # Skip empty lines and lines starting with ';' or '#' if not line or line.startswith(";") or line.startswith("#"): continue try: # Parse the line using the defined parser parsed = key_value.parseString(line, parseAll=True) # Extract key, value, and type from the parsed result key = parsed["key"] type_ = parsed.get("type", "string").lower() value = parsed["value"][0] if "units" in parsed: unit = parsed["units"] value *= unit # Store the value in self.data self.data[key] = value logger.debug(f"Parsed {type_}: {key} = {value}") except pp.ParseException as e: logger.warning(f"Failed to parse line {line_number}: {line}") logger.warning(f"Error: {e}") continue # After parsing, set expected_keys based on the input keys self.expected_keys = list(self.data.keys()) logger.info(f"Successfully parsed {len(self.data)} input parameters.")
[docs] def _convert_array(self, tokens): """Convert parsed array tokens to a Python list.""" array = tokens[0].asList() return np.array(array)
[docs] def _convert_unit(self, parsed_unit): """Convert parsed unit string to the appropriate astropy unit.""" # get the unit from the ParseResult object _map = { "years": u.yr, "l/D": lod, "lambda/D": lod, "mags": u.mag, "microns": u.um, "counts": u.count, "photon_count": u.photon, "read": read, "degrees": u.deg, } _whitelist = [] final_unit = None n_units = 0 for current_unit in parsed_unit: current_unit_str = current_unit["unit"] if current_unit_str in _whitelist: continue if current_unit_str in _map.keys(): # Get the unit from the predefined map _u = _map[current_unit_str] else: # Parse as an astropy unit _u = u.Unit(current_unit_str) # Apply the exponent current_unit_degree = current_unit["degree"] # Apply exponent and include the unit in the final unit if n_units == 0: final_unit = _u**current_unit_degree else: final_unit *= _u**current_unit_degree n_units += 1 return final_unit
[docs] def export_exosims( self, output_path: str, base_file: Path | str | None = None, detection_wavelength_nm: float | None = None, characterization_wavelength_nm: float | None = None, **kwargs, ): """Export the AYO input to an EXOSIMS JSON file. Args: output_path: Path to write the output JSON file. base_file: Optional path to a base EXOSIMS JSON file. If provided, AYO parameters will overwrite equivalent parameters in the base file, and all other parameters will be preserved. detection_wavelength_nm: Wavelength in nanometers to use for detection mode. The closest wavelength in the lambda array will be selected. If None, uses the middle wavelength (default). characterization_wavelength_nm: Wavelength in nanometers to use for characterization mode. The closest wavelength in the sc_lambda array will be selected. If None, uses the middle wavelength (default). **kwargs: Additional keyword arguments to include in the output JSON. These will overwrite any existing values. Useful for setting paths like cachedir, e.g.: cachedir="$HOME/.EXOSIMS/2025/Natasha_JATIS" """ # Helper to round floats to avoid repeating decimals def round_float(val, decimals=6): """Round float values to specified decimal places.""" if val is None: return None if isinstance(val, int | float): return round(float(val), decimals) if isinstance(val, list | np.ndarray): return [round_float(v, decimals) for v in val] return val # Helper to safely get values with units or defaults def get_val(key, unit=None, default=None): val = self.data.get(key) if val is None: return default # Check if val is a Quantity (has .unit or .value) if hasattr(val, "value"): if unit: try: return val.to(unit).value except u.UnitConversionError: return val.value return val.value return val # Helper for arrays (return value if scalar, or array item) def get_array_val(key, idx, default=None, unit=None): val = self.data.get(key) if val is None: return default # If Quantity, convert to target unit if specified if hasattr(val, "value"): if unit is not None: try: val = val.to(unit) except u.UnitConversionError: pass # Keep original if conversion fails v = val.value else: v = val if isinstance(v, list | np.ndarray): if idx < len(v): return v[idx] # Warn when index is out of bounds - likely array length mismatch raise ValueError( f"Index {idx} out of bounds for '{key}' (length {len(v)}). " f"Check that wavelength arrays have matching lengths." ) return v # Scalar # Load base file if provided if base_file is not None: base_path = Path(base_file) if not base_path.exists(): raise FileNotFoundError(f"Base EXOSIMS file not found: {base_path}") with open(base_path) as f: out = json.load(f) logger.info(f"Loaded base EXOSIMS file: {base_path}") else: # Initialize with defaults if no base file out = { "missionLife": 5.0, "missionStart": 60634, "pupilDiam": 4.0, "koAngles_Sun": [45, 135], "modules": { "PlanetPopulation": "AlbedoByRadiusDulzPlavchan", "StarCatalog": "HPIC", "OpticalSystem": "Nemati", "ZodiacalLight": "Mennesson", "BackgroundSources": "GalaxiesFaintStars", "PlanetPhysicalModel": "Forecaster", "Observatory": "WFIRSTObservatoryL2", "TimeKeeping": " ", "PostProcessing": " ", "Completeness": "BrownCompleteness", "TargetList": " ", "SimulatedUniverse": "DulzPlavchanUniverseEarthsOnly", "SurveySimulation": "coroOnlyScheduler", "SurveyEnsemble": "EXOSIMS_local.IPClusterEnsembleJPL2", }, "scienceInstruments": [], "starlightSuppressionSystems": [], "observingModes": [], } # Set settlingTime to 0 since all overhead is handled by ohTime in the system # AYO's toverhead_fixed includes all overhead (slew + settle + initial # dark hole digging) out["settlingTime"] = 0.0 # Extract AYO parameters and overwrite base values D_m = get_val("D", u.m, None) if D_m is not None: out["pupilDiam"] = round_float(float(D_m)) # Get nchannels for multi-channel coronagraph simulation. # nchannels splits light into identical channels - EXOSIMS doesn't have # a direct analog, so we approximate by: # 1. Multiplying BW by nchannels (increased signal from combined channels) # 2. Multiplying lenslSamp by sqrt(nchannels) (since lenslSamp is squared # in EXOSIMS to get Npix, this gives Npix proportional to nchannels) nchannels = get_val("nchannels", None, 1) if nchannels is None: nchannels = 1 else: nchannels = int(nchannels) if nchannels > 1: logger.info( f"Applying nchannels={nchannels} to detection only: " f"optics *= {nchannels}, " f"lenslSamp *= sqrt({nchannels}) = {np.sqrt(nchannels):.4f}" ) mission_life_yr = get_val("mission_lifetime", u.yr, None) if mission_life_yr is not None: out["missionLife"] = round_float(float(mission_life_yr)) # AYO pitch is [min, max] from Sun # Will be set in starlightSuppressionSystems later ko_sun = self.data.get("pitch") if ko_sun is not None: if hasattr(ko_sun, "unit"): ko_sun = ko_sun.to(u.deg).value.tolist() elif isinstance(ko_sun, np.ndarray): ko_sun = ko_sun.tolist() else: ko_sun = None # Handle missionPortion total_time = get_val("total_survey_time", u.yr, None) mission_life_yr = out.get("missionLife", 5.0) if total_time is not None and mission_life_yr > 0: out["missionPortion"] = round_float(float(total_time / mission_life_yr)) # Map nexozodis to fixed_nEZ_val nexozodis = get_val("nexozodis", zodis, None) if nexozodis is not None: # nexozodis is in zodis (dimensionless unit), convert to float if hasattr(nexozodis, "value"): out["fixed_nEZ_val"] = round_float(float(nexozodis.value)) else: out["fixed_nEZ_val"] = round_float(float(nexozodis)) # Map noisefloor_PPF to ppFact and ppFact_char (inverse relationship) noisefloor_PPF = get_val("noisefloor_PPF", None, None) if noisefloor_PPF is not None: pp_fact_val = round_float(1.0 / float(noisefloor_PPF)) out["ppFact"] = pp_fact_val out["ppFact_char"] = pp_fact_val logger.info( f"Set ppFact and ppFact_char to {pp_fact_val} " f"(from noisefloor_PPF={noisefloor_PPF})" ) # Update optional_filters based on AYO target list cuts # Initialize optional_filters if not present if "optional_filters" not in out: out["optional_filters"] = {} # Update vmag_filter with target_vmag_cut target_vmag_cut = get_val("target_vmag_cut", None, None) if target_vmag_cut is not None: # Get existing vmag_range from base file if present, # otherwise use default min existing_vmag_filter = out["optional_filters"].get("vmag_filter", {}) existing_params = existing_vmag_filter.get("params", {}) existing_range = existing_params.get("vmag_range", [0, 15]) vmag_min = existing_range[0] if isinstance(existing_range, list) else 2 out["optional_filters"]["vmag_filter"] = { "enabled": True, "params": { "vmag_range": [ round_float(vmag_min), round_float(float(target_vmag_cut)), ] }, } logger.info( f"Updated vmag_filter with range [{vmag_min:.1f}, " f"{target_vmag_cut:.1f}]" ) # Update distance_filter with target_distance_cut target_distance_cut = get_val("target_distance_cut", u.pc, None) if target_distance_cut is not None: out["optional_filters"]["distance_filter"] = { "enabled": True, "params": {"max_distance": round_float(float(target_distance_cut))}, } logger.info( f"Updated distance_filter with max_distance = " f"{target_distance_cut:.1f} pc" ) # Load coronagraph using yippy and generate starlightSuppressionSystem coronagraph_path = self.data.get("coronagraph1") # Initialize coronagraph design bandwidth (will be set if coronagraph loaded) coro_design_bw = None if coronagraph_path: # Remove quotes if present (from string parsing) if isinstance(coronagraph_path, str): coronagraph_path = coronagraph_path.strip("'\"") # Try to find the coronagraph directory # First try as absolute path, then relative to AYO file directory coro_path = Path(coronagraph_path) if not coro_path.is_absolute(): # Try relative to AYO file's directory ayo_dir = self.file_path.parent coro_path = ayo_dir / coronagraph_path # If still not found, try as-is (might be in a standard location) if not coro_path.exists(): coro_path = Path(coronagraph_path) if coro_path.exists(): logger.info(f"Loading coronagraph from: {coro_path}") # Load coronagraph with yippy coro = Coronagraph(coro_path) # Extract coronagraph's design fractional bandwidth from header # AYO uses min(1/SR, coro_design_bw) as the effective bandwidth if ( coro.header.minlam is not None and coro.header.maxlam is not None and coro.header.lambda0 is not None ): coro_design_bw = ( ( (coro.header.maxlam - coro.header.minlam) / coro.header.lambda0 ) .decompose() .value ) logger.info( f"Coronagraph design bandwidth: {coro_design_bw:.4f} " f"(from MINLAM={coro.header.minlam}, " f"MAXLAM={coro.header.maxlam}, LAMBDA={coro.header.lambda0})" ) # Generate EXOSIMS files and get system info # Use AYO parameters if available, otherwise defaults aperture_radius = get_val("photap_rad", lod, 0.7) if aperture_radius is None: aperture_radius = 0.7 else: aperture_radius = aperture_radius # Get IWA/OWA from AYO if available, otherwise use defaults iwa_lod = get_val("IWA", lod, None) owa_lod = get_val("OWA", lod, None) # Generate EXOSIMS format files exosims_specs = coro.to_exosims( aperture_radius_lod=aperture_radius, fit_gaussian_for_core_area=False, use_phot_aperture_as_min=False, units="LAMBDA/D", ) # Get the system from the generated specs if exosims_specs.get("starlightSuppressionSystems"): syst = exosims_specs["starlightSuppressionSystems"][0].copy() # Convert relative FITS file paths to absolute paths exosims_dir = Path(coro_path, "exosims") fits_files = ["occ_trans", "core_thruput", "core_mean_intensity"] for fits_key in fits_files: if syst.get(fits_key): # If it's a relative path, make it absolute fits_path = Path(syst[fits_key]) if not fits_path.is_absolute(): fits_path = exosims_dir / fits_path syst[fits_key] = str(fits_path.resolve()) # Override with AYO parameters if specified contrast = get_val("raw_contrast_floor", None, None) if contrast is not None: syst["core_contrast"] = float(contrast) overhead = get_val("toverhead_fixed", u.d, None) if overhead is not None: syst["ohTime"] = round_float(float(overhead)) # Override IWA/OWA from AYO if specified # AYO IWA/OWA are in LAMBDA/D, same as yippy output if iwa_lod is not None: if hasattr(iwa_lod, "value"): syst["IWA"] = round_float(float(iwa_lod.to(lod).value)) else: syst["IWA"] = round_float(float(iwa_lod)) if owa_lod is not None: if hasattr(owa_lod, "value"): syst["OWA"] = round_float(float(owa_lod.to(lod).value)) else: syst["OWA"] = round_float(float(owa_lod)) # Round other float values in the system for key in ["lam", "deltaLam", "BW", "core_area"]: if key in syst and syst[key] is not None: syst[key] = round_float(syst[key]) # Update pupilDiam from coronagraph if not set by AYO if D_m is None and "pupilDiam" in exosims_specs: out["pupilDiam"] = round_float(exosims_specs["pupilDiam"]) # Extract obscurFac and shapeFac from exosims_specs if "obscurFac" in exosims_specs: out["obscurFac"] = round_float(exosims_specs["obscurFac"]) if "shapeFac" in exosims_specs: out["shapeFac"] = round_float(exosims_specs["shapeFac"]) # Set koAngles_Sun from AYO pitch, preserve other koAngles if ko_sun is not None: syst["koAngles_Sun"] = [round_float(x) for x in ko_sun] # Preserve other koAngles if they exist in base file if ( "starlightSuppressionSystems" in out and len(out["starlightSuppressionSystems"]) > 0 ): base_syst = out["starlightSuppressionSystems"][0] ko_keys = [ "koAngles_Small", "koAngles_Moon", "koAngles_Earth", ] for ko_key in ko_keys: if ko_key in base_syst: syst[ko_key] = base_syst[ko_key] # Set BW in starlightSuppressionSystem from detection wavelength SR # Calculate BW from detection wavelength's spectral resolution lams = self.data.get("lambda") if lams is not None: if hasattr(lams, "unit"): lams_nm = lams.to(u.nm).value else: lams_nm = np.array(lams) * 1000 if not isinstance(lams_nm, list | np.ndarray): lams_nm = [lams_nm] lams_nm = np.array(lams_nm) # Use same wavelength selection logic as for detection mode if detection_wavelength_nm is None: det_idx = len(lams_nm) // 2 else: det_idx = int( np.argmin(np.abs(lams_nm - detection_wavelength_nm)) ) det_sr = get_array_val("SR", det_idx, 5.0) sr_bw = 1.0 / float(det_sr) if det_sr > 0 else 0.2 # Take minimum of 1/SR and coronagraph design bandwidth if coro_design_bw is not None: base_bw = min(sr_bw, coro_design_bw) if base_bw < sr_bw: logger.info( f"BW limited by coronagraph design: " f"{base_bw:.4f} < 1/SR={sr_bw:.4f}" ) else: base_bw = sr_bw # DO NOT multiply BW by nchannels. Keep it physical. syst["BW"] = round_float(base_bw) # Remove deltaLam so EXOSIMS calculates it from our BW # (EXOSIMS recalculates BW = deltaLam/lam, so if deltaLam # is present, our BW would be ignored) syst.pop("deltaLam", None) elif "BW" not in syst: # Default if no detection wavelengths default_bw = 0.2 if coro_design_bw is not None: default_bw = min(default_bw, coro_design_bw) # DO NOT multiply BW by nchannels. Keep it physical. syst["BW"] = round_float(default_bw) syst.pop("deltaLam", None) # Remove core_contrast if core_mean_intensity is set # (EXOSIMS uses one or the other, not both) if syst.get("core_mean_intensity"): syst.pop("core_contrast", None) # Replace or append the system if "starlightSuppressionSystems" not in out: out["starlightSuppressionSystems"] = [] if len(out["starlightSuppressionSystems"]) == 0: out["starlightSuppressionSystems"].append(syst) else: out["starlightSuppressionSystems"][0] = syst else: logger.warning( "No starlightSuppressionSystems found in coronagraph specs" ) # except Exception as e: # logger.warning( # f"Failed to load coronagraph from {coro_path}: {e}. " # "Falling back to manual construction." # ) # # Fall through to manual construction # coronagraph_path = None # Fallback: Manual construction if coronagraph not found or failed to load if ( not coronagraph_path or "starlightSuppressionSystems" not in out or len(out["starlightSuppressionSystems"]) == 0 ): logger.info("Using manual starlightSuppressionSystem construction") # Get or create starlightSuppressionSystems if "starlightSuppressionSystems" not in out: out["starlightSuppressionSystems"] = [] syst = {} elif len(out["starlightSuppressionSystems"]) == 0: # Create new system if list is empty syst = {} else: # Update first system with AYO parameters syst = out["starlightSuppressionSystems"][0].copy() # Convert AYO IWA (L/D) to EXOSIMS # (assume arcsec for safety or standard input) # Using a reference lambda of 500nm ref_lam_m = 500e-9 D_m = out.get("pupilDiam", 4.0) iwa_lod = get_val("IWA", lod, None) owa_lod = get_val("OWA", lod, None) if iwa_lod is not None or owa_lod is not None: # Convert to arcsec: IWA_as = IWA_lod * (lam/D)_as # (lam/D)_rad = lam/D. (lam/D)_as = 206265 * lam/D lod_to_as = 206265.0 * (ref_lam_m / D_m) if iwa_lod is not None: syst["IWA"] = round_float(float(iwa_lod * lod_to_as)) if owa_lod is not None: syst["OWA"] = round_float(float(owa_lod * lod_to_as)) # Update system parameters from AYO contrast = get_val("raw_contrast_floor", None, None) if contrast is not None: syst["core_contrast"] = round_float(float(contrast)) overhead = get_val("toverhead_fixed", u.d, None) if overhead is not None: syst["ohTime"] = round_float(float(overhead)) # Set default system name if not present if "name" not in syst: syst["name"] = "AYO_Coronagraph" if "lam" not in syst: syst["lam"] = round_float(500) # nm # Set BW from detection wavelength SR if available if "BW" not in syst: lams = self.data.get("lambda") if lams is not None: if hasattr(lams, "unit"): lams_nm = lams.to(u.nm).value else: lams_nm = np.array(lams) * 1000 if not isinstance(lams_nm, list | np.ndarray): lams_nm = [lams_nm] lams_nm = np.array(lams_nm) # Use same wavelength selection logic as for detection mode if detection_wavelength_nm is None: det_idx = len(lams_nm) // 2 else: det_idx = int( np.argmin(np.abs(lams_nm - detection_wavelength_nm)) ) det_sr = get_array_val("SR", det_idx, 5.0) sr_bw = 1.0 / float(det_sr) if det_sr > 0 else 0.2 # Take minimum of 1/SR and coronagraph design bandwidth if coro_design_bw is not None: base_bw = min(sr_bw, coro_design_bw) if base_bw < sr_bw: logger.info( f"BW limited by coronagraph design: " f"{base_bw:.4f} < 1/SR={sr_bw:.4f}" ) else: base_bw = sr_bw # DO NOT multiply BW by nchannels. Keep it physical. syst["BW"] = round_float(base_bw) # Remove deltaLam so EXOSIMS calculates it from our BW syst.pop("deltaLam", None) else: default_bw = 0.2 if coro_design_bw is not None: default_bw = min(default_bw, coro_design_bw) # DO NOT multiply BW by nchannels. Keep it physical. syst["BW"] = round_float(default_bw) syst.pop("deltaLam", None) # Set koAngles_Sun from AYO pitch, preserve other koAngles from base if ko_sun is not None: syst["koAngles_Sun"] = [round_float(x) for x in ko_sun] # Preserve other koAngles if they exist in base file if ( "starlightSuppressionSystems" in out and len(out["starlightSuppressionSystems"]) > 0 ): base_syst = out["starlightSuppressionSystems"][0] for ko_key in ["koAngles_Small", "koAngles_Moon", "koAngles_Earth"]: if ko_key in base_syst: syst[ko_key] = base_syst[ko_key] # Remove core_contrast if core_mean_intensity is set # (EXOSIMS uses one or the other, not both) if syst.get("core_mean_intensity"): syst.pop("core_contrast", None) # Replace or append the system if len(out["starlightSuppressionSystems"]) == 0: out["starlightSuppressionSystems"].append(syst) else: out["starlightSuppressionSystems"][0] = syst # Get system name for use in modes syst_name = "AYO_Coronagraph" if ( out.get("starlightSuppressionSystems") and len(out["starlightSuppressionSystems"]) > 0 ): syst_name = out["starlightSuppressionSystems"][0].get("name", syst_name) # Get or create scienceInstruments and observingModes if "scienceInstruments" not in out: out["scienceInstruments"] = [] if "observingModes" not in out: out["observingModes"] = [] # Clear existing instruments and modes if AYO defines them instruments = [] modes = [] # Get Tcontam (contamination throughput factor) - applies to both # detection and characterization Tcontam = get_val("Tcontam", None, 1.0) if Tcontam is None: Tcontam = 1.0 else: Tcontam = float(Tcontam) logger.info(f"Applying Tcontam={Tcontam:.6f} to optics throughput") # SAFEGUARD: Check for potential double-counting of throughput factors # If the base file has syst['optics'] set AND AYO has Tcontam, this would # cause the contamination throughput to be applied twice: # - Once via Tcontam being multiplied into inst['optics'] # - Once via syst['optics'] in the base file # EXOSIMS computes: attenuation = inst['optics'] * syst['optics'] if Tcontam != 1.0: for syst in out.get("starlightSuppressionSystems", []): if "optics" in syst: syst_optics = syst["optics"] syst_name = syst.get("name", "unnamed") logger.warning( f"CONFLICT DETECTED: Base file has " f"syst['{syst_name}']['optics']={syst_optics} AND AYO has " f"Tcontam={Tcontam}. Would cause double-counting. " f"Removing syst['optics']." ) del syst["optics"] # Process Detection lams = self.data.get("lambda") if lams is not None: # Get values in nm if hasattr(lams, "unit"): lams_nm = lams.to(u.nm).value else: lams_nm = ( np.array(lams) * 1000 ) # Assume microns if no unit, convert to nm if not isinstance(lams_nm, list | np.ndarray): lams_nm = [lams_nm] lams_nm = np.array(lams_nm) # Select which wavelength to use if detection_wavelength_nm is None: # Default: use middle wavelength i = len(lams_nm) // 2 else: # Find closest wavelength to the requested value i = int(np.argmin(np.abs(lams_nm - detection_wavelength_nm))) logger.info( f"Selected detection wavelength {lams_nm[i]:.1f} nm " f"(closest to requested {detection_wavelength_nm:.1f} nm)" ) # Create Mode for selected wavelength only lam = lams_nm[i] sr = get_array_val("SR", i, 5.0) snr = get_array_val("SNR", i, 5.0) # Create Detection Instrument using values at selected wavelength index # Format: imaging_{wavelength_nm}_ayo lam_int = round(float(lam)) inst_det_name = f"imaging_{lam_int}_ayo" # --- PIXEL MATH CORRECTION --- # EXOSIMS squares lenslSamp to get Npix. AYO defines Npix directly. # We want lenslSamp^2 = base * nchannels base_npix_det = float(get_array_val("det_npix_multiplier", i, 1.0)) corrected_lenslSamp_det = np.sqrt(base_npix_det * nchannels) # --- THROUGHPUT & CHANNELS CORRECTION --- # Combine Toptical, dQE, nchannels, and Tcontam into one # "Effective Optics" term. # - dQE scales Signal & Noise Floor (matching AYO behavior) # - nchannels sums the signal from multiple detectors # - Tcontam accounts for contamination throughput losses dQE_det = float(get_array_val("det_dQE", i, 0.75)) Topt_det = float(get_array_val("Toptical", i, 0.5)) effective_optics_det = Topt_det * dQE_det * nchannels * Tcontam inst_det = { "name": inst_det_name, "pixelScale": round_float( float(get_val("det_pixscale_mas", None, 10.0) / 1000.0) ), # as "idark": round_float(float(get_array_val("det_DC", i, 0))), "CIC": round_float(float(get_array_val("det_CIC", i, 1e-3))), "sread": round_float(float(get_array_val("det_RN", i, 0.0))), "texp": round_float(float(get_array_val("det_tread", i, 100.0, u.s))), "texp_flag": False, "QE": round_float(float(get_array_val("det_QE", i, 0.9))), "optics": round_float(effective_optics_det), # Handle dQE and nchannels "PCeff": 1.0, # Set to 1.0 to avoid double counting "lenslSamp": round_float(corrected_lenslSamp_det), } instruments.append(inst_det) mode = { "instName": inst_det_name, "systName": syst_name, "detectionMode": True, "lam": round_float(float(lam)), "SNR": round_float(float(snr)), } modes.append(mode) # Process Characterization sc_lams = self.data.get("sc_lambda") if sc_lams is not None: if hasattr(sc_lams, "unit"): sc_lams_nm = sc_lams.to(u.nm).value else: sc_lams_nm = np.array(sc_lams) * 1000 if not isinstance(sc_lams_nm, list | np.ndarray): sc_lams_nm = [sc_lams_nm] sc_lams_nm = np.array(sc_lams_nm) # Select which wavelength to use if characterization_wavelength_nm is None: # Default: use middle wavelength i = len(sc_lams_nm) // 2 else: # Find closest wavelength to the requested value i = int(np.argmin(np.abs(sc_lams_nm - characterization_wavelength_nm))) logger.info( f"Selected characterization wavelength {sc_lams_nm[i]:.1f} nm " f"(closest to requested {characterization_wavelength_nm:.1f} nm)" ) # Get spectral resolution for characterization instrument lam = sc_lams_nm[i] sr = get_array_val("sc_SR", i, 5.0) snr = get_array_val("sc_SNR", i, 5.0) # Create Characterization Instrument using values at selected wavelength # Format: spectro_{wavelength_nm}_ayo lam_int = round(float(lam)) inst_char_name = f"spectro_{lam_int}_ayo" # --- PIXEL MATH CORRECTION --- # EXOSIMS squares lenslSamp to get Npix. AYO defines Npix directly. # We want lenslSamp^2 = base (nchannels only affects detection) base_npix_char = float(get_array_val("sc_det_npix_multiplier", i, 1.0)) corrected_lenslSamp_char = np.sqrt(base_npix_char) # --- THROUGHPUT & CHANNELS CORRECTION --- # Combine Toptical, dQE, and Tcontam into one "Effective Optics" term. # - dQE scales Signal & Noise Floor (matching AYO behavior) # - Tcontam accounts for contamination throughput losses # - nchannels only affects detection, not characterization dQE_char = float(get_array_val("sc_det_dQE", i, 0.75)) Topt_char = float(get_array_val("sc_Toptical", i, 0.5)) effective_optics_char = Topt_char * dQE_char * Tcontam inst_char = { "name": inst_char_name, "pixelScale": round_float( float(get_val("sc_det_pixscale_mas", None, 10.0) / 1000.0) ), "idark": round_float(float(get_array_val("sc_det_DC", i, 0))), "CIC": round_float(float(get_array_val("sc_det_CIC", i, 1e-3))), "sread": round_float(float(get_array_val("sc_det_RN", i, 0.0))), "texp": round_float( float(get_array_val("sc_det_tread", i, 100.0, u.s)) ), "texp_flag": False, "QE": round_float(float(get_array_val("sc_det_QE", i, 0.9))), "optics": round_float(effective_optics_char), # dQE and nchannels "PCeff": 1.0, # Set to 1.0 to avoid double counting "Rs": round_float(float(sr)), "lenslSamp": round_float(corrected_lenslSamp_char), } instruments.append(inst_char) # Create Mode for selected wavelength only mode = { "instName": inst_char_name, "systName": syst_name, "detectionMode": False, "lam": round_float(float(lam)), "SNR": round_float(float(snr)), } modes.append(mode) # Only replace instruments/modes if AYO defines them if instruments: out["scienceInstruments"] = instruments if modes: out["observingModes"] = modes # Remove koAngles_Sun from top level if it exists # (it's now in starlightSuppressionSystems) if "koAngles_Sun" in out: del out["koAngles_Sun"] # Apply any additional kwargs as top-level overrides if kwargs: for key, value in kwargs.items(): out[key] = value logger.info(f"Applied override: {key} = {value}") # Reorder output to match typical EXOSIMS structure: # 1. Top-level parameters (non-array, non-object, excluding special arrays) # 2. erange, arange, Rprange, optional_filters # 3. Other objects (anything not in structured_keys or special) # 4. scienceInstruments # 5. starlightSuppressionSystems # 6. observingModes # 7. modules # 8. completeness_specs (special, goes after modules) # Separate parameters into categories top_level = {} special_arrays = {} structured = {} other_objects = {} completeness_specs = None # Special arrays that go before scienceInstruments special_array_keys = [ "err_progression", "erange", "arange", "Rprange", "optional_filters", ] structured_keys = [ "scienceInstruments", "starlightSuppressionSystems", "observingModes", "modules", ] for key, value in out.items(): if key in structured_keys: structured[key] = value elif key in special_array_keys: special_arrays[key] = value elif key == "completeness_specs": completeness_specs = value elif isinstance(value, dict | list) and key != "modules": other_objects[key] = value else: top_level[key] = value # Reconstruct in desired order ordered_out = {} ordered_out.update(top_level) # Add special arrays before scienceInstruments for key in special_array_keys: if key in special_arrays: ordered_out[key] = special_arrays[key] # Add other objects before structured keys ordered_out.update(other_objects) if "scienceInstruments" in structured: ordered_out["scienceInstruments"] = structured["scienceInstruments"] if "starlightSuppressionSystems" in structured: ordered_out["starlightSuppressionSystems"] = structured[ "starlightSuppressionSystems" ] if "observingModes" in structured: ordered_out["observingModes"] = structured["observingModes"] if "modules" in structured: ordered_out["modules"] = structured["modules"] # Add completeness_specs after modules if completeness_specs is not None: ordered_out["completeness_specs"] = completeness_specs with open(output_path, "w") as f: json.dump(ordered_out, f, indent=4) logger.info(f"Exported AYO parameters to EXOSIMS file: {output_path}")