# Source code for pyanno.sampling

# Copyright (c) 2011, Enthought, Ltd.
# Authors: Pietro Berkes <pberkes@enthought.com>, Andrey Rzhetsky
# License: Modified BSD license (2-clause)

"""This module defines functions to sample from a distribution given its
log likelihood.
"""

import numpy as np
from pyanno.util import PyannoValueError

import logging
logger = logging.getLogger(__name__)


def sample_distribution(likelihood, x0, arguments, step, nsamples,
                        x_lower, x_upper):
    """General-purpose sampling routine for MCMC sampling.

    Draw samples from a distribution given its unnormalized log likelihood
    using the Metropolis-Hasting Monte Carlo algorithm.

    It is recommended to optimize the step size, `step`, using the function
    :func:`optimize_step_size` in order to reduce the autocorrelation between
    successive samples.

    Arguments
    ---------
    likelihood : function(params, arguments)
        Function returning the *unnormalized* log likelihood of data given
        the parameters.  The function accepts two arguments:
        `params` is the vector of parameters; `arguments` contains any
        additional argument that is needed to compute the log likelihood.

    x0 : ndarray, shape = (n_parameters, )
        Initial parameters value.

    arguments : any
        Additional argument passed to the function `likelihood`.

    step : ndarray, shape = (n_parameters, )
        Width of the proposal distribution over the parameters.

    nsamples : int
        Number of samples to draw from the distribution.

    x_lower : ndarray, shape = (n_parameters, )
        Lower bound for the parameters.

    x_upper : ndarray, shape = (n_parameters, )
        Upper bound for the parameters.

    Returns
    -------
    samples : ndarray, shape = (nsamples, n_parameters)
        The sequence of samples drawn from the posterior (one per row).
    """
    logger.info('Start collecting samples...')

    dim = len(x0)
    # array of samples
    samples = np.zeros((nsamples, dim))
    # current sample
    x_curr = x0.copy()
    # log likelihood of current sample
    llhood = likelihood(x0, arguments)

    for i in range(nsamples):
        if (i + 1) % 100 == 0:
            logger.info('... collected {} samples'.format(i+1))

        sum_log_q_ratio = 0.
        x_old = x_curr.copy()
        # propose a new value for each parameter in turn, accumulating the
        # log-ratio of the (asymmetric) proposal probabilities
        for j in range(dim):
            xj, log_q_ratio = sample_from_proposal_distribution(
                x_old[j], step[j], x_lower[j], x_upper[j]
            )
            sum_log_q_ratio += log_q_ratio
            x_curr[j] = xj

        llhood_new = likelihood(x_curr, arguments)

        # Metropolis-Hastings step: *accept* the proposal with probability
        # alpha = min(1, exp(log_ratio)).  exp(min(0, log_ratio)) is
        # mathematically identical but cannot overflow for large positive
        # log-ratios (np.exp would warn and return inf).
        log_ratio = llhood_new - llhood + sum_log_q_ratio
        alpha = np.exp(min(0., log_ratio))
        if np.random.random() < alpha:
            # accept
            llhood = llhood_new
        else:
            # reject: restore the previous sample
            x_curr = x_old

        samples[i, :] = x_curr

    return samples
def optimize_step_size(likelihood, x0, arguments, x_lower, x_upper,
                       n_samples, recomputing_cycle, target_rejection_rate,
                       tolerance):
    """Compute optimum jump for MCMC estimation of credible intervals.

    Adjust jump size in Metropolis-Hasting MC to achieve target rejection
    rate.  Jump size is estimated for each parameter separately.

    Arguments
    ---------
    likelihood : function(params, arguments)
        Function returning the *unnormalized* log likelihood of data given
        the parameters.  The function accepts two arguments:
        `params` is the vector of parameters; `arguments` contains any
        additional argument that is needed to compute the log likelihood.

    x0 : ndarray, shape = (n_parameters, )
        Initial parameters value.

    arguments : any
        Additional argument passed to the function `likelihood`.

    x_lower : ndarray, shape = (n_parameters, )
        Lower bound for the parameters.

    x_upper : ndarray, shape = (n_parameters, )
        Upper bound for the parameters.

    n_samples : int
        Total number of samples to draw during the optimization.

    recomputing_cycle : int
        Number of samples over which the rejection rates are computed. After
        `recomputing_cycle` samples, the step size is adapted and the
        rejection rates are reset to 0.

    target_rejection_rate : float
        Target rejection rate. If the rejection rate over the latest cycle is
        closer than `tolerance` to this target, the optimization phase is
        concluded.

    tolerance : float
        Tolerated deviation from `target_rejection_rate`.

    Returns
    -------
    step : ndarray, shape = (n_parameters, )
        The final optimized step size.
    """
    logger.info('Estimate optimal step size')

    dim = len(x0)
    # rejection rate for each parameter separately
    rejection_rate = np.zeros((dim,))
    # initial jump sizes: 1/100th of each parameter's allowed range
    step = (x_upper - x_lower) / 100.

    # *x_curr* is the current samples of the parameters
    x_curr = x0.copy()
    # log likelihood of current sample
    llhood = likelihood(x0, arguments)

    for i in range(n_samples):
        # we need to evaluate every dimension separately to have an estimate
        # of the rejection rate per parameter
        for j in range(dim):
            xj_old = x_curr[j]
            # draw new sample for parameter j only
            xj, log_q_ratio = sample_from_proposal_distribution(
                xj_old, step[j], x_lower[j], x_upper[j]
            )
            x_curr[j] = xj
            llhood_new = likelihood(x_curr, arguments)

            # Metropolis-Hastings step: accept with probability `alpha`
            alpha = min(1, np.exp(llhood_new - llhood + log_q_ratio))
            if np.random.random() < alpha:
                llhood = llhood_new
            else:
                # rejected: count it (normalized so that the accumulated
                # value over one cycle is a rate in [0, 1]) and restore
                # the previous value of parameter j
                rejection_rate[j] += 1. / recomputing_cycle
                x_curr[j] = xj_old

        # adjust step size every `recomputing cycle` steps
        if i % recomputing_cycle == 0 and i > 0:
            logger.info('{} samples, adapt step size'.format(i))
            logger.debug('Rejection rate: ' + repr(rejection_rate))
            logger.debug('Step size: ' +repr(step))
            step, terminate = _adjust_jump(step, rejection_rate,
                                           target_rejection_rate, tolerance)
            if terminate == True:
                logger.debug('Step size within accepted range -- '
                             'exit optimization phase')
                break
            # reset all rejection rates for the next cycle
            rejection_rate *= 0.

    return step


# TODO: simplify, move stopping condition ("check") to optimize_step_size
def _adjust_jump(step, rejection_rate, target_reject_rate, tolerance): """Adapt step size to get closer to target rejection rate. Returns ------- step : list new step sizes terminate : bool True if all rejection rates are within the required limits """ terminate = True for j in xrange(step.shape[0]): step[j] = max(1e-6, step[j]) if rejection_rate[j] != 0: if abs(rejection_rate[j] - target_reject_rate) > tolerance: step[j] *= (target_reject_rate / rejection_rate[j]) terminate = False elif rejection_rate[j] == 0: step[j] *= 5. terminate = False return step, terminate # TODO vectorize this to sample from all dimensions at once
def sample_from_proposal_distribution(theta, step, lower, upper):
    """Returns one sample from the proposal distribution.

    The proposal is a uniform jump of width at most `step`, truncated at the
    interval boundaries.  Because truncation makes the proposal asymmetric,
    the log-ratio of forward/backward proposal probabilities is returned
    alongside the new value, for use in a Metropolis-Hastings correction.

    Arguments
    ---------
    theta : float
        current parameter value

    step : float
        width of the proposal distribution over `theta`

    lower : float
        lower bound for `theta`

    upper : float
        upper bound for `theta`

    Returns
    -------
    theta_new : float
        new sample from the distribution over theta

    log_q_ratio : float
        log-ratio of probability of new value given old value to probability
        of old value given new value

    Raises
    ------
    PyannoValueError
        If `theta` lies outside [lower, upper].
    """
    if theta < lower or theta > upper:
        raise PyannoValueError('Parameter values out or range')

    # uniform variate: decides the jump direction in the interior case, and
    # the landing position in the boundary cases
    u = np.random.random()
    full_width = min(step, upper - lower)

    if theta == upper:
        # sitting on the upper boundary: the move can only go left; the
        # factor 2 accounts for the reverse move having probability 1/2
        # of picking the "right" direction
        theta_new = upper - u * full_width
        back = -np.log(2 * min(step, upper - theta_new))
        forward = -np.log(full_width)
        return theta_new, back - forward

    if theta == lower:
        # sitting on the lower boundary: the move can only go right
        theta_new = lower + u * full_width
        back = -np.log(2. * min(step, theta_new - lower))
        forward = -np.log(full_width)
        return theta_new, back - forward

    if u > 0.5:
        # interior move to the 'left': uniform over min(step, theta - lower)
        width_left = min(step, theta - lower)
        theta_new = theta - np.random.random() * width_left
        forward = -np.log(min(step, theta - lower))
        back = -np.log(min(step, upper - theta_new))
        return theta_new, back - forward

    # interior move to the 'right': uniform over min(step, upper - theta)
    width_right = min(step, upper - theta)
    theta_new = theta + np.random.random() * width_right
    forward = -np.log(min(step, upper - theta))
    back = -np.log(min(step, theta_new - lower))
    return theta_new, back - forward

# (Sphinx navigation text removed: "Table Of Contents")