"""Optimizer using multinest library."""
import os
import pathlib
import typing as t
import numpy as np
import numpy.typing as npt
from taurex import OutputSize
from taurex.model import ForwardModel
from taurex.mpi import barrier, get_rank
from taurex.output import OutputGroup
from taurex.spectrum import BaseSpectrum
from taurex.types import AnyValType, PathLike
from taurex.util import (
quantile_corner,
read_error_into_dict,
read_table,
recursively_save_dict_contents_to_output,
)
from .optimizer import FitParam, Optimizer
NestMarginalOutput = t.TypedDict(
"NestMarginalOutput",
{
"median": float,
"sigma": float,
"1sigma": float,
"2sigma": float,
"3sigma": float,
"5sigma": float,
"q75%": float,
"q25%": float,
"q95%": float,
"q01%": float,
"q90%": float,
"q10%": float,
},
)
"""Marginal output."""
NestModeStatsOutput = t.TypedDict(
"NestModeStatsOutput",
{
"Strictly Local Log-Evidence": t.Optional[float],
"Strictly Local Log-Evidence error": t.Optional[float],
"Local Log-Evidence": t.Optional[float],
"Local Log-Evidence error": t.Optional[float],
"mean": t.List[float],
"sigma": t.List[float],
"maximum": t.List[float],
"maximum a posterior": t.List[float],
},
)
"""Nested Mode stats output"""
NestStatsOutput = t.TypedDict(
"NestStatsOutput",
{
"modes": t.List[NestModeStatsOutput],
"marginals": t.List[NestMarginalOutput],
"global evidence": t.Optional[float],
"global evidence error": t.Optional[float],
},
)
[docs]
class NestFitParam(FitParam):
"""Fit parameter for multinest."""
nest_map: float
nest_median: float
[docs]
class NestSolutionOutput(t.TypedDict):
type: str
local_logE: t.Tuple[float, float] # noqa: N815
weights: npt.NDArray[np.float64]
tracedata: npt.NDArray[np.float64]
fit_params: t.Dict[str, NestFitParam]
[docs]
class NestOutputType(t.TypedDict):
"""Multinest output type"""
NEST_stats: NestStatsOutput
global_logE: t.Tuple[float, float] # noqa: N815
solutions: t.Dict[str, NestSolutionOutput]
[docs]
class MultiNestOptimizer(Optimizer):
def __init__(
self,
multi_nest_path: t.Optional[PathLike] = None,
observed: t.Optional[BaseSpectrum] = None,
model: t.Optional[ForwardModel] = None,
sampling_efficiency: t.Optional[t.Literal["parameter"]] = "parameter",
num_live_points: t.Optional[int] = 1500,
max_iterations: t.Optional[int] = 0,
search_multi_modes: t.Optional[bool] = True,
num_params_cluster: t.Optional[int] = None,
maximum_modes: t.Optional[int] = 100,
constant_efficiency_mode: t.Optional[bool] = False,
evidence_tolerance: t.Optional[float] = 0.5,
mode_tolerance: t.Optional[float] = -1e90,
importance_sampling: t.Optional[bool] = False,
resume: t.Optional[bool] = False,
only_finish: t.Optional[bool] = False,
n_iter_before_update: t.Optional[int] = 100,
multinest_prefix: t.Optional[str] = "1-",
verbose_output: t.Optional[bool] = True,
sigma_fraction: t.Optional[float] = 0.1,
):
r"""Setup multinest optimizer.
Parameters
----------
multi_nest_path:
Path to multinest output directory, by default None
observed:
Observed spectrum to optimize to
model:
Forward model to optimize
sampling_efficiency:
Sampling efficiency (parameter, ...)
num_live_points:
Number of live points
max_iterations:
Maximum no. of iterations (0=inf)
search_multi_modes:
Search for multiple modes
num_params_cluster:
Parameters on which to cluster
e.g. if nclust_par = 3, it will cluster on the first 3 parameters only.
if ncluster_par = -1 it clusters on all parameters
maximum_modes:
Maximum number of modes
constant_efficiency_mode:
Run in constant efficiency mode
evidence_tolerance:
Set log likelihood tolerance. :math:`\Delta \log Z <` evidence_tolerance
mode_tolerance:
Mode tolerance
importance_sampling:
Use importance nested sampling
resume:
Resume from previous run
n_iter_before_update:
Number of iterations before updating
multinest_prefix:
Prefix for multinest output files
verbose_output:
Verbose output
sigma_fraction:
Fraction of sigma to use for prior. Default is 10%
"""
super().__init__("Multinest", observed, model, sigma_fraction)
multi_nest_path = pathlib.Path(multi_nest_path)
# sampling chains directory
self.nest_path = "chains/"
self.nclust_par = -1
# sampling efficiency (parameter, ...)
self.sampling_eff = sampling_efficiency
# number of live points
self.n_live_points = int(num_live_points)
# maximum no. of iterations (0=inf)
self.max_iter = int(max_iterations)
# search for multiple modes
self.multimodes = search_multi_modes
self.n_iter_before_update = int(n_iter_before_update)
# parameters on which to cluster, e.g. if nclust_par = 3, it will
# cluster on the first 3 parameters only.
# if ncluster_par = -1 it clusters on all parameters
self.nclust_par = num_params_cluster
# maximum number of modes
self.max_modes = int(maximum_modes)
# run in constant efficiency mode
self.const_eff = constant_efficiency_mode
self._only_finish = only_finish
# set log likelihood tolerance. If change is smaller,
# multinest will have converged
self.evidence_tolerance = evidence_tolerance
self.mode_tolerance = mode_tolerance
# importance nested sampling
self.imp_sampling = importance_sampling
if self.imp_sampling:
self.multimodes = False
self.multinest_prefix = multinest_prefix
if get_rank() == 0:
if not multi_nest_path.exists():
self.info("Directory %s does not exist, creating", multi_nest_path)
multi_nest_path.mkdir(parents=True)
barrier()
# Convert to standard string path
self.dir_multinest = multi_nest_path
self.info("Found directory %s", self.dir_multinest)
self.resume = resume
self.verbose = verbose_output
[docs]
def compute_fit(self):
"""Compute the fit."""
import json
import pymultinest
def multinest_loglike(cube, ndim, nparams):
data = np.array([cube[i] for i in range(len(self.fitting_parameters))])
return self.log_likelihood(data)
def multinest_uniform_prior(cube, ndim, nparams):
# prior distributions called by multinest. Implements a uniform prior
# converting parameters from normalised grid to uniform prior
# print(type(cube))
data = np.array([cube[i] for i in range(nparams)])
prior = self.prior_transform(data)
for idx, c in enumerate(prior):
cube[idx] = c
ndim = len(self.fitting_parameters)
self.warning(f"Number of dimensions {ndim}")
self.warning(f"Fitting parameters {self.fitting_parameters}")
ncluster = self.nclust_par
if isinstance(ncluster, float):
ncluster = int(ncluster)
if ncluster is not None and ncluster <= 0:
ncluster = None
if ncluster is None:
self.nclust_par = ndim # For writing to output later on
# Will help for live plotting
with open(self.dir_multinest / f"{self.multinest_prefix}params.json", "w") as f:
json.dump(self.fit_latex, f)
# write param json file
self.info("Beginning fit......")
if not self._only_finish:
pymultinest.run(
LogLikelihood=multinest_loglike,
Prior=multinest_uniform_prior,
n_dims=ndim,
multimodal=self.multimodes,
n_clustering_params=ncluster,
max_modes=self.max_modes,
n_iter_before_update=self.n_iter_before_update,
outputfiles_basename=str(self.dir_multinest / self.multinest_prefix),
const_efficiency_mode=self.const_eff,
importance_nested_sampling=self.imp_sampling,
resume=self.resume,
verbose=self.verbose,
sampling_efficiency=self.sampling_eff,
evidence_tolerance=self.evidence_tolerance,
mode_tolerance=self.mode_tolerance,
n_live_points=self.n_live_points,
max_iter=self.max_iter,
)
self.info("Fit complete.....")
self._multinest_output = self.store_nest_solutions()
self.debug("Multinest output %s", self._multinest_output)
[docs]
def write_optimizer(self, output: OutputGroup) -> OutputGroup:
"""Write optimizer to output."""
opt = super().write_optimizer(output)
# sampling efficiency (parameter, ...)
opt.write_scalar("sampling_eff ", self.sampling_eff)
# number of live points
opt.write_scalar("num_live_points", self.n_live_points)
# maximum no. of iterations (0=inf)
opt.write_scalar("max_iterations", self.max_iter)
# search for multiple modes
opt.write_scalar("search_multimodes", self.multimodes)
# parameters on which to cluster, e.g. if nclust_par = 3,
# it will cluster on the first 3 parameters only.
# if ncluster_par = -1 it clusters on all parameters
opt.write_scalar("nclust_parameter", self.nclust_par)
# maximum number of modes
opt.write_scalar("max_modes", self.max_modes)
# run in constant efficiency mode
opt.write_scalar("constant_efficiency", self.const_eff)
# set log likelihood tolerance. If change is smaller,
# multinest will have converged
opt.write_scalar("evidence_tolerance", self.evidence_tolerance)
opt.write_scalar("mode_tolerance", self.mode_tolerance)
# importance nested sampling
opt.write_scalar("importance_sampling ", self.imp_sampling)
return opt
[docs]
def write_fit(self, output):
fit = super().write_fit(output)
if self._multinest_output:
recursively_save_dict_contents_to_output(output, self._multinest_output)
return fit
# Laziness brought us to this point
# This function is so big and I cannot be arsed to rewrite
# this in a nicer way, if some angel does it
# for me then I will buy them TWO beers.
# Also I think pymultinest does most of this so why??!
[docs]
def store_nest_solutions(self) -> NestOutputType: # noqa: C901
"""Store the multinest results."""
import pymultinest
self.warning("Store the multinest results")
nest_out = {"solutions": {}}
data = np.loadtxt(
self.dir_multinest
/ f"{self.multinest_prefix}.txt"
# os.path.join(self.dir_multinest, "{}.txt".format(self.multinest_prefix))
)
nest_analyser = pymultinest.Analyzer(
n_params=len(self.fitting_parameters),
outputfiles_basename=os.path.join(
self.dir_multinest, self.multinest_prefix
),
)
nest_stats = nest_analyser.get_stats()
nest_stats = t.cast(NestStatsOutput, nest_stats)
nest_out["NEST_stats"] = nest_stats
if "global evidence" in nest_stats:
nest_out["global_logE"] = (
nest_out["NEST_stats"]["global evidence"],
nest_out["NEST_stats"]["global evidence error"],
)
# Bypass if run in multimodes = False. Pymultinest.Analyser does
# not report means and sigmas in this case
if len(nest_out["NEST_stats"]["modes"]) == 0:
with open(
self.dir_multinest
/ f"{self.multinest_prefix}stats.dat"
# os.path.join(
# self.dir_multinest, "{}stats.dat".format(self.multinest_prefix)
# )
) as file:
lines = file.readlines()
stats = {"modes": []}
read_error_into_dict(lines[0], stats)
text = "".join(lines[2:])
# without INS:
parts = text.split("\n\n")
mode = {"index": 0}
modelines = parts[0]
tab = read_table(modelines, title="Parameter")
mode["mean"] = tab[:, 1].tolist()
mode["sigma"] = tab[:, 2].tolist()
mode["maximum"] = read_table(parts[1], title="Parameter", d=None)[
:, 1
].tolist()
mode["maximum a posterior"] = read_table(
parts[2], title="Parameter", d=None
)[:, 1].tolist()
if "Nested Sampling Global Log-Evidence".lower() in stats:
mode["Local Log-Evidence".lower()] = stats[
"Nested Sampling Global Log-Evidence".lower()
]
mode["Local Log-Evidence error".lower()] = stats[
"Nested Sampling Global Log-Evidence error".lower()
]
else:
mode["Local Log-Evidence".lower()] = stats[
"Nested Importance Sampling Global Log-Evidence".lower()
]
mode["Local Log-Evidence error".lower()] = stats[
"Nested Importance Sampling Global Log-Evidence error".lower()
]
mode["Strictly Local Log-Evidence".lower()] = mode[
"Local Log-Evidence".lower()
]
mode["Strictly Local Log-Evidence error".lower()] = mode[
"Local Log-Evidence error".lower()
]
nest_out["NEST_stats"]["modes"] = [mode]
modes = []
modes_weights = []
chains = []
chains_weights = []
if self.multimodes:
# separate modes. get individual samples for each mode
# get parameter values and sample probability (=weight) for each mode
with open(
os.path.join(
self.dir_multinest / f"{self.multinest_prefix}post_separate.dat",
)
) as f:
lines = f.readlines()
for idx, line in enumerate(lines):
if idx > 2: # skip the first two lines
if lines[idx - 1] == "\n" and lines[idx - 2] == "\n":
modes.append(chains)
modes_weights.append(chains_weights)
chains = []
chains_weights = []
chain = [float(x) for x in line.split()[2:]]
if len(chain) > 0:
chains.append(chain)
chains_weights.append(float(line.split()[0]))
modes.append(chains)
modes_weights.append(chains_weights)
modes_array = []
for mode in modes:
mode_array = np.zeros((len(mode), len(mode[0])))
for idx, line in enumerate(mode):
mode_array[idx, :] = line
modes_array.append(mode_array)
else:
# not running in multimode. Get chains directly from file 1-.txt
modes_array = [data[:, 2:]]
chains_weights = [data[:, 0]]
modes_weights.append(chains_weights[0])
modes = [0]
for nmode in range(len(modes)):
self.debug(f"Nmode: {nmode}")
mydict = {
"type": "nest",
"local_logE": (
nest_out["NEST_stats"]["modes"][0]["local log-evidence"],
nest_out["NEST_stats"]["modes"][0]["local log-evidence error"],
),
"weights": np.asarray(modes_weights[nmode]),
"tracedata": modes_array[nmode],
"fit_params": {},
}
nest_stats
for idx, param_name in enumerate(self.fit_names):
trace = modes_array[nmode][:, idx]
q_16, q_50, q_84 = quantile_corner(
trace, [0.16, 0.5, 0.84], weights=np.asarray(modes_weights[nmode])
)
mydict["fit_params"][param_name] = {
"value": q_50,
"sigma_m": q_50 - q_16,
"sigma_p": q_84 - q_50,
"nest_map": nest_stats["modes"][nmode]["maximum a posterior"][idx],
"mean": nest_stats["modes"][nmode]["mean"][idx],
"nest_sigma": nest_stats["modes"][nmode]["sigma"][idx],
"trace": trace,
}
nest_out["solutions"][f"solution{nmode}"] = mydict
return nest_out
[docs]
def generate_solution(
self, output_size=OutputSize.heavy
) -> t.Dict[str, AnyValType]:
"""Generate solution."""
solution = super().generate_solution(output_size=output_size)
solution["GlobalStats"] = self._multinest_output["NEST_stats"]
return solution
[docs]
def get_samples(self, solution_idx: int) -> npt.NDArray[np.float64]:
"""Get samples from solution."""
return self._multinest_output["solutions"][f"solution{solution_idx}"][
"tracedata"
]
[docs]
def get_weights(self, solution_idx: int) -> npt.NDArray[np.float64]:
"""Get weights from solution."""
return self._multinest_output["solutions"][f"solution{solution_idx}"]["weights"]
[docs]
def get_solution(
self,
) -> t.Generator[
t.Tuple[
int,
npt.NDArray[np.float64],
npt.NDArray[np.float64],
t.Tuple[
t.Tuple[
t.Literal["Statistics"],
t.Dict[
t.Literal["local log-evidence", "local log-evidence error"],
float,
],
],
t.Tuple[t.Literal["fit_params"], t.Dict[str, NestFitParam]],
t.Tuple[t.Literal["tracedata"], npt.NDArray[np.float64]],
t.Tuple[t.Literal["weights"], npt.NDArray[np.float64]],
],
],
None,
None,
]:
"""Get solution as generator."""
names = self.fit_names
opt_values = self.fit_values
opt_map = self.fit_values
solutions = [
(k, v)
for k, v in self._multinest_output["solutions"].items()
if "solution" in k
]
for k, v in solutions:
solution_idx = int(k[8:])
for p_name, p_value in v["fit_params"].items():
idx = names.index(p_name)
opt_map[idx] = p_value["nest_map"]
opt_values[idx] = p_value["value"]
yield solution_idx, opt_map, opt_values, [
(
"Statistics",
{
"local log-evidence": self._multinest_output["NEST_stats"][
"modes"
][solution_idx]["local log-evidence"],
"local log-evidence error": self._multinest_output[
"NEST_stats"
]["modes"][solution_idx]["local log-evidence error"],
},
),
("fit_params", v["fit_params"]),
("tracedata", v["tracedata"]),
("weights", v["weights"]),
]
BIBTEX_ENTRIES = [
r"""
@article{ refId0,
author = {{Buchner, J.} and {Georgakakis, A.} and {Nandra, K.} and {Hsu, L.}
and {Rangel, C.} and {Brightman, M.} and {Merloni, A.} and {Salvato, M.}
and {Donley, J.} and {Kocevski, D.}},
title = {X-ray spectral modelling of the AGN obscuring region in the
CDFS: Bayesian model selection and catalogue},
DOI= "10.1051/0004-6361/201322971",
url= "https://doi.org/10.1051/0004-6361/201322971",
journal = {A\&A},
year = 2014,
volume = 564,
pages = "A125",
month = "",
}
""",
"""
@ARTICLE{Feroz_multinest,
author = {{Feroz}, F. and {Hobson}, M.~P. and {Bridges}, M.},
title = "{MULTINEST: an efficient and robust Bayesian
inference tool for cosmology and particle physics}",
journal = {MNRAS},
keywords = {methods: data analysis, methods: statistical, Astrophysics},
year = "2009",
month = "Oct",
volume = {398},
number = {4},
pages = {1601-1614},
doi = {10.1111/j.1365-2966.2009.14548.x},
archivePrefix = {arXiv},
eprint = {0809.3437},
primaryClass = {astro-ph},
adsurl = {https://ui.adsabs.harvard.edu/abs/2009MNRAS.398.1601F},
adsnote = {Provided by the SAO/NASA Astrophysics Data System}
}
""",
]