Source code for taurex.binning.fluxbinner

"""Module for the flux binner class"""

import typing as t

import numpy as np
import numpy.typing as npt

from taurex import OutputSize
from taurex.util import compute_bin_edges

from ..types import ModelOutputType
from .binner import BinDownType, BinnedSpectrumType, Binner


[docs] class FluxBinner(Binner): """ Bins to a wavenumber grid given by ``wngrid`` using a more accurate method that takes into account the amount of contribution from each native bin. This method also handles cases where bins are not continuous and/or overlapping. Parameters ---------- wngrid: :obj:`array` Wavenumber grid wngrid_width: :obj:`array`, optional Must have same shape as ``wngrid`` Full bin widths for each wavenumber grid point given in ``wngrid``. If not provided then this is automatically computed from ``wngrid``. """ def __init__( self, wngrid: npt.NDArray[np.float64], wngrid_width: t.Optional[npt.NDArray[np.float64]] = None, ): super().__init__() sort_grid = wngrid.argsort() self._wngrid = wngrid[sort_grid] self._wngrid_width = wngrid_width if self._wngrid_width is None: self._wngrid_width = compute_bin_edges(self._wngrid)[-1] elif hasattr(self._wngrid_width, "__len__"): if len(self._wngrid_width) != len(self._wngrid): raise ValueError( "Wavenumber width should be signel value or " "same shape as wavenumber grid" ) self._wngrid_width = wngrid_width[sort_grid] if not hasattr(self._wngrid_width, "__len__"): self._wngrid_width = np.ones_like(self._wngrid) * self._wngrid_width
[docs] def bindown( self, wngrid: npt.NDArray[np.float64], spectrum: npt.NDArray[np.float64], grid_width: t.Optional[npt.NDArray[np.float64]] = None, error: t.Optional[npt.NDArray[np.float64]] = None, ) -> BinDownType: """Bins down spectrum. Parameters ---------- wngrid : :obj:`array` The wavenumber grid of the spectrum to be binned down. spectrum: :obj:`array` The spectra we wish to bin-down. Must be same shape as ``wngrid``. grid_width: :obj:`array`, optional Wavenumber grid full-widths for the spectrum to be binned down. Must be same shape as ``wngrid``. Optional. error: :obj:`array`, optional Associated errors or noise of the spectrum. Must be same shape as ``wngrid``.Optional parameter. Returns ------- binned_wngrid : :obj:`array` New wavenumber grid spectrum: :obj:`array` Binned spectrum. grid_width: :obj:`array` New grid-widths error: :obj:`array` or None Binned error if given else ``None`` """ sorted_input = wngrid.argsort() wngrid = wngrid[sorted_input] spectrum = spectrum[..., sorted_input] if error is not None: error = error[..., sorted_input] bin_spectrum = np.zeros(spectrum[..., 0].shape + self._wngrid.shape) if error is not None: bin_error = np.zeros(spectrum[..., 0].shape + self._wngrid.shape) else: bin_error = None old_spect_wn = wngrid old_spect_flux = spectrum old_spect_err = error old_spect_width = grid_width if old_spect_width is None: old_spect_width = compute_bin_edges(old_spect_wn)[-1] old_spect_min = old_spect_wn - old_spect_width / 2 old_spect_max = old_spect_wn + old_spect_width / 2 new_spec_lhs = self._wngrid new_spec_rhs = self._wngrid_width new_spec_wn = self._wngrid new_spec_wn_min = new_spec_lhs - new_spec_rhs / 2 new_spec_wn_max = new_spec_lhs + new_spec_rhs / 2 save_start = 0 save_stop = 0 for idx, res in enumerate(zip(new_spec_wn, new_spec_wn_min, new_spec_wn_max)): wn, wn_min, wn_max = res sum_spectrum = 0 sum_noise = 0 sum_weight = 0 save_start = np.searchsorted(old_spect_max, wn_min, side="right") save_stop = np.searchsorted(old_spect_min[1:], wn_max, side="right") save_stop = min(save_stop, old_spect_min.shape[0] - 1) save_start = min(save_start, old_spect_min.shape[0] - 1) if ( not wn_min <= old_spect_max[save_start] or not old_spect_min[save_stop] <= wn_max ): continue spect_min = old_spect_min[save_start : save_stop + 1] spect_max = old_spect_max[save_start : save_stop + 1] weight = (np.minimum(wn_max, spect_max) - np.maximum(spect_min, wn_min)) / ( wn_max - wn_min ) sum_weight = np.sum(weight) sum_spectrum = np.sum( weight / sum_weight * old_spect_flux[..., save_start : save_stop + 1], axis=-1, ) if error is not None: sum_noise = np.sum( weight * weight * old_spect_err[..., save_start : save_stop + 1] ** 2, axis=0, ) sum_noise = np.sqrt(sum_noise / sum_weight / sum_weight) bin_spectrum[..., idx] = sum_spectrum if error is not None: bin_error[idx] = sum_noise return self._wngrid, bin_spectrum, bin_error, self._wngrid_width
[docs] def generate_spectrum_output( self, model_output: ModelOutputType, output_size: t.Optional[OutputSize] = OutputSize.heavy, ) -> BinnedSpectrumType: output = super().generate_spectrum_output(model_output, output_size=output_size) output["binned_wngrid"] = self._wngrid output["binned_wlgrid"] = 10000 / self._wngrid output["binned_wnwidth"] = self._wngrid_width output["binned_wlwidth"] = 1.0 / self._wngrid_width return output