Source code for autofit.non_linear.search.mcmc.emcee.search

import logging
import os
from pathlib import Path
from typing import Dict, Optional

import numpy as np

from autoconf import conf

from autofit.database.sqlalchemy_ import sa
from autofit.mapper.model_mapper import ModelMapper
from autofit.mapper.prior_model.abstract import AbstractPriorModel
from autofit.non_linear.fitness import Fitness
from autofit.non_linear.initializer import Initializer
from autofit.non_linear.search.mcmc.abstract_mcmc import AbstractMCMC
from autofit.non_linear.search.mcmc.auto_correlations import AutoCorrelationsSettings
from autofit.non_linear.search.mcmc.auto_correlations import AutoCorrelations
from autofit.non_linear.test_mode import is_test_mode
from autofit.non_linear.samples.sample import Sample
from autofit.non_linear.samples.mcmc import SamplesMCMC

logger = logging.getLogger(__name__)


[docs] class Emcee(AbstractMCMC): __identifier_fields__ = ("nwalkers",) def __init__( self, name: Optional[str] = None, path_prefix: Optional[str] = None, unique_tag: Optional[str] = None, nwalkers: int = 50, nsteps: int = 2000, initializer: Optional[Initializer] = None, auto_correlation_settings=AutoCorrelationsSettings(), iterations_per_quick_update: int = None, iterations_per_full_update: int = None, number_of_cores: int = 1, silence: bool = False, session: Optional[sa.orm.Session] = None, **kwargs, ): """ An Emcee non-linear search. For a full description of Emcee, checkout its Github and readthedocs webpages: https://github.com/dfm/emcee https://emcee.readthedocs.io/en/stable/ If you use `Emcee` as part of a published work, please cite the package following the instructions under the *Attribution* section of the GitHub page. Parameters ---------- name The name of the search, controlling the last folder results are output. path_prefix The path of folders prefixing the name folder where results are output. unique_tag The name of a unique tag for this model-fit, which will be given a unique entry in the sqlite database and also acts as the folder after the path prefix and before the search name. nwalkers The number of walkers in the ensemble used to sample parameter space. nsteps The number of steps that must be taken by every walker. initializer Generates the initialize samples of non-linear parameter space (see autofit.non_linear.initializer). auto_correlation_settings Customizes and performs auto correlation calculations performed during and after the search. number_of_cores The number of cores sampling is performed using a Python multiprocessing Pool instance. silence If True, the default print output of the non-linear search is silenced. session An SQLalchemy session instance so the results of the model-fit are written to an SQLite database. """ super().__init__( name=name, path_prefix=path_prefix, unique_tag=unique_tag, initializer=initializer, auto_correlation_settings=auto_correlation_settings, iterations_per_quick_update=iterations_per_quick_update, iterations_per_full_update=iterations_per_full_update, number_of_cores=number_of_cores, silence=silence, session=session, **kwargs, ) self.nwalkers = nwalkers self.nsteps = nsteps if is_test_mode(): self.apply_test_mode() self.logger.debug("Creating Emcee Search") conf.instance["output"]["search_internal"] = True
[docs] def apply_test_mode(self): logger.warning( "TEST MODE 1 (reduced iterations): Sampler will run with " "minimal iterations for faster completion." ) self.nwalkers = 20 self.nsteps = 10
def _fit(self, model: AbstractPriorModel, analysis): """ Fit a model using Emcee and the Analysis class which contains the data and returns the log likelihood from instances of the model, which the `NonLinearSearch` seeks to maximize. Parameters ---------- model : ModelMapper The model which generates instances for different points in parameter space. analysis : Analysis Contains the data and the log likelihood function which fits an instance of the model to the data, returning the log likelihood the `NonLinearSearch` maximizes. Returns ------- A result object comprising the Samples object that inclues the maximum log likelihood instance and full chains used by the fit. """ import emcee fitness = Fitness( model=model, analysis=analysis, paths=self.paths, fom_is_log_likelihood=False, resample_figure_of_merit=-np.inf, ) pool = self.make_sneaky_pool(fitness) try: backend = emcee.backends.HDFBackend(filename=self.backend_filename) except TypeError: backend = None search_internal = emcee.EnsembleSampler( nwalkers=self.nwalkers, ndim=model.prior_count, log_prob_fn=fitness.call_wrap, backend=backend, pool=pool, ) try: state = search_internal.get_last_sample() samples = self.samples_from(model=model, search_internal=search_internal) total_iterations = search_internal.iteration if samples.converged: iterations_remaining = 0 else: iterations_remaining = self.nsteps - total_iterations self.logger.info( "Resuming Emcee non-linear search (previous samples found)." ) except AttributeError: ( unit_parameter_lists, parameter_lists, log_posterior_list, ) = self.initializer.samples_from_model( total_points=search_internal.nwalkers, model=model, fitness=fitness, paths=self.paths, n_cores=self.number_of_cores, ) self.plot_start_point( parameter_vector=parameter_lists[0], model=model, analysis=analysis, ) state = np.zeros(shape=(search_internal.nwalkers, model.prior_count)) self.logger.info( "Starting new Emcee non-linear search (no previous samples found)." ) for index, parameters in enumerate(parameter_lists): state[index, :] = np.asarray(parameters) total_iterations = 0 iterations_remaining = self.nsteps while iterations_remaining > 0: if self.iterations_per_full_update > iterations_remaining: iterations = iterations_remaining else: iterations = self.iterations_per_full_update for sample in search_internal.sample( initial_state=state, iterations=iterations, progress=True, skip_initial_state_check=True, store=True, ): pass state = search_internal.get_last_sample() total_iterations += iterations iterations_remaining = self.nsteps - total_iterations samples = self.samples_from(model=model, search_internal=search_internal) if self.auto_correlation_settings.check_for_convergence: if ( search_internal.iteration > self.auto_correlation_settings.check_size ): if samples.converged: iterations_remaining = 0 if iterations_remaining > 0: self.perform_update( model=model, analysis=analysis, search_internal=search_internal, fitness=fitness, during_analysis=True, ) return search_internal, fitness
[docs] def output_search_internal(self, search_internal): """ Output the sampler results to hard-disk in their internal format. Emcee uses a backend to store and load results, therefore the outputting of the search internal to a dill file is disabled. Parameters ---------- sampler The nautilus sampler object containing the results of the model-fit. """ pass
[docs] def samples_info_from(self, search_internal=None): search_internal = search_internal or self.backend auto_correlations = self.auto_correlations_from(search_internal=search_internal) return { "check_size": auto_correlations.check_size, "required_length": auto_correlations.required_length, "change_threshold": auto_correlations.change_threshold, "total_walkers": len(search_internal.get_chain()[0, :, 0]), "total_steps": len(search_internal.get_log_prob()), "time": self.timer.time if self.timer else None, }
[docs] def samples_via_internal_from(self, model, search_internal=None): """ Returns a `Samples` object from the emcee internal results. The samples contain all information on the parameter space sampling (e.g. the parameters, log likelihoods, etc.). The internal search results are converted from the native format used by the search to lists of values (e.g. `parameter_lists`, `log_likelihood_list`). Parameters ---------- model Maps input vectors of unit parameter values to physical values and model instances via priors. """ search_internal = search_internal or self.backend if is_test_mode(): samples_after_burn_in = search_internal.get_chain( discard=5, thin=5, flat=True ) else: auto_correlations = self.auto_correlations_from( search_internal=search_internal ) discard = int(3.0 * np.max(auto_correlations.times)) thin = int(np.max(auto_correlations.times) / 2.0) samples_after_burn_in = search_internal.get_chain( discard=discard, thin=thin, flat=True ) parameter_lists = samples_after_burn_in.tolist() log_prior_list = model.log_prior_list_from(parameter_lists=parameter_lists) total_samples = len(parameter_lists) log_posterior_list = search_internal.get_log_prob(flat=True)[ -total_samples - 1 : -1 ].tolist() log_likelihood_list = [ log_posterior - log_prior for log_posterior, log_prior in zip(log_posterior_list, log_prior_list) ] weight_list = len(log_likelihood_list) * [1.0] sample_list = Sample.from_lists( model=model, parameter_lists=parameter_lists, log_likelihood_list=log_likelihood_list, log_prior_list=log_prior_list, weight_list=weight_list, ) return SamplesMCMC( model=model, sample_list=sample_list, samples_info=self.samples_info_from(search_internal=search_internal), auto_correlation_settings=self.auto_correlation_settings, auto_correlations=self.auto_correlations_from( search_internal=search_internal ), )
[docs] def auto_correlations_from(self, search_internal=None): import emcee search_internal = search_internal or self.backend times = search_internal.get_autocorr_time(tol=0) previous_auto_correlation_times = emcee.autocorr.integrated_time( x=search_internal.get_chain()[ : -self.auto_correlation_settings.check_size, :, : ], tol=0, ) return AutoCorrelations( check_size=self.auto_correlation_settings.check_size, required_length=self.auto_correlation_settings.required_length, change_threshold=self.auto_correlation_settings.change_threshold, times=times, previous_times=previous_auto_correlation_times, )
@property def backend_filename(self): return self.paths.search_internal_path / "search_internal.hdf" @property def backend(self) -> "emcee.backends.HDFBackend": """ The `Emcee` hdf5 backend, which provides access to all samples, likelihoods, etc. of the non-linear search. The sampler is described in the "Results" section at https://dynesty.readthedocs.io/en/latest/quickstart.html """ import emcee if Path(self.backend_filename).is_file(): return emcee.backends.HDFBackend(filename=str(self.backend_filename)) else: raise FileNotFoundError( f"The file search_internal.hdf does not exist at the path {self.paths.search_internal_path}" )