Source code for seda.utils

import numpy as np
import time
import os
import fnmatch
import xarray
import pickle
import astropy
from prettytable import PrettyTable
from spectres import spectres
from astropy import units as u
from astropy.io import ascii
from astropy.table import Column, MaskedColumn, Table
from astropy.convolution import Gaussian1DKernel, convolve
from scipy.interpolate import RegularGridInterpolator
from scipy.interpolate import interp1d
from tqdm.auto import tqdm
from sys import exit

from . import models
from .synthetic_photometry.synthetic_photometry import synthetic_photometry
from specutils import Spectrum1D

from numpy.typing import ArrayLike

##########################
[docs]def convolve_spectrum(wl, flux, res, eflux=None, lam_res=None, disp_wl_range=None, convolve_wl_range=None, out_file=None): ''' Description: ------------ Convolve a spectrum to a desired resolution at a given wavelength. Parameters: ----------- - wl : float array Wavelength (any length units) for the spectrum. - flux : float array Fluxes (any flux units) for the spectrum. - res : float Spectral resolution (at ``lam_res``) desired to smooth the input spectrum. - eflux : float array, optional Flux uncertainties (any flux units) for the spectrum. - lam_res : float, optional Wavelength of reference at which ``res`` is given. Default is the integer closest to the median wavelength of the spectrum. - disp_wl_range : float array, optional Minimum and maximum wavelengths (same units as ``wl``) to compute the median wavelength dispersion of the input spectrum. Default values are the minimum and maximum wavelengths in ``wl``. - convolve_wl_range : float array, optional Minimum and maximum wavelengths (same units as ``wl``) to convolve the input spectrum. Default values are the minimum and maximum wavelengths in ``wl``. - out_file : str, optional File name to save the convolved spectrum as netCDF with xarray (it produces lighter files compared to normal ASCII files). The file name can include a path e.g. my_path/convolved_spectrum.nc If not provided, the convolved spectrum will not be saved. Returns: -------- - Dictionary Dictionary with the convolved spectrum: - ``'wl_conv'`` : wavelengths for the convolved spectrum (equal to input ``wl`` within ``convolve_wl_range``). - ``'flux_conv'`` : convolved fluxes. - ``'eflux_conv'`` : convolved flux errors (if ``eflux`` is provided). - ``out_file`` file netCDF file with the convolved spectrum, if ``out_file`` is provided. Wavelengths and fluxes for the convolved spectrum have the same units as the input spectrum. Note: the wavelength data points are the same as in the input spectrum, so wavelength steps do not reflect the resolution of the convolved spectrum. Example: -------- >>> import seda >>> from astropy.io import ascii >>> >>> # read sample spectrum >>> file = '../docs/notebooks/data/IRTF_SpeX_0415-0935.dat' >>> spectrum = ascii.read(file) >>> wl = spectrum['wl(um)'] # um >>> flux = spectrum['flux(erg/s/cm2/A)'] # erg/s/cm2/A >>> eflux = spectrum['eflux(erg/s/cm2/A)'] # erg/s/cm2/A >>> # desired resolution >>> res = 50 # resolution of 50 >>> >>> out_convolve_spectrum = seda.utils.convolve_spectrum(wl=wl, flux=flux, >>> eflux=eflux, res=res) Author: Genaro Suárez Date: 2020-03 ''' # convert input spectrum into numpy arrays if astropy wl = astropy_to_numpy(wl) flux = astropy_to_numpy(flux) eflux = astropy_to_numpy(eflux) # remove NaN values if eflux is None: mask_nonan = (~np.isnan(wl)) & (~np.isnan(flux)) else: mask_nonan = (~np.isnan(wl)) & (~np.isnan(flux)) & (~np.isnan(eflux)) wl = wl[mask_nonan] flux = flux[mask_nonan] if eflux is not None: eflux = eflux[mask_nonan] # stop if res=None (it can occur when other functions use this function) if res is None: raise Exception(f'"res=None" is not allowed. It must take a value.') # set lam_res if not provided if lam_res is None: lam_res = set_lam_res(wl) # set disp_wl_range and convolve_wl_range if not provided if (disp_wl_range is None): disp_wl_range = np.array((wl.min(), wl.max())) # define disp_wl_range if not provided if (convolve_wl_range is None): convolve_wl_range = np.array((wl.min(), wl.max())) # define convolve_wl_range if not provided wl_bin = abs(wl[1:] - wl[:-1]) # wavelength dispersion of the spectrum wl_bin = np.append(wl_bin, wl_bin[-1]) # add an element equal to the last row to have same data points # define a Gaussian for convolution mask_fit = (wl>=disp_wl_range[0]) & (wl<=disp_wl_range[1]) # mask to obtain the median wavelength dispersion stddev = (lam_res/res)*(1./np.median(wl_bin[mask_fit])) / (2.*np.sqrt(2*np.log(2))) # stddev is given in pixels #if stddev<1: if (lam_res/res)*(1./np.median(wl_bin[mask_fit]))<1: print(' Warning: the input spectrum may have a resolution smaller than the desired one.') print(' the spectrum will be convolved but will be essentially the same.') gauss = Gaussian1DKernel(stddev=stddev) mask_conv = (wl>=convolve_wl_range[0]) & (wl<=convolve_wl_range[1]) # range to convolve the spectrum flux_conv = convolve(flux[mask_conv], gauss) # convolve only the selected wavelength range wl_conv = wl[mask_conv] # corresponding wavelength data points for convolved fluxes if eflux is not None: eflux_conv = convolve(eflux[mask_conv], gauss) out = {'wl_conv': wl_conv, 'flux_conv': flux_conv} if (eflux is not None): out['eflux_conv'] = eflux_conv # save convolved spectrum as netCDF if out_file is not None: # make xarray if eflux is None: ds = xarray.Dataset({'wl': wl_conv}, coords={'flux': flux_conv}) else: ds = xarray.Dataset({'wl': wl_conv}, coords={'flux': flux_conv, 'eflux': eflux_conv}) # store xarray as netCDF ds.to_netcdf(out_file) return out
##########################
[docs]def scale_synthetic_spectrum(wl, flux, distance, radius): ''' Description: ------------ Scale model spectrum when distance and radius are known. Parameters: ----------- - wl : float array Wavelength (any length units) for the spectrum. - flux : float array Fluxes (any flux units) for the spectrum. - radius: float Radius in Rjup. - distance: float Distance in pc. Returns: -------- Scaled fluxes. ''' distance_km = distance*u.parsec.to(u.km) # distance in km radius_km = radius*u.R_jup.to(u.km) # radius in km scaling = (radius_km/distance_km)**2 # scaling = (R/d)^2 flux_scaled = scaling*flux return flux_scaled
########################## ##########################
[docs]def best_chi2_fits(output_chi2, N_best_fits=1, model_dir_ori=None, ori_res=False): ''' Description: ------------ Read best-fitting model spectra from the chi-square minimization. Parameters: ----------- - output_chi2 : dictionary or str Output dictionary with the results from the chi-square minimization by ``chi2``. It can be either the name of the pickle file or simply the output dictionary. - N_best_fits : int, optional (default 1) Number of best model fits to be read. - ori_res : {``True``, ``False``}, optional (default ``False``) Read (``True``) or do not read (``False``) model spectra with the original resolution. - model_dir_ori : str, list, or array Path to the directory (str, list, or array) or directories (as a list or array) containing the model spectra with the original resolution. This parameter is needed if ``ori_res=True`` and `seda.chi2_fit.chi2` was run skipping the model spectra convolution (if `skip_convolution=True``). Returns: -------- Dictionary with best model fits: - ``'spectra_name_best'`` : model spectra names. - ``'fit_spectra'``: label indicating whether spectra were fitted. - ``'fit_photometry'``: label indicating whether photometry was fitted. - ``'chi2_red_fit_best'`` : reduced chi-square. - ``'chi2_red_wl_fit_best'`` : reduced chi-square as a function of wavelength. - ``'wl_model_best'`` : (if ``ori_res``) wavelength (in um) of original model spectra. - ``'flux_model_best'`` : (if ``ori_res``) fluxes (in erg/s/cm2/A) of original model spectra. - ``'wl_array_model_conv_resam_best'`` : (if ``fit_spectra``) wavelength (in um) of resampled, convolved model spectra in the input spectra ranges. - ``'flux_array_model_conv_resam_best'`` : (if ``fit_spectra``) fluxes (in erg/s/cm2/A) of resampled, convolved model spectra in the input spectra ranges. - ``'wl_array_model_conv_resam_fit_best'`` : (if ``fit_spectra``) wavelength (in um) of resampled, convolved model spectra within the fit range. - ``'flux_array_model_conv_resam_fit_best'`` : (if ``fit_spectra``) fluxes (in erg/s/cm2/A) of resampled, convolved model spectra within the fit range. - ``'flux_residuals_spec_best'`` : (if ``fit_spectra``) flux residuals in linear scale between model and input spectra. - ``'logflux_residuals_spec_best'`` : (if ``fit_spectra``) flux residuals in log scale between model and input spectra. - ``'flux_syn_array_model_fit_best'`` : (if ``fit_photometry``) synthetic photometry (in erg/s/cm2/A) for the filters within the fit range. - ``'lambda_eff_array_model_fit_best'`` : (if ``fit_photometry``) effective wavelength (in um) from model spectra for filters within the fit range. - ``'width_eff_array_model_fit_best'`` : (if ``fit_photometry``) effective width (in um) from model spectra for filters within the fit range. - ``'flux_residuals_phot_best'`` : (if ``fit_photometry``) flux residuals in linear scale between model and input spectra. - ``'logflux_residuals_phot_best'`` : (if ``fit_photometry``) flux residuals in log scale between model and input spectra. - ``'params'`` : model free parameters Author: Genaro Suárez Date: 2024-10, 2025-09-07 ''' # open dictionary if need it output_chi2 = load_output_fit(output_chi2) fit_spectra = output_chi2['my_chi2'].fit_spectra fit_photometry = output_chi2['my_chi2'].fit_photometry model = output_chi2['my_chi2'].model res = output_chi2['my_chi2'].res#[0] # resolution for the first input spectrum lam_res = output_chi2['my_chi2'].lam_res#[0] # lam_resolution for the first input spectrum N_model_spectra = output_chi2['N_model_spectra'] spectra_name_full = output_chi2['spectra_name_full'] spectra_name = output_chi2['spectra_name'] chi2_red_fit = output_chi2['chi2_red_fit'] chi2_red_wl_fit = output_chi2['chi2_red_wl_fit'] # list of reduced chi-square for each model compared scaling_fit = output_chi2['scaling_fit'] skip_convolution = output_chi2['my_chi2'].skip_convolution if fit_spectra: wl_array_model_conv_resam = output_chi2['my_chi2'].wl_array_model_conv_resam # wavelengths of resampled, convolved model spectra in the input spectra wavelength ranges #flux_array_model_conv_resam = output_chi2['my_chi2'].flux_array_model_conv_resam # fluxes of resampled, convolved model spectra in the input spectra wavelength ranges flux_array_model_conv_resam_scaled = output_chi2['flux_array_model_conv_resam_scaled'] # fluxes of resampled, convolved, scaled model spectra in the input spectra wavelength ranges wl_array_model_conv_resam_fit = output_chi2['my_chi2'].wl_array_model_conv_resam_fit # wavelengths of resampled, convolved model spectra within the fit ranges #flux_array_model_conv_resam_fit = output_chi2['my_chi2'].flux_array_model_conv_resam_fit # fluxes of resampled, convolved model spectra within the fit ranges flux_array_model_conv_resam_scaled_fit = output_chi2['flux_array_model_conv_resam_scaled_fit'] # fluxes of resampled, convolved, scaled model spectra within the fit ranges flux_residuals_spec = output_chi2['flux_residuals_spec'] logflux_residuals_spec = output_chi2['logflux_residuals_spec'] if fit_photometry: flux_syn_array_model_scaled_fit = output_chi2['flux_syn_array_model_scaled_fit'] #flux_syn_array_model_fit = output_chi2['my_chi2'].flux_syn_array_model_fit lambda_eff_array_model_fit = output_chi2['my_chi2'].lambda_eff_array_model_fit width_eff_array_model_fit = output_chi2['my_chi2'].width_eff_array_model_fit flux_residuals_phot = output_chi2['flux_residuals_phot'] logflux_residuals_phot = output_chi2['logflux_residuals_phot'] # select the best fits given by N_best_fits if (N_best_fits > N_model_spectra): raise Exception(f'not enough model spectra (only {N_model_spectra}) in ' f'the fit to select "N_best_fits={N_best_fits}" best fits') sort_ind = np.argsort(chi2_red_fit) chi2_red_fit_best = chi2_red_fit[sort_ind][:N_best_fits] chi2_red_wl_fit_best = [chi2_red_wl_fit[i] for i in sort_ind][:N_best_fits] scaling_fit_best = scaling_fit[sort_ind][:N_best_fits] spectra_name_full_best = spectra_name_full[sort_ind][:N_best_fits] spectra_name_best = spectra_name[sort_ind][:N_best_fits] if fit_spectra: wl_array_model_conv_resam_best = [wl_array_model_conv_resam[i] for i in sort_ind][:N_best_fits] #flux_array_model_conv_resam_best = [flux_array_model_conv_resam[i] for i in sort_ind][:N_best_fits] flux_array_model_conv_resam_scaled_best = [flux_array_model_conv_resam_scaled[i] for i in sort_ind][:N_best_fits] wl_array_model_conv_resam_fit_best = [wl_array_model_conv_resam_fit[i] for i in sort_ind][:N_best_fits] #flux_array_model_conv_resam_fit_best = [flux_array_model_conv_resam_fit[i] for i in sort_ind][:N_best_fits] flux_array_model_conv_resam_scaled_fit_best = [flux_array_model_conv_resam_scaled_fit[i] for i in sort_ind][:N_best_fits] flux_residuals_spec_best = [flux_residuals_spec[i] for i in sort_ind][:N_best_fits] logflux_residuals_spec_best = [logflux_residuals_spec[i] for i in sort_ind][:N_best_fits] if fit_photometry: #flux_syn_array_model_fit_best = [flux_syn_array_model_fit[i] for i in sort_ind][:N_best_fits] flux_syn_array_model_scaled_fit_best = [flux_syn_array_model_scaled_fit[i] for i in sort_ind][:N_best_fits] lambda_eff_array_model_fit_best = [lambda_eff_array_model_fit[i] for i in sort_ind][:N_best_fits] width_eff_array_model_fit_best = [width_eff_array_model_fit[i] for i in sort_ind][:N_best_fits] flux_residuals_phot_best = [flux_residuals_phot[i] for i in sort_ind][:N_best_fits] logflux_residuals_phot_best = [logflux_residuals_phot[i] for i in sort_ind][:N_best_fits] # read parameters from file name for the best fits out_separate_params = models.separate_params(model=model, spectra_name=spectra_name_best) # read best fits with the original resolution if ori_res: wl_model_best_lst = [] flux_model_best_lst = [] if not skip_convolution: # when the convolution was not skipped for i in range(N_best_fits): out_read_model_spectrum = models.read_model_spectrum(spectrum_name_full=spectra_name_full_best[i], model=model) wl_model_best_lst.append(out_read_model_spectrum['wl_model']) flux_model_best_lst.append(scaling_fit_best[i] * out_read_model_spectrum['flux_model']) # scaled fluxes else: # when the convolution was skipped if model_dir_ori is None: raise Exception(f'parameter "model_dir_ori" is needed to read model spectra with the original resolution') else: out_select_model_spectra = select_model_spectra(model_dir=model_dir_ori, model=model) # all spectra in model_dir_ori for i in range(N_best_fits): spectrum_name = spectra_name_best[i].split('_R')[0] # convolved spectrum name without additions to match original resolution name if spectrum_name in out_select_model_spectra['spectra_name']: # if the i best fit is in the model_dir_ori folder spectrum_name_full = out_select_model_spectra['spectra_name_full'][out_select_model_spectra['spectra_name']==spectrum_name][0] out_read_model_spectrum = models.read_model_spectrum(spectrum_name_full=spectrum_name_full, model=model) wl_model_best_lst.append(out_read_model_spectrum['wl_model']) flux_model_best_lst.append(scaling_fit_best[i] * out_read_model_spectrum['flux_model']) # scaled fluxes else: raise Exception(f'{spectrum_name} is not in {model_dir_ori}') # convert list with models to a numpy array # maximum number of data points max_tmp1 = 0 for i in range(N_best_fits): # for each model spectrum max_tmp2 = len(wl_model_best_lst[i]) if max_tmp2>max_tmp1: N_modelpoints_max = max_tmp2 max_tmp1 = max_tmp2 else: N_modelpoints_max = max_tmp1 wl_model_best = np.zeros((N_best_fits, N_modelpoints_max)) flux_model_best = np.zeros((N_best_fits, N_modelpoints_max)) for i in range(N_best_fits): # for each model spectrum wl_model_best[i,:] = wl_model_best_lst[i] flux_model_best[i,:] = flux_model_best_lst[i] # # convolve spectrum # wl_model_conv = np.zeros((N_modelpoints, N_best_fits)) # flux_model_conv = np.zeros((N_modelpoints, N_best_fits)) # for i in range(N_best_fits): # out_convolve_spectrum = convolve_spectrum(wl=wl_model[:,i], flux=flux_model[:,i], lam_res=lam_res, res=res, disp_wl_range=fit_wl_range) # wl_model_conv[:,i] = out_convolve_spectrum['wl_conv'] # flux_model_conv[:,i] = out_convolve_spectrum['flux_conv'] # #if not skip_convolution: # # out_convolve_spectrum = convolve_spectrum(wl=wl_model[:,i], flux=flux_model[:,i], lam_res=lam_res, res=res, disp_wl_range=fit_wl_range) # # wl_model_conv[:,i] = out_convolve_spectrum['wl_conv'] # # flux_model_conv[:,i] = out_convolve_spectrum['flux_conv'] # #else: # # out_read_model_spectrum = models.read_model_spectrum_conv(spectrum_name_full=spectra_name_full_best[i]) # # wl_model_conv[:,i] = out_read_model_spectrum['wl_model'] # # flux_model_conv[:,i] = out_read_model_spectrum['flux_model'] out = {'spectra_name_best': spectra_name_best, 'fit_spectra': fit_spectra, 'fit_photometry': fit_photometry, 'chi2_red_fit_best': chi2_red_fit_best, 'chi2_red_wl_fit_best': chi2_red_wl_fit_best} #'flux_model': flux_model, 'wl_model_conv': wl_model_conv, 'flux_model_conv': flux_model_conv} #'wl_model_conv': wl_array_model_conv_resam_best, 'flux_model_conv': flux_array_model_conv_resam_best, if fit_spectra: out['wl_array_model_conv_resam_best'] = wl_array_model_conv_resam_best #out['flux_array_model_conv_resam_best'] = flux_array_model_conv_resam_best out['flux_array_model_conv_resam_scaled_best'] = flux_array_model_conv_resam_scaled_best out['wl_array_model_conv_resam_fit_best'] = wl_array_model_conv_resam_fit_best #out['flux_array_model_conv_resam_fit_best'] = flux_array_model_conv_resam_fit_best out['flux_array_model_conv_resam_scaled_fit_best'] = flux_array_model_conv_resam_scaled_fit_best out['flux_residuals_spec_best'] = flux_residuals_spec_best out['logflux_residuals_spec_best'] = logflux_residuals_spec_best if fit_photometry: #out['flux_syn_array_model_fit_best'] = flux_syn_array_model_fit_best out['flux_syn_array_model_scaled_fit_best'] = flux_syn_array_model_scaled_fit_best out['lambda_eff_array_model_fit_best'] = lambda_eff_array_model_fit_best out['width_eff_array_model_fit_best'] = width_eff_array_model_fit_best out['flux_residuals_phot_best'] = flux_residuals_phot_best out['logflux_residuals_phot_best'] = logflux_residuals_phot_best #if not skip_convolution or model_dir_ori is not None: if ori_res: out['wl_model_best'] = wl_model_best out['flux_model_best'] = flux_model_best out['params'] = out_separate_params['params'] return out
##################################################
[docs]def generate_model_spectrum(params, model, grid=None, model_dir=None, save_spectrum=False): ''' Description: ------------ Generate a synthetic spectrum for an arbitrary combination of free parameters within the coverage of the input atmospheric model grid. The Python SciPy-based RegularGridInterpolator is used to perform multidimensional linear interpolation of the model spectra at each wavelength point. The interpolated spectrum is constructed from the grid models enclosing the target point in parameter space. The result is a weighted linear combination of these models, with weights determined by the relative distances between the target parameters and the bounding grid nodes. Parameters: ----------- - params : dictionary Value for each free parameter in the models to generate a synthetic spectrum with the desired parameter values. E.g., ``params = {'Teff': 1010, 'logg': 4.2, 'Z': 0.1, 'fsed': 2.2}`` for a model grid with those free parameters. - model : str Atmospheric models to generate the synthetic spectrum. See available models in ``seda.models.Models().available_models``. - grid : dictionary, optional Model grid (``'wavelength'`` and ``'flux'``) generated by ``seda.utils.read_grid`` for interpolations. If not provided (default), then the grid is read (``model`` and ``model_dir`` must be provided). If provided, the code will skip reading the grid, which will save some time (a few minutes). - model_dir : str or list, optional Path to the directory (str or list) or directories (as a list) containing the model spectra (e.g., ``model_dir = ['path_1', 'path_2']``). Required if ``grid`` is not provided. - save_spectrum : {``True``, ``False``}, optional (default ``False``) Save (``True``) or do not save (``False``) the generated spectrum as an ascii table. Default name is '``model``\_``params``\_.dat'. Returns: -------- Dictionary with generated model spectrum: - ``'wavelength'``: wavelengths in microns for the generated spectrum. - ``'flux'``: fluxes in erg/s/cm2/A for the generated spectrum. - ``'params'``: input parameters for the generated synthetic spectrum. Example: -------- >>> import seda >>> >>> # models >>> model = 'Sonora_Elf_Owl' >>> model_dir = ['my_path/Sonora_Elf_Owl/spectra/output_700.0_800.0/', >>> 'my_path/Sonora_Elf_Owl/spectra/output_850.0_950.0/'] >>> >>> # parameters to generate a model spectrum >>> params = {'Teff': 765, 'logg': 4.15, 'logKzz': 5.2, 'Z': 0.2, 'CtoO': 1.2} >>> >>> # generate model spectrum >>> out = seda.utils.generate_model_spectrum(model=model, model_dir=model_dir, >>> params=params) Author: Genaro Suárez Date: 2025-03-14 ''' # verify model_dir is provided if grid is not given if not grid and not model_dir: raise Exception(f'"model_dir" or "grid" must be provided.') # if desired parameters are given as a one-element numpy array # convert it into a float number for param, value in params.items(): if isinstance(value, np.ndarray): params[param] = float(params[param].item()) else: params[param] = float(value) # parameter ranges covered by the model spectra in the input model_dir or grid if grid is None: # when no grid is provided, read the parameter ranges from the spectra in the input folders params_models = select_model_spectra(model=model, model_dir=model_dir)['params'] else: # when a grid is provided, read the parameter ranges from the grid dictionary params_models = grid['params_unique'] # verify there is an input value for each free parameter in the grid for param in params_models: if param not in params: raise Exception(f'Provide "{param}" value in "params" since it is a free parameter in "{model}" models.') # verify that "params" are within the model grid coverage for param in params: if params[param]<params_models[param].min() or params[param]>params_models[param].max(): raise Exception(f'"{param}={params[param]}" value in "params" is out of the range covered by the "{model}" ' f'spectra in "model_dir", which is [{params_models[param].min()}, {params_models[param].max()}]') # sort input params in the same order as the dictionary with free parameters returned by `models.separate_params` params = models.reorder_dict(params, models.Models(model).free_params) # read model grid if not provided if grid is None: # grid values around the desired parameter values params_ranges = {} for param in params: # for each free parameter in the grid params_ranges[param] = find_two_nearest(params_models[param], params[param]) grid = read_grid(model=model, model_dir=model_dir, params_ranges=params_ranges) # define an interpolating function from the grid flux_grid = grid['flux'] params_unique = grid['params_unique'] interp_flux = RegularGridInterpolator((params_unique.values()), flux_grid) # interpolate input parameters spectra_flux = interp_flux(list(params.values()))[0,:] # to return a 1D array spectra_wl = grid['wavelength'] # proper interpolation for each wavelength data point (much slower than above) # # define an interpolating function from the grid # flux_grid = grid['flux'] # params_unique = grid['params_unique'] # # # interpolate input parameters # param_order = models.Models(model).free_params # values = [] # for p in param_order: # values.append(params_unique[p]) # grid_axes = tuple(values) # # xi = [] # for p in param_order: # xi.append(params[p]) # # spectra_flux = [] # for i in range(flux_grid.shape[-1]): # slice_i = flux_grid[..., i] # # # skip if slice contains NaNs (optional but safer) # if np.isnan(slice_i).any(): # spectra_flux.append(np.nan) # continue # # interp = RegularGridInterpolator(grid_axes, slice_i, bounds_error=True) # spectra_flux.append(interp(xi)) # # spectra_flux = np.array(spectra_flux).flatten() # spectra_wl = grid['wavelength'] ## reverse array to sort wavelength from shortest to longest #ind_sort = np.argsort(spectra_wl) #spectra_wl = spectra_wl[ind_sort] #spectra_flux = spectra_flux[ind_sort] # store generated spectrum if save_spectrum: # name for the file spectrum_filename = model for param in params: spectrum_filename += f'_{param}{params[param]}' spectrum_filename += '.dat' # save spectrum out = open(spectrum_filename, 'w') out.write('# wavelength(um) flux(erg/s/cm2/A) \n') for i in range(len(spectra_wl)): out.write('%11.7f %17.6E \n' %(spectra_wl[i], spectra_flux[i])) out.close() out = {'wavelength': spectra_wl, 'flux': spectra_flux, 'params': params} return out
##################################################
[docs]def read_grid(model, model_dir, params_ranges=None, convolve=False, model_wl_range=None, fit_wl_range=None, res=None, lam_res=None, wl_resample=None, disp_wl_range=None, skip_convolution=False, filename_pattern=None, path_save_spectra_conv=None): ''' Description: ------------ Read a model grid of spectra constrained by input parameters. Parameters: ----------- - model : str Atmospheric models. See available models in ``seda.models.Models().available_models``. - model_dir : str or list Path to the directory (str or list) or directories (as a list) containing the model spectra (e.g., ``model_dir = ['path_1', 'path_2']``). - params_ranges : dictionary, optional Minimum and maximum values for any model free parameters to select a model grid subset. E.g., ``params_ranges = {'Teff': [1000, 1200], 'logg': [4., 5.]}`` to consider spectra within those Teff and logg ranges. If a parameter range is not provided, the full range in ``model_dir`` is considered. - convolve: {``True``, ``False``}, optional (default ``False``) Convolve (``'True'``) or do not convolve (``'False'``) the model grid spectra to the indicated ``res`` at ``lam_res``. - fit_wl_range : float array, optional Minimum and maximum wavelengths (in micron) to resample the model spectra to the fit range. E.g., ``fit_wl_range = np.array([fit_wl_min1, fit_wl_max1])``. Default values are the minimum and the maximum wavelengths in ``wl_resample``. - disp_wl_range : float array, optional Minimum and maximum wavelengths (in micron) to compute the median wavelength dispersion of model spectra to convolve them. Default values are the minimum and the maximum wavelengths in ``wl_resample``. - model_wl_range : float array (optional) Minimum and maximum wavelength (in microns) to cut model spectra to keep only wavelengths of interest. Default values are the minimum and maximum wavelengths in ``wl_resample``, if provided, with a padding to avoid issues with the resampling. Otherwise, ``model_wl_range=None``, so model spectra will not be trimmed. - fit_phot_range : float array or list, optional Minimum and maximum wavelengths (in micron) where photometry will be compared to the models. E.g., ``fit_phot_range = np.array([fit_phot_min1, fit_phot_max1])``. This parameter is used if ``fit_photometry`` but ignored if only ``fit_spectra``. Default values are the minimum and the maximum of the filter effective wavelengths from SVO. - res : float, optional (required if ``convolve``). Spectral resolution (at ``lam_res``) to smooth model spectra. - lam_res : float, optional. Wavelength of reference for ``res``. Default is the median wavelength of the spectrum. - wl_resample : float array or list, optional Wavelength data points to resample the grid - disp_wl_range : float array, optional Minimum and maximum wavelengths (in microns) to compute the median wavelength dispersion of model spectrum. Default values are the minimum and maximum wavelengths in model spectra. - skip_convolution : {``True``, ``False``}, optional (default ``False``) Convolution of model spectra (the slowest process in the code) can (``True``) or cannot (``False``) be avoided. Once the code has be run and the convolved spectra were stored in ``path_save_spectra_conv``, the convolved grid can be reused for other input data with the same resolution as the convolved spectra. - filename_pattern : str, optional Pattern to select only files including it. Default is a common pattern in all spectra original filenames in ``model``, as indicated by ``models.Models(model).filename_pattern``. - path_save_spectra_conv: str, optional Directory path to store convolved model spectra. If not provided (default), the convolved spectra will not be saved. If the directory does not exist, it will be created. Otherwise, the spectra will be added to the existing folder. The convolved spectra will keep the same original names along with the ``res`` and ``lam_res`` parameters, e.g. 'original_spectrum_name_R100at1um.nc' for ``res=100`` and ``lam_res=1``. They will be saved as netCDF with xarray (it produces lighter files compared to normal ASCII files). Returns: -------- Dictionary with the model grid either convolved and resampled (if requested) or synthetic photometry: - ``'wavelength'`` : wavelengths in microns for the model spectra in the grid. - ``'flux'`` : fluxes in erg/s/cm2/A for the model spectra in the grid. - ``'params_unique'`` : dictionary with unique (non-repetitive) values for each model free parameter - ``'N_model_spectra'`` : dictionary with unique (non-repetitive) values for each model free parameter Example: -------- >>> import seda >>> >>> # models >>> model = 'Sonora_Elf_Owl' >>> model_dir = ['my_path/Sonora_Elf_Owl/spectra/output_700.0_800.0/', >>> 'my_path/Sonora_Elf_Owl/spectra/output_850.0_950.0/'] >>> >>> # set ranges for some (Teff and logg) free parameters to select only a grid subset >>> params_ranges = {'Teff': [700, 900], 'logg': [4.0, 5.0]} >>> >>> # read the grid >>> out_read_grid = seda.utils.read_grid(model=model, model_dir=model_dir, >>> params_ranges=params_ranges) Author: Genaro Suárez Date: 2023-02 ''' ini_time_grid = time.time() # to estimate the time elapsed reading the grid # ensure model_dir is a list model_dir = var_to_list(model_dir) # add path separator at the end of model_dir if needed model_dir = ensure_trailing_separator(model_dir) if wl_resample is not None: # handle fit_wl_range fit_wl_range = set_fit_wl_range(fit_wl_range=fit_wl_range, N_spectra=1, wl_spectra=[wl_resample])[0] # handle model_wl_range model_wl_range = set_model_wl_range(model_wl_range=model_wl_range, wl_spectra_min=fit_wl_range.min(), wl_spectra_max=fit_wl_range.max()) # read the model spectra names and their parameters in the input folders and meeting the indicated parameters ranges out_select_model_spectra = select_model_spectra(model=model, model_dir=model_dir, params_ranges=params_ranges, filename_pattern=filename_pattern) spectra_name_full = out_select_model_spectra['spectra_name_full'] spectra_name = out_select_model_spectra['spectra_name'] params = out_select_model_spectra['params'] # unique values for each free parameter in the selected spectra params_unique = {} # initialize dictionary for param in params.keys(): params_unique[param] = np.unique(params[param]) # save unique values for each free parameter # create an array with as many dimensions as the number of free parameters in the models # and each dimension's size as the number of unique values for the corresponding parameter arr = np.array(np.nan) # initialize float array of NaNs with dimension zero for param in params.keys(): # for each free parameter dim_size = len(params_unique[param]) # number of unique values in each parameter new_dim = np.expand_dims(arr, -1) # add a dimension (at the end) arr = np.repeat(new_dim, dim_size, axis=-1) # change size of new dimension # load model grid with the selected model spectra # create a tqdm progress bar if convolve: if wl_resample is not None: desc = 'Reading, convolving, and resampling model grid' else: desc = 'Reading and convolving model grid' else: if wl_resample is not None: desc = 'Reading and resampling model grid' else: desc = 'Reading model grid' # number of parameter combinations values = params_unique.values() # get all arrays from the dictionary sizes = [] # initialize list for sizes # compute length of each array explicitly for v in values: sizes.append(len(v)) # multiply all sizes to get total number of combinations n_combinations = np.prod(sizes) # print message for heterogeneous grid if n_combinations-len(spectra_name)>0: print(f'\n Heterogeneous grid detected') print(f' A homogeneous grid would contain {n_combinations} spectra from unique parameter combinations,') print(f' but {len(spectra_name)} spectra were read from "model_dir".') print(f' The output dictionary "dict_missing" list the parameter combinations lacking spectra.') grid_bar = tqdm(total=n_combinations, desc=desc) # initialize dictionary to store parameter combinations without model spectra dict_missing = {} for k in params.keys(): dict_missing[k] = [] # save model spectra for each combination of free parameter values first_spec = True # reference to initialize arrays for store the grid in the first iteration with a model spectrum for index in np.ndindex(arr.shape): # iterate over all possible combinations of the free parameter unique values # update the progress bar grid_bar.update(1) # mask to select the corresponding spectrum for each combination of parameters mask = np.ones(len(spectra_name), bool) # initialize mask with Trues for i,param in enumerate(params.keys()): # for each free parameter mask &= params[param]==params_unique[param][index[i]] # apply criteria to the mask and update it # verify if there is a model spectrum for the combination of parameters if not np.any(mask): # if there is not a spectrum for i,param in enumerate(params.keys()): dict_missing[param].append(params_unique[param][index[i]]) else: # if there is a spectrum # read the spectrum with the parameter combination in the iteration and cut it to model_wl_range (default value: fit_wl_range plus padding) spectrum_name_full = spectra_name_full[mask][0] if not skip_convolution: # read model spectra with original resolution out_read_model_spectrum = models.read_model_spectrum(spectrum_name_full=spectrum_name_full, model=model, model_wl_range=model_wl_range) else: # read precomputed convolved model spectra out_read_model_spectrum = models.read_model_spectrum_conv(spectrum_name_full=spectrum_name_full, model_wl_range=model_wl_range) flux_model = out_read_model_spectrum['flux_model'] # in erg/s/cm2/A wl_model = out_read_model_spectrum['wl_model'] # in um # convolve (if requested) the model spectrum to the indicated resolution if convolve and not skip_convolution: # convolve spectra only if convolve is True and skip_convolution is False if path_save_spectra_conv is None: # do not save the convolved spectrum out_convolve_spectrum = convolve_spectrum(wl=wl_model, flux=flux_model, res=res, lam_res=lam_res, disp_wl_range=disp_wl_range) else: # save convolved spectrum if not os.path.exists(path_save_spectra_conv): os.makedirs(path_save_spectra_conv) # make directory (if not existing) to store convolved spectra out_file = path_save_spectra_conv+spectra_name[mask][0]+f'_R{res}at{lam_res}um.nc' out_convolve_spectrum = convolve_spectrum(wl=wl_model, flux=flux_model, res=res, lam_res=lam_res, disp_wl_range=disp_wl_range, out_file=out_file) wl_model = out_convolve_spectrum['wl_conv'] flux_model = out_convolve_spectrum['flux_conv'] # resample (if requested) the convolved model spectrum to the wavelength data points in the observed spectra if wl_resample is not None: # mask to select data points within the fit range or model coverage range, whichever is narrower mask_fit = (wl_resample >= max(fit_wl_range[0], wl_model.min())) & \ (wl_resample <= min(fit_wl_range[1], wl_model.max())) flux_model = spectres(wl_resample[mask_fit], wl_model, flux_model) wl_model = wl_resample[mask_fit] # save spectrum for each combination if first_spec: # for the first parameters' combination with a model spectrum # define arrays to save the grid # add a last dimension with the number of data points in the model spectrum subset # it is better to initialize the arrays after the first iteration to consider the model spectrum # data points after resampling instead of using all data points in the original model spectrum wl_grid = wl_model flux_grid = np.repeat(np.expand_dims(arr, -1), len(wl_model), axis=-1) # to save the flux at each grid point # save first spectrum # wl_grid[index] = wl_model flux_grid[index] = flux_model first_spec = False # to avoid initializing grid arrays in future iterations else: # all but first parameters' combinations # ensure the new spectrum has the same number of data points as the first one read if wl_model.shape!=wl_grid.shape[-1:]: raise ValueError(f'Spectrum {spectra_name[mask]} has a different number of data points compared to the previous ones') # save spectrum # wl_grid[index] = wl_model flux_grid[index] = flux_model # close the progress bar grid_bar.close() fin_time_grid = time.time() print_time(fin_time_grid-ini_time_grid) out = {'wavelength': wl_grid, 'flux': flux_grid, 'params_unique': params_unique, 'N_model_spectra': len(spectra_name), 'dict_missing': dict_missing} return out
##################################################
[docs]def read_grid_phot(model, model_dir, filters, params_ranges=None, fit_phot_range=None, skip_syn_phot=False, model_wl_range=None, path_save_syn_phot=None): ''' Description: ------------ Read a model grid of synthetic photometry constrained by input parameters. Parameters: ----------- - model : str Atmospheric models. See available models in ``seda.models.Models().available_models``. - model_dir : str or list Path to the directory (str or list) or directories (as a list) containing the model spectra (e.g., ``model_dir = ['path_1', 'path_2']``). - params_ranges : dictionary, optional Minimum and maximum values for any model free parameters to select a model grid subset. E.g., ``params_ranges = {'Teff': [1000, 1200], 'logg': [4., 5.]}`` to consider spectra within those Teff and logg ranges. If a parameter range is not provided, the full range in ``model_dir`` is considered. - model_wl_range : float array (optional) Minimum and maximum wavelength (in microns) to cut model spectra to keep only wavelengths of interest. Default values are the minimum and maximum wavelengths in ``wl_resample``, if provided, with a padding to avoid issues with the resampling. Otherwise, ``model_wl_range=None``, so model spectra will not be trimmed. - fit_phot_range : float array or list, optional Minimum and maximum wavelengths (in micron) where photometry will be compared to the models. E.g., ``fit_phot_range = np.array([fit_phot_min1, fit_phot_max1])``. This parameter is used if ``fit_photometry`` but ignored if only ``fit_spectra``. Default values are the minimum and the maximum of the filter effective wavelengths from SVO. - filters : float array Filters to derive synthetic photometry following SVO filter IDs http://svo2.cab.inta-csic.es/theory/fps/ - path_save_syn_phot: str, optional Directory path to store the synthetic fluxes (in erg/s/cm2/A). If not provided (default), the synthetic photometry will not be saved. If the directory does not exist, it will be created. Otherwise, the photometry will be added to the existing folder. The synthetic photometry for different filters derived from the same model spectrum will be saved in a single ASCII table, named after the model with the suffix "_syn_phot.dat". If a synthetic photometry file for a given model spectrum already exists, it will be updated to include photometry for any new filters as needed. - skip_syn_phot : {``True``, ``False``}, optional (default ``False``) Synthetic photometry calculation (the lowest process when fitting photometry) can (``True``) or cannot (``False``) be avoided. If 'True', ``model_dir`` should correspond to the directory with the synthetic photometry for ``filters`` in ``input_parameters.InputData``. Returns: -------- Dictionary with the model grid either convolved and resampled (if requested) or synthetic photometry: - ``'wavelength'`` : wavelengths in microns for the model spectra in the grid. - ``'flux'`` : fluxes in erg/s/cm2/A for synthetic photometry in the grid. - ``'params_unique'`` : dictionary with unique (non-repetitive) values for each model free parameter - ``'N_model_spectra'`` : dictionary with unique (non-repetitive) values for each model free parameter Example: -------- >>> import seda >>> >>> # models >>> model = 'Sonora_Elf_Owl' >>> model_dir = ['my_path/Sonora_Elf_Owl/spectra/output_700.0_800.0/', >>> 'my_path/Sonora_Elf_Owl/spectra/output_850.0_950.0/'] >>> >>> # set ranges for some (Teff and logg) free parameters to select only a grid subset >>> params_ranges = {'Teff': [700, 900], 'logg': [4.0, 5.0]} >>> >>> # read the grid >>> out_read_grid = seda.utils.read_grid_phot(model=model, model_dir=model_dir, >>> params_ranges=params_ranges) Author: Genaro Suárez Date: 2025-11-22 ''' ini_time_grid = time.time() # to estimate the time elapsed reading the grid # ensure model_dir is a list model_dir = var_to_list(model_dir) # add path separator at the end of model_dir if needed model_dir = ensure_trailing_separator(model_dir) # read the model spectra names and their parameters in the input folders and meeting the indicated parameters ranges out_select_model_spectra = select_model_spectra(model=model, model_dir=model_dir, params_ranges=params_ranges) spectra_name_full = out_select_model_spectra['spectra_name_full'] spectra_name = out_select_model_spectra['spectra_name'] params = out_select_model_spectra['params'] # unique values for each free parameter in the selected spectra params_unique = {} # initialize dictionary for param in params.keys(): params_unique[param] = np.unique(params[param]) # save unique values for each free parameter # create an array with as many dimensions as the number of free parameters in the models # and each dimension's size as the number of unique values for the corresponding parameter arr = np.array(0.)*np.nan # initialize float array of NaNs with dimension zero for param in params.keys(): # for each free parameter dim_size = len(params_unique[param]) # number of unique values in each parameter new_dim = np.expand_dims(arr, -1) # add a dimension (at the end) arr = np.repeat(new_dim, dim_size, axis=-1) # change size of new dimension # load model grid with the selected synthetic fluxes # create a tqdm progress bar desc = 'Deriving synthetic photometry from model spectra' grid_bar = tqdm(total=len(spectra_name), desc=desc) # save synthetic fluxes for each combination of free parameter values # define arrays to save the grid # add a last dimension with the number of synthetic fluxes wl_grid = np.repeat(np.expand_dims(arr, -1), len(filters), axis=-1) # to save the effective wavelength at each grid point flux_grid = np.repeat(np.expand_dims(arr, -1), len(filters), axis=-1) # to save the synthetic flux at each grid point first_spec = True # reference to read effective wavelengths from the first spectrum only for index in np.ndindex(arr.shape): # iterate over all possible combinations of the free parameter unique values # update the progress bar grid_bar.update(1) # mask to select the corresponding spectrum for each combination of parameters mask = np.ones(len(spectra_name), bool) # initialize mask with Trues for i,param in enumerate(params.keys()): # for each free parameter mask &= params[param]==params_unique[param][index[i]] # apply criteria to the mask and update it # verify if there is a model spectrum for the combination of parameters if not np.any(mask): # if there is not a spectrum print(' Caveat: No spectrum in "model_dir" for the combination:') for i,param in enumerate(params.keys()): print(f' {param}={params_unique[param][index[i]]}') else: # if there is a spectrum # read the spectrum with the parameter combination in the iteration and cut it to model_wl_range (default value: fit_wl_range plus padding) spectrum_name_full = spectra_name_full[mask][0] spectrum_name = spectra_name[mask][0] if not skip_syn_phot: # do not avoid synthetic photometry calculation # read model spectrum with original resolution in the full model_wl_range_each out_read_model_spectrum = models.read_model_spectrum(spectrum_name_full=spectrum_name_full, model=model, model_wl_range=model_wl_range) wl_model = out_read_model_spectrum['wl_model'] # um flux_model = out_read_model_spectrum['flux_model'] # erg/s/cm2/A # derive synthetic photometry out_syn_phot = synthetic_photometry(wl=wl_model, flux=flux_model, flux_unit='erg/s/cm2/A', filters=filters) flux_syn = out_syn_phot['syn_flux(erg/s/cm2/A)'] # erg/s/cm2/A # estimate filters' effective wavelengths from the first spectrum to be the wavelength reference if first_spec: lambda_eff = out_syn_phot['lambda_eff(um)'] # um first_spec = False # to avoid estimating effective wavelengths again # store synthetic photometric if path_save_syn_phot is not None: file_name = path_save_syn_phot+spectra_name[i]+'_syn_phot.dat' if not os.path.exists(file_name): # file with synthetic photometry does not exist yet # make a dictionary with the parameters to be saved dict_syn_phot = {} keys = ['filters', 'syn_flux(erg/s/cm2/A)', 'lambda_eff(um)', 'width_eff(um)'] # parameters of interest for key in keys: if key=='filters': dict_syn_phot[key] = out_syn_phot[key] else: dict_syn_phot[key] = np.round(out_syn_phot[key], 6) # keep six decimals # sort dictionary with respect to filter name sort_ind = np.argsort(dict_syn_phot['filters']) for key in dict_syn_phot.keys(): dict_syn_phot[key] = dict_syn_phot[key][sort_ind] # save the dictionary as prettytable table if not os.path.exists(path_save_syn_phot): os.makedirs(path_save_syn_phot) # make directory (if not existing) to store synthetic photometry save_prettytable(my_dict=dict_syn_phot, table_name=file_name) else: # file already exist # open file to see whether the flux for a given filter is already stored dict_syn_phot = read_prettytable(file_name) for j, filt in enumerate(filters_fit): # for each filter used to derived synthetic photometry if filt not in dict_syn_phot['filters']: # filter with synthetic photometry is not in the table for key in dict_syn_phot.keys(): # for each parameter in the table if key=='filters': dict_syn_phot[key] = np.append(dict_syn_phot[key], out_syn_phot[key][j]) else: dict_syn_phot[key] = np.append(dict_syn_phot[key], np.round(out_syn_phot[key][j], 6)) # keep six decimals # sort dictionary with respect to filter name sort_ind = np.argsort(dict_syn_phot['filters']) for key in dict_syn_phot.keys(): dict_syn_phot[key] = dict_syn_phot[key][sort_ind] # update the existing file with new synthetic photometry save_prettytable(my_dict=dict_syn_phot, table_name=file_name) else: # read pre-computed synthetic photometry out_syn_phot = read_prettytable(filename=spectrum_name_full+'_syn_phot.dat') # get from the table only parameters for the input filters within the fit range flux_syn_each = [] lambda_eff_each = [] for filt in filters_fit: if filt in out_syn_phot['filters']: # filter is in the table with synthetic photometry ind = out_syn_phot['filters']==filt # index in the table for filter in the iteration # store filters' parameters in the lists flux_syn_each.append(out_syn_phot['syn_flux(erg/s/cm2/A)'][ind][0]) # erg/s/cm2/A lambda_eff_each.append(out_syn_phot['lambda_eff(um)'][ind][0]) # um else: raise Exception(f'There is not synthetic photometry for filter "{filt}" and model "{spectrum_name}".') # store filters' parameters in the lists flux_syn = np.array(flux_syn_each) # consider the effective wavelengths from the first model as wavelength references if first_spec: lambda_eff = np.array(lambda_eff_each) first_spec = False # to avoid estimating effective wavelengths again # save synthetic fluxes for each combination wl_grid[index] = lambda_eff flux_grid[index] = flux_syn # close the progress bar grid_bar.close() fin_time_grid = time.time() print_time(fin_time_grid-ini_time_grid) out = {'wavelength': wl_grid, 'flux': flux_grid, 'params_unique': params_unique, 'N_model_spectra': len(spectra_name)} return out
##########################
[docs]def best_bayesian_fit(output_bayes, grid=None, model_dir_ori=None, ori_res=False, save_spectrum=False): ''' Description: ------------ Generate model spectrum with the posterior parameters. Parameters: ----------- - 'output_bayes' : dictionary or str Output dictionary with the results from the nested sampling by ``bayes``. It can be either the name of the pickle file or simply the output dictionary. - grid : dictionary, optional Model grid (``'wavelength'`` and ``'flux'``) generated by ``seda.utils.read_grid`` for interpolations. If not provided (default), then a grid subset with model spectra around the median posteriors is read. If provided, the code will skip reading the grid, which will save some time. - ori_res : {``True``, ``False``}, optional (default ``False``) Read (``True``) or do not read (``False``) model spectrum for the best fit with the original resolution. - model_dir_ori : str, list, or array Path to the directory (str, list, or array) or directories (as a list or array) containing the model spectra with the original resolution. This parameter is needed if ``ori_res=True`` and `seda.bayes_fit.bayes` was run skipping the model spectra convolution (if `skip_convolution=True``). - save_spectrum : {``True``, ``False``}, optional (default ``False``) Save (``True``) or do not save (``False``) best model fit from the nested sampling. Returns: -------- - '``model``\_best\_fit\_bayesian\_sampling.dat' : ascii table Table with best model fit (if ``save_spectrum``). - dictionary Dictionary with the best model fit from the nested sampling: - ``'wl_spectra_fit'`` : (if ``fit_spectra``) wavelength in um of the input spectra within ``fit_wl_range``. - ``'flux_spectra_fit'`` : (if ``fit_spectra``) fluxes in erg/cm2/s/A of the input spectra within ``fit_wl_range``. - ``'eflux_spectra_fit'`` : (if ``fit_spectra``) flux uncertainties in erg/cm2/s/A of the input spectra within ``fit_wl_range``. - ``'phot_fit'`` : (if ``fit_photometry``) flux in erg/cm2/s/A of the input photometry within ``fit_wl_range``. - ``'ephot_fit'`` : (if ``fit_photometry``) flux uncertainties in erg/cm2/s/A of the input photometry within ``fit_wl_range``. - ``'params_med'`` : median values for sampled parameters. - ``'params_errors'`` : lower and upper parameter errors considering the confidence interval ``params_confidence_interval``. - ``'params_confidence_interval'`` : confidence interval for sampled parameters. - ``'confidence_interval(%)'`` : central percentage considered to calculate the confidence interval. - ``'wl_model'`` : wavelength in um of the best scaled model fit (convolved, and resampled when fit_spectra or synthetic fluxes when fit_spectra) within ``fit_wl_range``. - ``'flux_model'`` : fluxes in erg/cm2/s/A of the best scaled, convolved, and resampled model fit - ``'wl_model_best'`` : (if ``ori_res`` is ``True``) wavelength in um of the best scaled model fit with its original resolution - ``'flux_model_best'`` : (if ``ori_res`` is ``True``) fluxes in erg/cm2/s/A of the best scaled model fit with its original resolution Author: Genaro Suárez Date: 2024-09 ''' # open dictionary if need it output_bayes = load_output_fit(output_bayes) fit_spectra = output_bayes['my_bayes'].fit_spectra fit_photometry = output_bayes['my_bayes'].fit_photometry model = output_bayes['my_bayes'].model model_dir = output_bayes['my_bayes'].model_dir filename_pattern = output_bayes['my_bayes'].filename_pattern path_save_spectra_conv = output_bayes['my_bayes'].path_save_spectra_conv skip_convolution = output_bayes['my_bayes'].skip_convolution res = output_bayes['my_bayes'].res lam_res = output_bayes['my_bayes'].lam_res distance = output_bayes['my_bayes'].distance if fit_spectra: wl_spectra_min = output_bayes['my_bayes'].wl_spectra_min wl_spectra_max = output_bayes['my_bayes'].wl_spectra_max wl_spectra_fit = output_bayes['my_bayes'].wl_spectra_fit flux_spectra_fit = output_bayes['my_bayes'].flux_spectra_fit eflux_spectra_fit = output_bayes['my_bayes'].eflux_spectra_fit N_spectra = output_bayes['my_bayes'].N_spectra if fit_photometry: phot_fit = output_bayes['my_bayes'].phot_fit ephot_fit = output_bayes['my_bayes'].ephot_fit filters_fit = output_bayes['my_bayes'].filters_fit lambda_eff_SVO_fit = output_bayes['my_bayes'].lambda_eff_SVO_fit width_eff_SVO_fit = output_bayes['my_bayes'].width_eff_SVO_fit fit_phot_range = output_bayes['my_bayes'].fit_phot_range fit_wl_range = output_bayes['my_bayes'].fit_wl_range params_priors = output_bayes['my_bayes'].params_priors out_dynesty = output_bayes['out_dynesty'] # compute median values and confidence intervals for all sampled parameters conf_interval = 68 # 68% confidence interval params_med = {} params_conf = {} params_errors = {} for i,param in enumerate(params_priors): # for each parameter in the sampling # percentiles params_med[param] = np.median(out_dynesty.samples[:,i]) # add the median of each parameter to the dictionary quantile_low = 50 - conf_interval/2 quantile_high = 50 + conf_interval/2 params_quantile_low = np.percentile(out_dynesty.samples[:,i], quantile_low) # parameter value at quantile_low params_quantile_high = np.percentile(out_dynesty.samples[:,i], quantile_high) # parameter value at quantile_high params_conf[param] = [params_quantile_low, params_quantile_high] # add the confidence range of each parameter to the dictionary # lower and upper errors lower = params_med[param] - params_quantile_low upper = params_quantile_high - params_med[param] params_errors[param] = [lower, upper] # round median parameters params_models = models.Models(model).params_unique # free parameters in the models for i,param in enumerate(params_med): # for each sampled parameter if param in params_models: # for free parameters in the model grid # percentiles params_med[param] = round(params_med[param], max_decimals(params_models[param])+1) # round to the precision (plus one decimal place) of the parameter in models conf_low = round(params_conf[param][0], max_decimals(params_models[param])+1) conf_high = round(params_conf[param][1], max_decimals(params_models[param])+1) params_conf[param] = [conf_low, conf_high] # errors lower = round(params_errors[param][0], max_decimals(params_models[param])+1) upper = round(params_errors[param][1], max_decimals(params_models[param])+1) params_errors[param] = [lower, upper] else: # parameters other than those in the grid (e.g. radius) # percentiles params_med[param] = round(params_med[param], 2) # consider two decimals conf_low = round(params_conf[param][0], 2) conf_high = round(params_conf[param][1], 2) params_conf[param] = [conf_low, conf_high] # errors lower = round(params_errors[param][0], 2) upper = round(params_errors[param][1], 2) params_errors[param] = [lower, upper] # read grid, if needed if grid is None: # grid values around the desired parameter values params_ranges = {} for param in params_models: # for each free parameter in the grid params_ranges[param] = find_two_nearest(params_models[param], params_med[param]) # read grid, convolve it (if not skip_convolution), and resample it to the input spectra if fit_spectra: grid_spec = [] # to save a grid appropriate for each input spectrum for i in range(N_spectra): # for each input observed spectrum print(f'\nFor input spectrum {i+1} of {N_spectra}') if not skip_convolution: # read and convolve original model spectra grid_each = read_grid(model=model, model_dir=model_dir, params_ranges=params_ranges, convolve=True, res=res[i], lam_res=lam_res[i], fit_wl_range=fit_wl_range[i], wl_resample=wl_spectra_fit[i]) else: # read model spectra already convolved to the data resolution # set filename_pattern to look for model spectra with the corresponding resolution filename_pattern.append(models.Models(model).filename_pattern+f'_R{res[i]}at{lam_res[i]}um.nc') grid_each = read_grid(model=model, model_dir=model_dir, params_ranges=params_ranges, res=res[i], lam_res=lam_res[i], fit_wl_range=fit_wl_range[i], wl_resample=wl_spectra_fit[i], skip_convolution=skip_convolution, filename_pattern=filename_pattern[i]) # add resampled grid for each input spectrum to the same list grid_spec.append(grid_each) if fit_photometry: # set filename_pattern to look for model spectra filename_pattern = [models.Models(model).filename_pattern] grid_phot = read_grid_phot(model=model, model_dir=model_dir, params_ranges=params_ranges, filters=filters_fit) grid_phot = [grid_phot] # grid as list to follow structure from then fit_spectra # define grid depending whether spectra and/or photometry were provided if fit_spectra and not fit_photometry: grid = grid_spec if not fit_spectra and fit_photometry: grid = grid_phot if fit_spectra and fit_photometry: grid = grid_spec + grid_phot # generate a synthetic spectrum with the median parameter values for only the free parameters in the models # (avoid radius, if included in params_med) params = {} for param in params_models: # for each free parameter in the grid params[param] = params_med[param] wl = [] flux = [] for i in range(len(grid)): # for each input observed spectrum syn_spectrum = generate_model_spectrum(params=params, model=model, grid=grid[i]) wl.append(syn_spectrum['wavelength']) flux.append(syn_spectrum['flux']) # scale synthetic spectrum if distance is not None: # when radius was constrained wl_scaled = [] flux_scaled = [] for i in range(len(grid)): # for each input observed spectrum flux_scaled.append(scale_synthetic_spectrum(wl=wl[i], flux=flux[i], distance=distance, radius=params_med['R'])) wl_scaled.append(wl[i]) else: # when radius was not constrained wl_scaled = [] flux_scaled = [] for i in range(len(grid)): # for each input observed spectrum # DOUBLE CHECK scaling = np.sum(flux_obs[i]*flux/eflux_obs[i]**2) / np.sum(flux_model**2/eflux_obs[i]**2) # scaling that minimizes chi2 flux_scaled.append(scaling*flux) wl_scaled.append(wl) # read best fits with the original resolution if ori_res: if skip_convolution: # when the convolution was skipped if model_dir_ori is None: raise Exception(f'parameter "model_dir_ori" is needed to read model spectra with the original resolution') else: model_dir = model_dir_ori # generate spectrum syn_spectrum = generate_model_spectrum(params=params, model=model, model_dir=model_dir) wl_model_best = syn_spectrum['wavelength'] flux_model_best = syn_spectrum['flux'] # scale synthetic spectrum flux_model_best = scale_synthetic_spectrum(wl=wl_model_best, flux=flux_model_best, distance=distance, radius=params_med['R']) # output dictionary out = {'wl_model': wl_scaled, 'flux_model': flux_scaled, 'params_med': params_med, 'params_errors': params_errors, 'params_confidence_interval': params_conf, 'confidence_interval(%)': conf_interval} if fit_spectra: out['wl_spectra_fit'] = wl_spectra_fit out['flux_spectra_fit'] = flux_spectra_fit out['eflux_spectra_fit'] = eflux_spectra_fit if fit_photometry: out['phot_fit'] = phot_fit out['ephot_fit'] = ephot_fit if ori_res: out['wl_model_best'] = wl_model_best out['flux_model_best'] = flux_model_best return out
##########################
[docs]def select_model_spectra(model, model_dir, params_ranges=None, filename_pattern=None, save_results=False, out_file=None): ''' Description: ------------ Select model spectra from the indicated models and meeting given parameters ranges. Parameters: ----------- - model : str Atmospheric models. See available models in ``seda.models.Models().available_models``. - model_dir : str or list Path to the directory (str or list) or directories (as a list) containing the model spectra (e.g., ``model_dir = ['path_1', 'path_2']``). - params_ranges : dictionary, optional Minimum and maximum values for any model free parameters to select a model grid subset. E.g., ``params_ranges = {'Teff': [1000, 1200], 'logg': [4., 5.]}`` to consider spectra within those Teff and logg ranges. If a parameter range is not provided, the full range in ``model_dir`` is considered. - filename_pattern : str, optional Pattern to select only files including it. Default is a common pattern in all spectra original filenames in ``model``, as indicated by ``models.Models(model).filename_pattern``. - save_results : {``True``, ``False``}, optional (default ``False``) Save (``True``) or do not save (``False``) the output as a pickle file named '``model``\_free\_parameters.pickle'. - out_file : str, optional File name to save the results as a pickle file (it can include a path e.g. my_path/free\_params.pickle). Default name is '``model``\_free_parameters.pickle' and is stored at the notebook location. Returns: -------- Dictionary with the parameters: - ``spectra_name``: selected model spectra names. - ``spectra_name_full``: selected model spectra names with full path. - ``params``: parameters for the selected model spectra, as given by the ``seda.models.separate_params`` output dictionary. Example: -------- >>> import seda >>> >>> model = 'Sonora_Elf_Owl' >>> model_dir = ['my_path/output_575.0_650.0/', >>> 'my_path/output_700.0_800.0/'] # folders to seek model spectra >>> # set ranges for some (Teff and logg) free parameters to select only a grid subset >>> params_ranges = {'Teff': [700, 900], 'logg': [4.0, 5.0]} >>> out = seda.utils.select_model_spectra(model=model, model_dir=model_dir, >>> params_ranges=params_ranges) Author: Genaro Suárez Date: 2020-05 ''' # make sure model_dir is a list model_dir = var_to_list(model_dir) # add path separator at the end of model_dir if needed model_dir = ensure_trailing_separator(model_dir) # set default parameters # if params_ranges is provided, verified that there is a minimum and a maximum values for each provided parameter if params_ranges is not None: for param in params_ranges: if len(params_ranges[param])!=2: raise Exception(f'{param} in "params_ranges" must have two values (minimum and maximum), ' f'but {len(params_ranges[param])} values were given') # if params_ranges is not provided, define params_ranges as an empty dictionary if params_ranges is None: params_ranges = {} # empty dictionary # if filename_pattern is not provided, consider the common pattern in original file names if filename_pattern is None: filename_pattern = models.Models(model).filename_pattern # to store files in model_dir files = [] # with full path files_short = [] # only spectra names for i in range(len(model_dir)): files_model_dir = fnmatch.filter(os.listdir(model_dir[i]), filename_pattern) files_model_dir.sort() # just to sort the files wrt their names for file in files_model_dir: files.append(model_dir[i]+file) files_short.append(file) # read free parameters from each model spectrum out_separate_params = models.separate_params(model=model, spectra_name=files_short) params_spectra = out_separate_params['params'] # select spectra within the desired parameter ranges spectra_name_full = [] # full name with path spectra_name = [] # only spectra names # select spectra within the desired parameter ranges spectra_name_full = [] # full name with path spectra_name = [] # only spectra names for i in range(len(files)): # for each file in model_dir mask = True # initialize mask to apply parameter ranges criteria for param in params_spectra: # for each free parameter if param in params_ranges: # if there is an input range to constrain the free parameter if not params_ranges[param][0] <= params_spectra[param][i] <= params_ranges[param][1]: mask = False # change mask if the file is for a spectrum out of the input ranges if mask: # keep only spectra within the input parameter ranges spectra_name_full.append(files[i]) # file with full path spectra_name.append(files_short[i]) # only file name if len(spectra_name_full)==0: # show up an error when there are no models in the indicated ranges raise Exception(f'No model spectra in "model_dir": {model_dir} within params_ranges: {params_ranges}') # convert lists into numpy arrays spectra_name_full = np.array(spectra_name_full) spectra_name = np.array(spectra_name) # separate parameters from selected spectra out_separate_params = models.separate_params(model=model, spectra_name=spectra_name, save_results=save_results, out_file=out_file) # all free parameters params = out_separate_params['params'] # free parameters no constrained params_noranges = {} for key, value in params.items(): if key not in params_ranges: params_noranges[key] = [value.min(), value.max()] print(f'\n {len(spectra_name)} model spectra') print(f' user-constrained parameters:') for param in params_ranges: print(f' {param} range = {params_ranges[param]}') print(f' model-constrained parameters:') for param in params_noranges: print(f' {param} range = {params_noranges[param]}') out = {'spectra_name_full': spectra_name_full, 'spectra_name': spectra_name, 'params': out_separate_params['params']} return out
##########################
[docs]def read_SVO_params(filters, params): ''' Description: ------------ Read parameters of interest from SVO for a list of filters. Parameters: ----------- - filters : list or array Filter names to retrieve parameters from SVO. - params : list or array Parameter of interest following SVO names. Returns: -------- Dictionary with the parameters ``params`` for the input filters. Example: -------- >>> import seda >>> >>> filters = ['2MASS/2MASS.J', '2MASS/2MASS.H', '2MASS/2MASS.Ks'] >>> params = ['filterID', 'WavelengthEff', 'WidthEff'] >>> seda.utils.read_SVO_params(filters=filters, params=params) {'filterID': array(['2MASS/2MASS.J', '2MASS/2MASS.H', '2MASS/2MASS.Ks'], dtype=object), 'WavelengthEff': array([12350., 16620., 21590.]), 'WidthEff': array([1624.3190191 , 2509.40349871, 2618.86953322])} Author: Genaro Suárez Date: 2025-09-05 ''' # # verify input parameters are numpy arrays # filters = var_to_numpy(filters) # params = var_to_numpy(params) # read SVO table SVO_data = read_SVO_table() SVO_filterID = SVO_data['filterID'] # SVO ID # verify that all params are in SVO params_good = [] params_bad = [] for param in params: if param in SVO_data.colnames: params_good.append(param) else: params_bad.append(param) params = params_good if len(params_bad)>0: print(f'Parameters {params_bad} are not in SVO, so will be ignored') # indices in SVO matching input filters mask = np.isin(SVO_filterID, filters) # subset of the SVO table for the desired filters # for clarity add the parameter 'filterID', if not requested params_ori = params.copy() # copy to save input params recognized by SVO if 'filterID' not in params: params.insert(0, 'filterID') SVO_data_sel = SVO_data[mask][params] # dictionary with the table subset filters_params = {} for param in params: filters_params[param] = SVO_data_sel[param].data.data # sort dictionary so filters_params['filterID'] is in the same order as input filters # indices that sort filters_params['filterID'] sort_ind = np.array([], dtype=int) # initialize array of integers for indices for i,filt in enumerate(filters): if filt in filters_params['filterID']: ind = np.where(filters_params['filterID']==filt)[0] sort_ind = np.append(sort_ind, ind) # sort dictionary for param in params: filters_params[param] = filters_params[param][sort_ind] # input filters not in SVO mask_noinSVO = ~np.isin(filters, filters_params['filterID']) filters = var_to_numpy(filters) if len(filters[mask_noinSVO])>0: print(f'Filters {filters[mask_noinSVO]} are not in SVO, so will be ingored') # remove 'filterID' if it wasn't requested if 'filterID' not in params_ori: del filters_params['filterID'] return filters_params
########################## # set wavelength range for the model comparison via chi-square or Bayes techniques def set_fit_wl_range(fit_wl_range, N_spectra, wl_spectra): # handle fit_wl_range if fit_wl_range is None: # define fit_wl_range when not provided fit_wl_range = np.zeros((N_spectra, 2)) # Nx2 array, N:number of spectra and 2 for the minimum and maximum values for each spectrum for i in range(N_spectra): fit_wl_range[i,:] = np.array((wl_spectra[i].min(), wl_spectra[i].max())) elif isinstance(fit_wl_range, list): # when it is a list fit_wl_range = np.array(fit_wl_range).reshape(1,2) else: # fit_wl_range is provided if len(fit_wl_range.shape)==1: fit_wl_range = fit_wl_range.reshape((1, 2)) # reshape fit_wl_range array return fit_wl_range ########################## # set wavelength range for the model comparison via chi-square or Bayes techniques def set_disp_wl_range(disp_wl_range, N_spectra, wl_spectra): # same handling as for fit_wl_range disp_wl_range = set_fit_wl_range(fit_wl_range=disp_wl_range, N_spectra=N_spectra, wl_spectra=wl_spectra) return disp_wl_range ########################## # set wavelength range to cut models for comparisons via chi-square or Bayes techniques def set_model_wl_range(model_wl_range, wl_spectra_min, wl_spectra_max): # define model_wl_range (if not provided) in terms of input spectra coverage if model_wl_range is None: #model_wl_range = np.array([0.9*wl_spectra_min, 1.1*wl_spectra_max]) # add padding to have enough spectral coverage in models model_wl_range = add_pad(wl_spectra_min, wl_spectra_max) # add padding to have enough spectral coverage in models # # it may need an update to work with multiple spectra # if (model_wl_range.min()>=fit_wl_range.min()): # model_wl_range[0] = 0.9*fit_wl_range.min() # add padding to shorter wavelengths # if (model_wl_range.max()<=fit_wl_range.max()): # model_wl_range[1] = 1.1*fit_wl_range.max() # add padding to longer wavelengths return model_wl_range ########################## # set wavelength range for photometry to cut models for comparisons via chi-square or Bayes techniques def set_fit_phot_range(fit_phot_range, filters): if fit_phot_range is None: # define fit_phot_range when not provided # get effective wavelengths from SVO for the input filters SVO_data = read_SVO_table() SVO_filterID = SVO_data['filterID'] # SVO ID SVO_WavelengthEff = u.Quantity(SVO_data['WavelengthEff'].data, u.nm*0.1).to(u.micron).value # effective wavelength in um matching_indices = [] for index, element in enumerate(SVO_filterID): if element in filters: matching_indices.append(SVO_WavelengthEff[index]) lambda_eff_SVO = np.array(matching_indices) fit_phot_range = np.array([lambda_eff_SVO.min(), lambda_eff_SVO.max()]) elif isinstance(fit_phot_range, list): # when it is a list fit_phot_range = np.array(fit_phot_range).reshape(1,2) return fit_phot_range ########################## # read the SVO table with filter properties and save it locally if it doesn't already exist def read_SVO_table(): dir_sep = os.sep # directory separator for the current operating system path_synthetic_photometry = os.path.dirname(__file__)+dir_sep # read zero points for each filter svo_table = f'{path_synthetic_photometry}synthetic_photometry{dir_sep}FPS_info.xml' if os.path.exists(svo_table): svo_data = Table.read(svo_table, format='votable') # open downloaded table with filters' info else: svo_data = Table.read('https://svo.cab.inta-csic.es/files/svo/Public/HowTo/FPS/FPS_info.xml', format='votable') # this SVO link will be updated as soon as new filters are added to FPS. svo_data.write(svo_table, format='votable') # save the table to avoid reading it from the web each time the code is run, which can take a few seconds return svo_data ########################## # function to generate a padding on both sizes of a range def add_pad(min_value, max_value): array = np.array([0.9*min_value, 1.1*max_value]) # add padding to have enough spectral coverage in models return array ########################## # set wavelength of reference associated to a given resolution as the median wavelength for a spectrum def set_lam_res(wl_spectrum): lam_res = np.median(wl_spectrum) #lam_res = round(lam_res) return lam_res ##########################
[docs]def app_to_abs_flux(flux, distance, eflux=None, edistance=None, reverse=False): ''' Description: ------------ Convert apparent fluxes into absolute fluxes considering a distance. Parameters: ----------- - flux : float array or float Fluxes (in any flux units). - distance : float Target distance (in pc). - eflux : float array or float, optional Flux uncertainties (in any flux units). - edistance : float, optional Distance error (in pc). - reverse : {``True``, ``False``}, optional (default ``False``) Label to indicate if apparent fluxes are converted into absolute fluxes (``False``) or vice versa (``True``). Returns: -------- - Dictionary with absolute fluxes and input parameters: - ``'flux_abs'`` : absolute fluxes in the same units as the input fluxes. - ``'eflux_abs'`` : (if ``eflux`` or ``edistance`` is provided) absolute flux uncertainties. - ``'flux_app'`` : input apparent fluxes. - ``'eflux_app'`` : (if provided) input apparent flux errors. - ``'distance'`` : input distance. - ``'edistance'`` : (if provided) input distance error. Example: -------- >>> import seda >>> >>> # input parameters >>> d = 5.00 # pc >>> ed = 0.06 # pc >>> flux = 2.10 # mJy >>> eflux = 0.01 # mJy >>> >>> # convert fluxes >>> seda.utils.app_to_abs_flux(flux=flux, distance=d, eflux=eflux, edistance=ed) {'flux_app': 2.1, 'flux_abs': 0.525, 'eflux_app': 0.01, 'eflux_abs': 0.01284562182223967, 'distance': 5.0, 'edistance': 0.06} Author: Genaro Suárez Date: 2025-05 ''' # set non-provided params if eflux is None: eflux=0 if edistance is None: edistance=0 # use numpy arrays flux = astropy_to_numpy(flux) eflux = astropy_to_numpy(eflux) if not reverse: # apparent fluxes to absolute fluxes # absolute fluxes flux_abs = flux * (distance/10.)**2 # absolute flux errors eflux_abs = flux_abs*np.sqrt((2.*edistance/distance)**2 + (eflux/flux)**2) # output dictionary out = {'flux_app': flux, 'flux_abs': flux_abs} if isinstance(eflux, np.ndarray): # if eflux is an array out['eflux_app'] = eflux out['eflux_abs'] = eflux_abs elif eflux!=0: # if flux is a float non-equal to zero out['eflux_app'] = eflux out['eflux_abs'] = eflux_abs out['distance'] = distance if edistance!=0: out['edistance'] = edistance out['eflux_abs'] = eflux_abs # if eflux_abs was stored above it would replace it without issues else: # absolute fluxes to apparent fluxes # apparent fluxes flux_app = flux / (distance/10.)**2 # absolute flux errors eflux_app = flux_app*np.sqrt((2.*edistance/distance)**2 + (eflux/flux)**2) # output dictionary out = {'flux_app': flux_app, 'flux_abs': flux} if isinstance(eflux, np.ndarray): # if eflux is an array out['eflux_app'] = eflux_app out['eflux_abs'] = eflux elif eflux!=0: # if flux is a float non-equal to zero out['eflux_app'] = eflux_app out['eflux_abs'] = eflux out['distance'] = distance if edistance!=0: out['edistance'] = edistance out['eflux_app'] = eflux_app # if eflux_app was stored above it would replace it without issues return out
##########################
[docs]def app_to_abs_mag(magnitude, distance, emagnitude=None, edistance=None): ''' Description: ------------ Convert apparent magnitudes into absolute fluxes considering a distance, assuming no extinction. Parameters: ----------- - magnitude : float array or float Magnitude (in any magnitude units). - distance : float Target distance (in pc). - emagnitude : float array or float, optional Magnitude uncertainties (in any magnitude units). - edistance : float, optional Distance error (in pc). Returns: -------- - Dictionary with absolute mages and input parameters: - ``'mag_abs'`` : absolute mages in the same units as the input mages. - ``'emag_abs'`` : (if ``emag`` or ``edistance`` is provided) absolute mag uncertainties. - ``'mag_app'`` : input apparent mages. - ``'emag_app'`` : (if provided) input apparent mag errors. - ``'distance'`` : input distance. - ``'edistance'`` : (if provided) input distance error. Example: -------- >>> import seda >>> magnitude, emagnitude = 17.05, 0.03 # mag >>> distance, edistance = 14.43, 0.79 # pc >>> >>> seda.utils.app_to_abs_mag(magnitude=magnitude, emagnitude=emagnitude, >>> distance=distance, edistance=edistance) {'mag_app': 17.05, 'mag_abs': 16.253668344532528, 'emag_app': 0.03, 'emag_abs': 0.12260857671946264, 'distance': 14.43, 'edistance': 0.79} Author: Genaro Suárez Date: 2025-04-28 ''' # set non-provided params if emagnitude is None: emagnitude=0 if edistance is None: edistance=0 # use numpy arrays magnitude = astropy_to_numpy(magnitude) emagnitude = astropy_to_numpy(emagnitude) distance = astropy_to_numpy(distance) edistance = astropy_to_numpy(edistance) # absolute magnitudes mag_abs = magnitude - 5.*np.log10(distance) + 5. # absolute magnitude errors emag_abs = np.sqrt(emagnitude**2 + ((5./np.log(10))*edistance/distance)**2) # output dictionary out = {'mag_app': magnitude, 'mag_abs': mag_abs} if isinstance(emagnitude, np.ndarray): # if emag is an array out['emag_app'] = emagnitude out['emag_abs'] = emag_abs elif emagnitude!=0: # if mag is a float non-equal to zero out['emag_app'] = emagnitude out['emag_abs'] = emag_abs out['distance'] = distance if isinstance(edistance, np.ndarray): # if edistance is an array out['edistance'] = edistance elif edistance!=0: # if distance is a float non-equal to zero out['edistance'] = edistance return out
##########################
[docs]def spt_to_teff(spt, spt_type, ref=None): ''' Description: ------------ Convert spectral type into effective temperature using relationships in the literature. Parameters: ----------- - spt : float, str, array, or list Spectral types given as conventional letters or number, as indicated in ``spt_type``. Convention between spectral type and float: M9=9, L0=10, ..., T0=20, ... - spt_type : str Label indicating whether the input spectral type is a string ('str') or a number ('float') - ref : str, optional (default 'F15') Reference for the spectral type-temperature relationships. 'F15': Filippazzo et al. (2015), valid for M6-T9 (6-29) 'K21': Kirkpatrick et al. (2021), valid for M7-Y2 (7-32) Returns: -------- - teff : array Effective temperature (in K) corresponding to the input spectral types according to the ``ref`` reference. Example: -------- >>> import seda >>> >>> # spectral type as a number >>> spt = [15, 25] # for L5 and T5 types >>> seda.utils.spt_to_teff(spt, spt_type='float') # Teff in K array([1581.053125, 1033.328125]) >>> >>> # spectral type as a string >>> spt = ['L5', 'T5'] >>> seda.utils.spt_to_teff(spt, spt_type='str') # Teff in K array([1581.053125, 1033.328125]) Author: Genaro Suárez Date: 2023-02 ''' # make sure the input spt is a numpy array spt = var_to_numpy(spt) # verify that spt_type is provided try: spt_type except: raise Exception(f'"spt_type" must be provided') # assigned default values if spt_type is None: spt_type = 'float' # spectral type as float number if ref is None: ref = 'F15' # Filippazzo et al. (2015) # save the original spt spt_ori = spt.copy() # if spt is provided as strings, convert it into a float if spt_type=='str': for i in range(len(spt)): spt[i] = spt_str_to_float(spt[i]) # convert array of string to an array of floats spt = spt.astype(float) # verify the spectral type is within the valid range for the indicated relationship # valid range for available relationships if ref=='F15': spt_valid = ['M6', 'T9'] #[6, 29] if ref=='K21': spt_valid = ['M7', 'Y2'] #[7, 32] (Table 13 in K21 says the range is L0-Y2, but the fit in Fig. 22 covers M7-Y2) # verification spt_valid_flt = [spt_str_to_float(spt_valid[0]), spt_str_to_float(spt_valid[1])] for i,sp in enumerate(spt): if sp<spt_valid_flt[0] or sp>spt_valid_flt[1]: print(f'Caveat for spt={spt_ori[i]}: it is out of the "{ref}" coverage, so Teff was extrapolated.') print(f' The valid spt range is {spt_valid} ({spt_valid_flt})') # polynomial fits if ref=='F15': # coefficients of the polynomial c0 = 4.747e+03 c1 = -7.005e+02 c2 = 1.155e+02 c3 = -1.191e+01 c4 = 6.318e-01 c5 = -1.606e-02 c6 = 1.546e-04 teff = c0 + c1*spt + c2*spt**2 + c3*spt**3 + c4*spt**4 + c5*spt**5 + c6*spt**6 if ref=='K21': teff = np.zeros(len(spt)) for i,sp in enumerate(spt): sp = sp-10 # L0 in K21 is 0 instead of 10 as in F15 if sp<=8.75: # L0-L8.75 c0 = 2.2375e+03 c1 = -1.4496e+02 c2 = 4.0301e+00 elif (sp>8.75) & (sp<=14.75): # L8.75-T4.75 c0 = 1.4379e+03 c1 = -1.8309e+01 c2 = 0 else: # T4.75-Y2 c0 = 5.1413e+03 c1 = -3.6865e+02 c2 = 6.7301e+00 teff[i] = c0 + c1*sp + c2*sp**2 return teff
##########################
[docs]def teff_to_spt(teff, ref=None): ''' Description: ------------ Estimate the spectral type (returned as a string) from effective temperature, using numerical inversion of spt_to_teff() and the same spectral-type conventions. Parameters: ----------- - teff : float, array Effective temperatures (K) - ref : str, optional (default 'F15') Reference for the spectral type-temperature relationships. 'F15': Filippazzo et al. (2015), valid for M6-T9 (6-29) 'K21': Kirkpatrick et al. (2021), valid for M7-Y2 (7-32) Returns: -------- - spt_str : array Estimated spectral type(s), formatted like 'M6', 'L3.5', 'T8', 'Y1', etc. Example: -------- >>> import seda >>> >>> # effective temperature to spectral type >>> teff = [2000, 1500, 1000] # K >>> seda.utils.teff_to_spt(teff) array(['L1.7', 'L5.8', 'T5.4'], dtype='<U4') Author: Genaro Suárez Date: 2025-12-12 ''' # assigned default values if ref is None: ref = 'F15' # Filippazzo et al. (2015) # Valid spectral-type numerical ranges if ref == 'F15': spt_min, spt_max = 6, 29 # M6-T9 elif ref == 'K21': spt_min, spt_max = 7, 32 # M7-Y2 else: raise ValueError('"ref" must be "F15" or "K21"') # construct dense numerical grid of spectral types spt_grid = np.linspace(spt_min, spt_max, 2000) # get Teff on this grid using spt_to_teff teff_grid = spt_to_teff(spt_grid, spt_type='float', ref=ref) # build inverse mapping (Teff -> numerical spt) inv_interp = interp1d(teff_grid, spt_grid, fill_value='extrapolate', assume_sorted=False) # force array teff = np.array(teff, ndmin=1) spt_float = inv_interp(teff) # round spt_float to a few decimal to avoid having # e.g., 19.99999945 -> L10 instead of T0 spt_float = np.round(spt_float, 5) spt_str = [spt_float_to_str(sf) for sf in spt_float] # decide output: single value or list if len(spt_str) == 1: output = spt_str[0] else: output = spt_str # output list as numpy array output = np.array(output) return output
##########################
[docs]def parallax_to_distance(parallax, eparallax): ''' Description: ------------ Obtain distance as the inverse of the parallax. Parameters: ----------- - parallax : float, array Parallax in mas. - eparallax : float, array Parallax uncertainty in mas. Returns: -------- - distance : float, array Distance in pc. - edistance : float, array Distance uncertainty in pc. Example: -------- >>> import seda >>> parallax = 175.2 # mas >>> eparallax = 1.7 # mas >>> # distance >>> seda.utils.parallax_to_distance(parallax=parallax, eparallax=eparallax) (5.707762557077626, 0.055383540793561434) Author: Genaro Suárez Date: 2025-04-28 ''' # convert input arrays into numpy array, if needed parallax = astropy_to_numpy(parallax ) eparallax = astropy_to_numpy(eparallax ) distance = 1000. / parallax # in pc if eparallax is not None: edistance = distance * eparallax / parallax # pc return distance, edistance
##########################
[docs]def convert_photometric_table(table, save_table=False, table_name=None): ''' Description: ------------ Convert a table with photometry in separate columns to a table with all magnitudes or fluxes in a column and all errors in another column. Parameters: ----------- - table : astropy table Table with photometric measurements and their corresponding errors listed in separate columns. The magnitude or flux columns must be labeled with the corresponding SVO filter names. The table must include only photometry, structured such that each magnitude or flux column is immediately followed by its associated error column i.e. magnitude1, error1, magnitude2, error2, etc. Note: names of columns with photometric errors are irrelevant. - save_table : {``True``, ``False``}, optional (default ``False``) Locally store (``True``) or do not store (``False``) the output dictionary as an ascii table using PrettyTable. - table_name : str, optional File name to save the output table (it can include a path e.g. my_path/output_table.dat). Default name is 'photometry_prettytable.dat'. Returns: -------- Dictionary with three keys: SVO filter names, magnitudes or fluxes, and corresponding errors. Example: -------- >>> import seda >>> from astropy.io import ascii >>> >>> # path to the seda package >>> path_seda = os.path.dirname(os.path.dirname(seda.__file__)) >>> # read ascii table with photometry for 0415 >>> phot_file = path_seda+'/docs/notebooks/data/0415-0935_photometry.dat' >>> photometry = ascii.read(phot_file) >>> >>> # keep columns with magnitudes of interest >>> photometry = photometry['2MASS/2MASS.J', '2MASS/2MASS.eJ', >>> '2MASS/2MASS.H', '2MASS/2MASS.eH', >>> '2MASS/2MASS.Ks', '2MASS/2MASS.eKs'] >>> >>> # convert table >>> seda.utils.convert_photometric_table(photometry) {'filters': array(['2MASS/2MASS.J', '2MASS/2MASS.H', '2MASS/2MASS.Ks'], dtype='<U32'), 'phot': array([15.695, 15.537, 15.429]), 'ephot': array([0.058, 0.113, 0.201])} Author: Genaro Suárez Date: 2025-09-07 ''' # initialize arrays to save filter names, values, and uncertainties phot = np.array([]) ephot = np.array([]) filters = np.array([]) # create arrays for all values, errors, and filters for i,col in enumerate(table.colnames): # for each column if i%2 == 0: # only even columns (the ones with photometric values) phot = np.append(phot, table[col]) filters = np.append(filters, col) else: # odd columns (the ones with uncertainties) ephot = np.append(ephot, table[col]) # save all as a dictionary out = {'filters': filters, 'phot': phot, 'ephot': ephot} if save_table: if table_name is None: table_name = 'photometry_prettytable.dat' save_prettytable(my_dict=out, table_name=table_name) return out
##########################
[docs]def merge_MRS(fits_files): ''' Description: ------------ Merge spectra from different MRS channels and grating settings Parameters: ----------- - fits_files : array, list File names (with full path) to the spectra to be merged. It is not necessary to include all four channels and three grating settings (short, medium, and long). The function works with any combination of channels and grating settings. Returns: -------- Dictionary with the merged spectrum: - ``'channel_grating'``: channel-grating setting label - ``'channel_grating_range'``: wavelength range used for each channel-grating setting - ``'wl_merge'``: wavelength value at which two consecutive overlapping spectra where merged - ``'wl'``: wavelength in micron of the merged spectrum - ``'flux_Jy'``: flux in Jy of the merged spectrum - ``'eflux_Jy'``: flux uncertainties in Jy of the merged spectrum - ``'flux_erg/s/cm2/A'``: flux in erg/s/cm2/A of the merged spectrum - ``'eflux_erg/s/cm2/A'``: flux uncertainties in erg/s/cm2/A of the merged spectrum Example: -------- >>> import seda >>> >>> # list of fits files to be merged >>> fits_files = ['jw05474-o001_t001_miri_ch1-long_x1d.fits', ... 'jw05474-o001_t001_miri_ch1-medium_x1d.fits', ... 'jw05474-o001_t001_miri_ch1-short_x1d.fits', ... 'jw05474-o001_t001_miri_ch2-long_x1d.fits', ... 'jw05474-o001_t001_miri_ch2-medium_x1d.fits', ... 'jw05474-o001_t001_miri_ch2-short_x1d.fits', ... 'jw05474-o001_t001_miri_ch3-long_x1d.fits', ... 'jw05474-o001_t001_miri_ch3-medium_x1d.fits', ... 'jw05474-o001_t001_miri_ch3-short_x1d.fits'] >>> # merge files >>> out_merge_MRS = seda.utils.merge_MRS(fits_files) Author: Genaro Suárez Date: 2025-10-13 ''' # directory separator for the current operating system dir_sep = os.sep # convert to numpy array if input files are a list if isinstance (fits_files, list): fits_files = np.array(fits_files) # combine channels file_wl_min = np.zeros(len(fits_files)) for i,file in enumerate(fits_files): # minimum wavelength cover by each fits file file_wl_min[i] = Spectrum1D.read(file).wavelength.to(u.um).value.min() # rearrange fits files from the one covering the minimum # wavelength to the one with the maximum wavelength mask_ind = np.argsort(file_wl_min) fits_files = fits_files[mask_ind] # read each fits file wl_MRS_each = [] flux_MRS_each = [] eflux_MRS_each = [] for i,file in enumerate(fits_files): out_read_JWST_spectrum = Spectrum1D.read(file) wl_MRS_each.append(out_read_JWST_spectrum.wavelength.to(u.um).value) # um flux_MRS_each.append(out_read_JWST_spectrum.flux.value) # Jy eflux_MRS_each.append(out_read_JWST_spectrum.uncertainty.array) # Jy # merge all fits file spectra wl_MRS = np.array([]) flux_MRS = np.array([]) eflux_MRS = np.array([]) wl_merge = np.zeros(len(fits_files)-1) channel_grating_range = np.zeros((len(fits_files), 2)) channel_grating = np.array([]) for i,file in enumerate(fits_files): sfile = file.split(dir_sep)[-1] # file name without full path channel_grating = np.append(channel_grating, sfile.split('_')[-2]) # label with corresponding channel and grating setting # mean wavelength in the overlapping region between two consecutive individual spectra if i<(len(fits_files)-1): # avoid last spectrum wl_min_overlap = wl_MRS_each[i+1].min() wl_max_overlap = wl_MRS_each[i].max() wl_merge[i] = np.mean([wl_min_overlap, wl_max_overlap]) if i==0: # for the first spectrum mask = wl_MRS_each[i]<=wl_merge[i] channel_grating_range[i,:] = np.array([wl_MRS_each[i].min(), wl_merge[i]]) elif i==(len(fits_files))-1: # for the last spectrum mask = wl_MRS_each[i]>wl_merge[i-1] channel_grating_range[i,:] = np.array([wl_merge[i-1], wl_MRS_each[i].max()]) else: # for intermediate spectra mask = (wl_MRS_each[i]<=wl_merge[i]) & (wl_MRS_each[i]>wl_merge[i-1]) channel_grating_range[i,:] = np.array([wl_merge[i-1], wl_merge[i]]) wl_MRS = np.append(wl_MRS, wl_MRS_each[i][mask]) flux_MRS = np.append(flux_MRS, flux_MRS_each[i][mask]) eflux_MRS = np.append(eflux_MRS, eflux_MRS_each[i][mask]) # sort wl_MRS sort_ind = np.argsort(wl_MRS) wl_MRS = wl_MRS[sort_ind] flux_Jy_MRS = flux_MRS[sort_ind] eflux_Jy_MRS = eflux_MRS[sort_ind] # convert fluxes from Jy to erg/s/cm2/A flux_MRS = (flux_Jy_MRS*u.Jy).to(u.erg/u.s/u.cm**2/(u.nm*0.1), equivalencies=u.spectral_density(wl_MRS*u.micron)).value eflux_MRS = (eflux_Jy_MRS*u.Jy).to(u.erg/u.s/u.cm**2/(u.nm*0.1), equivalencies=u.spectral_density(wl_MRS*u.micron)).value # output dictionary out = {'channel_grating': channel_grating, 'channel_grating_range': channel_grating_range, 'wl_merge': wl_merge, 'wl': wl_MRS, 'flux_Jy': flux_Jy_MRS, 'eflux_Jy': eflux_Jy_MRS, 'flux_erg/s/cm2/A': flux_MRS, 'eflux_erg/s/cm2/A': eflux_MRS} return out
##########################
[docs]def fill_gap_spectrum(wl, flux, eflux, disp_threshold=None): ''' Description: ------------ Function to identify and fill a gap in a spectrum. It does a linear interpolation between the median flux before and after the gap and fill the gap with data points with the median wavelength step. Parameters: ----------- - wl : array Wavelength (any units) of input spectrum. - flux : array Fluxes (any units) of input spectrum. - eflux : array Flux uncertainties (any units) of input spectrum. - disp_threshold : float, optional Wavelength dispersion threshold used to identify gaps. Data points with a dispersion above this limit are classified as gaps. Default value is 50. Returns: -------- Dictionary with: - ``'gap_region'``: minimum and maximum wavelengths of the gap - ``'wl_nogap'``: wavelengths after filling the gap - ``'flux_nogap'``: fluxes after filling the gap - ``'eflux_nogap'``: flux errors after filling the gap Author: Genaro Suárez Date: 2025-01-04 ''' if disp_threshold is None: disp_threshold=50 # find the gap wl_disp = wl[1:] - wl[:-1] # (um) wavelength dispersion of spectra wl_disp = np.append(wl_disp, wl_disp[-1]) # add an element equal to the last row to keep the same shape as wl # identify gaps in the data mask = wl_disp>=disp_threshold*np.median(wl_disp) if np.any(mask): # gaps detected if np.sum(mask)==1: # only one gap detected # wl array index at the beginning and end of the gap wl_gap_before_ind = np.where(wl_disp==wl_disp[mask])[0][0] wl_gap_after_ind = wl_gap_before_ind+1 # wavelength right before and after the gap wl_gap_before = wl[wl_gap_before_ind] wl_gap_after = wl[wl_gap_after_ind] wl_gap_size = wl_gap_after - wl_gap_before # size of the gap print(f'A gap was identified between {wl_gap_before} and {wl_gap_after} micron') else: # more than one gap detected print(f'{np.sum(mask)} paps detected') else: raise Exception(f'No gap found in the spectrum') # median flux before and after the gap pad_gap = 20 # number of data points to obtain the median flux before and after the gap flux_gap_before_median = np.median(flux[wl_gap_before_ind-pad_gap:wl_gap_before_ind]) flux_gap_after_median = np.median(flux[wl_gap_after_ind:wl_gap_after_ind+pad_gap]) # data to fill the gap wl_gap = np.arange(wl_gap_before, wl_gap_after, np.median(wl_disp)) # wavelengths with the median wavelength step flux_gap = ((flux_gap_after_median-flux_gap_before_median)/(wl_gap_after-wl_gap_before)) * (wl_gap-wl_gap_after) + flux_gap_after_median # fluxes from a line fit # flux errors in a window before and after the gap eflux_window = eflux[wl_gap_before_ind-pad_gap:wl_gap_after_ind+pad_gap] if isinstance(eflux_window, astropy.table.column.MaskedColumn): # in case there are masked values in flux error window unmasked = np.logical_not(eflux_window.mask).nonzero() eflux_gap_median = np.median(eflux_window[unmasked]) else: eflux_gap_median = np.median(eflux_window) eflux_gap = np.repeat(eflux_gap_median, len(flux_gap)) # attached interpolation to the NIRSpec spectrum wl_nogap = np.append(wl.data, wl_gap) flux_nogap = np.append(flux.data, flux_gap) eflux_nogap = np.append(eflux.data.data, eflux_gap) # sort wl sort_wl = np.argsort(wl_nogap) wl_nogap = wl_nogap[sort_wl] flux_nogap = flux_nogap[sort_wl] eflux_nogap = eflux_nogap[sort_wl] out = {'gap_region': np.array([wl_gap_before, wl_gap_after]), 'wl_nogap': wl_nogap, 'flux_nogap': flux_nogap, 'eflux_nogap': eflux_nogap} return out
##########################
[docs]def read_prettytable(filename): ''' Description: ------------ Read ascii table created with ``prettytable``. Parameters: ----------- - filename : str PrettyTable table. Returns: -------- Astropy table with the information in the input file. Example: -------- >>> import seda >>> >>> out = seda.utils.open_chi2_table('Sonora_Elf_Owl_chi2_minimization_multiple_spectra.dat') Author: Rocio Kiman Date: 2025-02-11 ''' # open table content with open(filename, 'r') as f: # read all lines lines = f.readlines() # keep only lines that do not start with "+" lines_sel = [] for line in lines: if not line.startswith('+'): lines_sel.append(line) # read the cleaned data using ascii.read table = ascii.read(lines_sel, format='fixed_width') # remove last column with empty information last_column_name = table.colnames[-1] table.remove_column(last_column_name) # convert table into a dictionary my_dict = {} for col in table.columns: my_dict[col] = table[col].data return my_dict
########################## # save dictionary as ascii table using prettytable def save_prettytable(my_dict, table_name): # create a PrettyTable object table = PrettyTable() # add the dictionary keys as column headers table.field_names = my_dict.keys() # ensure every value is iterable (wrap scalars in a list) values = [] for v in my_dict.values(): if isinstance(v, (list, tuple, np.ndarray)): values.append(v) else: values.append([v]) # add the dictionary values as rows for row in zip(*values): table.add_row(row) # get the ASCII string representation ascii_table = table.get_string() # save file with open(table_name, 'w') as f: f.write(ascii_table) ########################## # convert spectral type from string to float def spt_str_to_float(spt): if 'M' in spt: spt = spt.replace('M', '0') if 'L' in spt: spt = spt.replace('L', '1') if 'T' in spt: spt = spt.replace('T', '2') if 'Y' in spt: spt = spt.replace('Y', '3') try: spt = float(spt) except: raise Exception(f'"{spt}" is not recognized. Overall, "spt" can be M, L, T, and Y with any subtypes.') return spt ########################## # convert numerical spt to string spt def spt_float_to_str(spt): spt = float(spt) # determine prefix and subtype if 0 <= spt < 10: prefix = 'M'; subtype = spt - 0 elif 10 <= spt < 20: prefix = 'L'; subtype = spt - 10 elif 20 <= spt < 30: prefix = 'T'; subtype = spt - 20 elif 30 <= spt < 40: prefix = 'Y'; subtype = spt - 30 else: output = f'OUT_OF_RANGE_{spt:.2f}' return output # this is still required for out-of-range values # integer or fractional subtype output if abs(subtype - round(subtype)) < 1e-6: output = f"{prefix}{int(round(subtype))}" else: output = f"{prefix}{subtype:.1f}".rstrip('0').rstrip('.') return output ########################## # make sure a variable is a list def var_to_list(x): if isinstance(x, str): x = [x] if isinstance(x, np.ndarray): x = x.tolist() if isinstance(x, float): x = [x] return x ########################## # make sure a variable is a numpy array def var_to_numpy(x): if isinstance(x, (str, int, float)): x = np.array([x]) if isinstance(x, list): x = np.array(x) return x ########################## # convert an astropy array into a numpy array def astropy_to_numpy(x): if x is None: return None # keep None as-is # if the variable is an astropy Column if isinstance(x, Column): if isinstance(x, MaskedColumn): # if MaskedColumn x = x.filled(np.nan) # fill masked values with nan x = x.data else: # if column x = x.data # if the variable is an astropy Quantity (with units) if isinstance(x, u.Quantity): x = x.value # convert to numpy array safely x = np.array(x) # if numeric, force float64 if np.issubdtype(x.dtype, np.number): x = x.astype(np.float64) return x ###################### # Ensure all folder paths in a list end with the OS-specific path separator. def ensure_trailing_separator(x): # for each path for i,p in enumerate(x): # check if path already ends with separator if not p.endswith(os.sep): # update path with path separator if needed x[i] = p + os.sep return x ######################
[docs]def normalize_flux(flx: ArrayLike) -> np.ndarray: """ Description: ------------ Median-normalize a flux array while ignoring non-finite values. The function divides the input flux by the median of the finite (non-NaN, non-infinite) values. This is commonly used to place spectra on a relative scale before computing spectral indices. Parameters: ----------- flx : array-like Input flux array. Can contain NaNs or infinite values, which are ignored when computing the median. Returns ------- normalized_flux : ndarray Flux array divided by the median of its finite values. If the median is zero, the original array is returned unchanged. Raises: ------- ValueError If the input array contains no finite values. Notes: ------ This is a simple normalization intended for index-based analysis. It does not perform any continuum fitting or band-specific scaling. Examples: --------- >>> import numpy as np >>> flux = np.array([1.0, 2.0, 3.0, np.nan]) >>> normalize_flux(flux) array([0.5, 1. , 1.5, nan]) """ flx = np.asarray(flx, dtype=float) m = np.isfinite(flx) if not np.any(m): raise ValueError("Flux array contains no finite values to normalize.") med = np.median(flx[m]) if med == 0: return flx return flx / med
#+++++++++++++++++++++++++++ # count the total number of data points in all input spectra def input_data_stats(wl_spectra): N_spectra = len(wl_spectra) # count the total number of data points in all input spectra N_datapoints = 0 for i in range(N_spectra): N_datapoints = N_datapoints + wl_spectra[i].size # minimum and maximum wavelength from the input spectra min_tmp1 = min(wl_spectra[0]) for i in range(N_spectra): min_tmp2 = min(wl_spectra[i]) if min_tmp2<min_tmp1: wl_spectra_min = min_tmp2 min_tmp1 = min_tmp2 else: wl_spectra_min = min_tmp1 max_tmp1 = max(wl_spectra[0]) for i in range(N_spectra): max_tmp2 = max(wl_spectra[i]) if max_tmp2>max_tmp1: wl_spectra_max = max_tmp2 max_tmp1 = max_tmp2 else: wl_spectra_max = max_tmp1 out = {'N_datapoints': N_datapoints, 'wl_spectra_min': wl_spectra_min, 'wl_spectra_max': wl_spectra_max} return out #+++++++++++++++++++++++++++ # find the data point before and after a given value def find_two_nearest(array, value): diff = array - value if all(diff<0) or all(diff>0): # value is out of the array coverage raise Exception('Parameter does not cover by the model grid') elif any(diff==0): # if the value is a grid node near_low = near_high = array[diff==0][0] else: # if the value is between two grid nodes near_low = array[diff<0].max() near_high = array[diff>0].min() return np.array([near_low, near_high]) #+++++++++++++++++++++++++++ # load a dictionary if need it def load_output_fit(output_fit): # if it is already a dictionary, use it directly if isinstance(output_fit, dict): data = output_fit # otherwise assume it is a file path and load it else: with open(output_fit, "rb") as f: data = pickle.load(f) return data #+++++++++++++++++++++++++++ def find_two_around_node(array, value): diff = array - value if any(diff<0): # if there are grid nodes smaller than the value near_low = array[diff<0].max() else: # if the value is the smallest grid point near_low = array.min() if any(diff>0): # if there are grid nodes greater than the value near_high = array[diff>0].min() else: # if the value is the greatest grid point near_high = array.max() return np.array([near_low, near_high]) #+++++++++++++++++++++++++++ # maximum number of decimals in an array elements def max_decimals(arr): max_places = 0 for num in arr: if isinstance(num, float): # if the element is a float num_str = str(num) decimal_part = num_str.split('.')[1] # select decimals as a string max_places = max(max_places, len(decimal_part)) # compare decimals return max_places #+++++++++++++++++++++++++++
[docs]def np_trapz(y, x=None, dx=1.0, axis=-1): """Version-safe trapezoidal integration for numpy 3.9-3.11+""" if hasattr(np, 'trapezoid'): return np.trapezoid(y, x=x, dx=dx, axis=axis) else: return np.trapz(y, x=x, dx=dx, axis=axis)