Source code for pyanno.modelBt_loopdesign

# Copyright (c) 2011, Enthought, Ltd.
# Authors: Pietro Berkes <pberkes@enthought.com>, Andrey Rzhetsky
# License: Modified BSD license (2-clause)

"""This module defines model B-with-theta, optimized for a loop design.

The implementation assumes that there are a total or 8 annotators. Each item is
annotated by a triplet of annotators, according to the loop design described
in Rzhetsky et al., 2009.

E.g., for 16 items the loop design looks like this (`A` indicates a label,
`*` indicates a missing value): ::

    A A A * * * * *
    A A A * * * * *
    * A A A * * * *
    * A A A * * * *
    * * A A A * * *
    * * A A A * * *
    * * * A A A * *
    * * * A A A * *
    * * * * A A A *
    * * * * A A A *
    * * * * * A A A
    * * * * * A A A
    A * * * * * A A
    A * * * * * A A
    A A * * * * * A
    A A * * * * * A

"""

import numpy as np
import scipy.optimize
import scipy.stats
from traits.api import Int, Array
from pyanno.abstract_model import AbstractModel
from pyanno.sampling import optimize_step_size, sample_distribution
from pyanno.util import (random_categorical, compute_counts,
                         SMALLEST_FLOAT, MISSING_VALUE, labels_frequency,
                         is_valid, ninf_to_num)

import logging
logger = logging.getLogger(__name__)


# map of `n` to list of all possible triplets of `n` elements
_triplet_combinations = {}
def _get_triplet_combinations(n):
    """Return array of all possible combinations of n elements in triplets.
    """
    if not _triplet_combinations.has_key(n):
        _triplet_combinations[n] = (
            np.array([i for i in np.ndindex(n,n,n)]) )
    return _triplet_combinations[n]


[docs]class ModelBtLoopDesign(AbstractModel): """Implementation of Model B-with-theta from (Rzhetsky et al., 2009). The model assumes the existence of "true" underlying labels for each item, which are drawn from a categorical distribution, gamma. Annotators report this labels with some noise. This model is closely related to :class:`~ModelB`, but, crucially, the noise distribution is described by a small number of parameters (one per annotator), which makes their estimation efficient and less sensitive to local optima. The model parameters are: - gamma[k] is the probability of label k - theta[j] parametrized the probability that annotator j reports label k'. More specifically, P( annotator j chooses k' | real label = k) is theta[j] for k' = k, or (1 - theta[j]) / sum(theta) if k' != k . This implementation is optimized for he loop design introduced in (Rzhetsky et al., 2009), which assumes that each item is annotated by 3 out of 8 annotators. For a more general implementation, see :class:`~ModelBt` See the documentation for a more detailed description of the model. **Reference** * Rzhetsky A., Shatkay, H., and Wilbur, W.J. (2009). "How to get the most from your curation effort", PLoS Computational Biology, 5(5). """ nclasses = Int nannotators = Int(8) # number of annotators rating each item in the loop design nannotators_per_item = Int(3) gamma = Array(dtype=float, shape=(None,)) theta = Array(dtype=float, shape=(None,)) def __init__(self, nclasses, gamma, theta, **traits): """Create an instance of ModelB. Arguments ---------- nclasses : int Number of possible annotation classes nannotators : int Number of annotators gamma : ndarray, shape = (n_classes, ) gamma[k] is the prior probability of label class k theta : ndarray, shape = (n_annotators, ) theta[j] parametrizes the accuracy of annotator j. Specifically, `P( annotator j chooses k' | real label = k)` is `theta[j]` for k' = k, or `(1 - theta[j]) / sum(theta)` if `k' != k `. """ self.nclasses = nclasses self.gamma = gamma self.theta = theta super(ModelBtLoopDesign, self).__init__(**traits) ##### Model and data generation methods ################################### @staticmethod
[docs] def create_initial_state(nclasses, gamma=None, theta=None): """Factory method returning a model with random initial parameters. It is often more convenient to use this factory method over the constructor, as one does not need to specify the initial model parameters. The parameters theta and gamma, controlling accuracy and prevalence, are initialized at random as follows: :math:`\\theta_j \sim \mathrm{Uniform}(0.6, 0.95)` :math:`\gamma \sim \mathrm{Dirichlet}(2.0)` Arguments --------- nclasses : int number of categories gamma : nparray An array of floats with size that holds the probability of each annotation value. Default is None theta : nparray An array of floats that the parameters of P( v_i | psi ) (one for each annotator) Returns ------- model : :class:`~ModelBtLoopDesign` Instance of ModelBtLoopDesign """ if gamma is None: gamma = ModelBtLoopDesign._random_gamma(nclasses) if theta is None: nannotators = 8 theta = ModelBtLoopDesign._random_theta(nannotators) model = ModelBtLoopDesign(nclasses, gamma, theta) return model
@staticmethod def _random_gamma(nclasses): beta = 2.*np.ones((nclasses,)) return np.random.dirichlet(beta) @staticmethod def _random_theta(nannotators): return np.random.uniform(low=0.6, high=0.95, size=(nannotators,))
[docs] def generate_labels(self, nitems): """Generate random labels from the model.""" return random_categorical(self.gamma, nitems)
[docs] def generate_annotations_from_labels(self, labels): """Generate random annotations from the model, given labels The method samples random annotations from the conditional probability distribution of annotations, :math:`x_i^j` given labels, :math:`y_i`. Arguments ---------- labels : ndarray, shape = (n_items,), dtype = int Set of "true" labels Returns ------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i """ theta = self.theta nannotators = self.nannotators nitems = labels.shape[0] nitems_per_loop = np.ceil(float(nitems) / nannotators) annotations = np.empty((nitems, nannotators), dtype=int) for j in xrange(nannotators): for i in xrange(nitems): distr = self._theta_to_categorical(theta[j], labels[i]) annotations[i,j] = random_categorical(distr, 1) # mask annotation value according to loop design for l in xrange(nannotators): label_idx = np.arange(l+self.nannotators_per_item, l+nannotators) % 8 annotations[l*nitems_per_loop:(l+1)*nitems_per_loop, label_idx] = MISSING_VALUE return annotations
[docs] def generate_annotations(self, nitems): """Generate a random annotation set from the model. Sample a random set of annotations from the probability distribution defined the current model parameters: 1) Label classes are generated from the prior distribution, pi 2) Annotations are generated from the conditional distribution of annotations given classes, parametrized by theta Arguments --------- nitems : int Number of items to sample Returns ------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i """ labels = self.generate_labels(nitems) return self.generate_annotations_from_labels(labels)
def _theta_to_categorical(self, theta, psi): """Returns P( v_i = psi | theta_i ) as a distribution.""" distr = np.empty((self.nclasses,)) distr.fill((1.-theta)/(self.nclasses-1.)) distr[psi] = theta assert np.allclose(distr.sum(), 1.) return distr ##### Parameters estimation methods #######################################
[docs] def mle(self, annotations, estimate_gamma=True): """Computes maximum likelihood estimate (MLE) of parameters. Estimate the parameters :attr:`theta` and :attr:`gamma` from a set of observed annotations using maximum likelihood estimation. Arguments ---------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i estimate_gamma : bool If True, the parameters :attr:`gamma` are estimated by the empirical class frequency. If False, :attr:`gamma` is left unchanged. """ self._raise_if_incompatible(annotations) # wrap log likelihood function to give it to optimize.fmin _llhood_counts = self._log_likelihood_counts def _wrap_llhood(params, counts): self.gamma, self.theta = self._vector_to_params(params) # minimize *negative* likelihood return - _llhood_counts(counts) self._parameter_estimation(_wrap_llhood, annotations, estimate_gamma=estimate_gamma)
[docs] def map(self, annotations, estimate_gamma=True): """Computes maximum a posteriori (MAP) estimate of parameters. Estimate the parameters :attr:`theta` and :attr:`gamma` from a set of observed annotations using maximum a posteriori estimation. Arguments ---------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i estimate_gamma : bool If True, the parameters :attr:`gamma` are estimated by the empirical class frequency. If False, :attr:`gamma` is left unchanged. """ self._raise_if_incompatible(annotations) # wrap log likelihood function to give it to optimize.fmin _llhood_counts = self._log_likelihood_counts _log_prior = self._log_prior def _wrap_llhood(params, counts): self.gamma, self.theta = self._vector_to_params(params) # minimize *negative* posterior probability of parameters return - (_llhood_counts(counts) + _log_prior()) self._parameter_estimation(_wrap_llhood, annotations, estimate_gamma=estimate_gamma)
def _parameter_estimation(self, objective, annotations, estimate_gamma=True): counts = compute_counts(annotations, self.nclasses) params_start = self._random_initial_parameters(annotations, estimate_gamma) logger.info('Start parameters optimization...') # TODO: use gradient, constrained optimization params_best = scipy.optimize.fmin(objective, params_start, args=(counts,), xtol=1e-4, ftol=1e-4, disp=False, maxiter=10000) logger.info('Parameters optimization finished') # parse arguments and update self.gamma, self.theta = self._vector_to_params(params_best) def _random_initial_parameters(self, annotations, estimate_gamma): if estimate_gamma: # estimate gamma from observed annotations gamma = labels_frequency(annotations, self.nclasses) else: gamma = ModelBtLoopDesign._random_gamma(self.nclasses) theta = ModelBtLoopDesign._random_theta(self.nannotators) return self._params_to_vector(gamma, theta) def _params_to_vector(self, gamma, theta): """Convert the tuple (gamma, theta) to a parameters vector. Used to interface with the optimization routines. """ return np.r_[gamma[:-1], theta] def _vector_to_params(self, params): """Convert a parameters vector to (gamma, theta) tuple. Used to interface with the optimization routines. """ nclasses = self.nclasses gamma = np.zeros((nclasses,)) gamma[:nclasses-1] = params[:nclasses-1] gamma[-1] = 1. - gamma[:nclasses-1].sum() theta = params[nclasses-1:] return gamma, theta ##### Model likelihood methods ############################################
[docs] def log_likelihood(self, annotations): """Compute the log likelihood of a set of annotations given the model. Returns :math:`\log P(\mathbf{x} | \gamma, \\theta)`, where :math:`\mathbf{x}` is the array of annotations. Arguments ---------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i Returns ------- log_lhood : float log likelihood of `annotations` """ self._raise_if_incompatible(annotations) counts = compute_counts(annotations, self.nclasses) return self._log_likelihood_counts(counts)
def _log_likelihood_counts(self, counts): """Compute the log likelihood of annotations given the model. This method assumes the data is in counts format. """ # TODO: check if it's possible to replace these constraints with bounded optimization # check boundary conditions if (min(min(self.gamma), min(self.theta)) < 0. or max(max(self.gamma), max(self.theta)) > 1.): #return np.inf return SMALLEST_FLOAT llhood = 0. # loop over the 8 combinations of annotators for i in range(8): # extract the theta parameters for this triplet triplet_indices = np.arange(i, i+3) % self.nannotators triplet_indices.sort() theta_triplet = self.theta[triplet_indices] # compute the likelihood for the triplet llhood += self._log_likelihood_triplet(counts[:,i], theta_triplet) return llhood def _log_likelihood_triplet(self, counts_triplet, theta_triplet): """Compute the log likelihood of data for one triplet of annotators. Input: counts_triplet -- count data for one combination of annotators theta_triplet -- theta parameters of the current triplet """ # log \prod_n P(v_{ijk}^{n} | params) # = \sum_n log P(v_{ijk}^{n} | params) # = \sum_v_{ijk} count(v_{ijk}) log P( v_{ijk} | params ) # # where n is n-th annotation of triplet {ijk}] # compute P( v_{ijk} | params ) pf = self._pattern_frequencies(theta_triplet) log_pf = ninf_to_num(np.log(pf)) l = (counts_triplet * log_pf).sum() return l def _pattern_frequencies(self, theta_triplet): """Compute vector of P(v_{ijk}|params) for each combination of v_{ijk}. """ gamma = self.gamma nclasses = self.nclasses # list of all possible combinations of v_i, v_j, v_k elements v_ijk_combinations = _get_triplet_combinations(nclasses) # P( v_{ijk} | params ) = \sum_psi P( v_{ijk} | psi, params ) P( psi ) pf = 0. not_theta = (1.-theta_triplet) / (nclasses-1.) p_v_ijk_given_psi = np.empty_like(v_ijk_combinations, dtype=float) for psi in range(nclasses): for j in range(3): p_v_ijk_given_psi[:,j] = np.where(v_ijk_combinations[:,j]==psi, theta_triplet[j], not_theta[j]) pf += p_v_ijk_given_psi.prod(1) * gamma[psi] return pf def _log_prior(self): """Compute log probability of prior on the theta parameters.""" log_prob = scipy.stats.beta._logpdf(self.theta, 2., 1.).sum() return log_prob ##### Sampling posterior over parameters ##################################
[docs] def sample_posterior_over_accuracy(self, annotations, nsamples, burn_in_samples = 100, thin_samples = 5, target_rejection_rate = 0.3, rejection_rate_tolerance = 0.2, step_optimization_nsamples = 500, adjust_step_every = 100): """Return samples from posterior distribution over theta given data. Samples are drawn using a variant of a Metropolis-Hasting Markov Chain Monte Carlo (MCMC) algorithm. Sampling proceeds in two phases: 1) *step size estimation phase*: first, the step size in the MCMC algorithm is adjusted to achieve a given rejection rate. 2) *sampling phase*: second, samples are collected using the step size from phase 1. Arguments ---------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i nsamples : int Number of samples to return (i.e., burn-in and thinning samples are not included) burn_in_samples : int Discard the first `burn_in_samples` during the initial burn-in phase, where the Monte Carlo chain converges to the posterior thin_samples : int Only return one every `thin_samples` samples in order to reduce the auto-correlation in the sampling chain. This is called "thinning" in MCMC parlance. target_rejection_rate : float target rejection rate for the step size estimation phase rejection_rate_tolerance : float the step size estimation phase is ended when the rejection rate for all parameters is within `rejection_rate_tolerance` from `target_rejection_rate` step_optimization_nsamples : int number of samples to draw in the step size estimation phase adjust_step_every : int number of samples after which the step size is adjusted during the step size estimation pahse Returns ------- samples : ndarray, shape = (n_samples, n_annotators) samples[i,:] is one sample from the posterior distribution over the parameters `theta` """ self._raise_if_incompatible(annotations) nsamples = self._compute_total_nsamples(nsamples, burn_in_samples, thin_samples) # optimize step size counts = compute_counts(annotations, self.nclasses) # wrap log likelihood function to give it to optimize_step_size and # sample_distribution _llhood_counts = self._log_likelihood_counts _log_prior = self._log_prior def _wrap_llhood(params, counts): self.theta = params return _llhood_counts(counts) + _log_prior() # TODO this save-reset is rather ugly, refactor: create copy of # model and sample over it # save internal parameters to reset at the end of sampling save_params = (self.gamma, self.theta) try: # compute optimal step size for given target rejection rate params_start = self.theta.copy() params_upper = np.ones((self.nannotators,)) params_lower = np.zeros((self.nannotators,)) step = optimize_step_size(_wrap_llhood, params_start, counts, params_lower, params_upper, step_optimization_nsamples, adjust_step_every, target_rejection_rate, rejection_rate_tolerance) # draw samples from posterior distribution over theta samples = sample_distribution(_wrap_llhood, params_start, counts, step, nsamples, params_lower, params_upper) return self._post_process_samples(samples, burn_in_samples, thin_samples) finally: # reset parameters self.gamma, self.theta = save_params ##### Posterior distributions #############################################
[docs] def infer_labels(self, annotations): """Infer posterior distribution over label classes. Compute the posterior distribution over label classes given observed annotations, :math:`P( \mathbf{y} | \mathbf{x}, \\theta, \omega)`. Arguments ---------- annotations : ndarray, shape = (n_items, n_annotators) annotations[i,j] is the annotation of annotator j for item i Returns ------- posterior : ndarray, shape = (n_items, n_classes) posterior[i,k] is the posterior probability of class k given the annotation observed in item i. """ self._raise_if_incompatible(annotations) nitems = annotations.shape[0] gamma = self.gamma nclasses = self.nclasses # get indices of annotators active in each row valid_entries = is_valid(annotations).nonzero() annotator_indices = np.reshape(valid_entries[1], (nitems, self.nannotators_per_item)) valid_annotations = annotations[valid_entries] valid_annotations = np.reshape(valid_annotations, (nitems, self.nannotators_per_item)) # thetas of active annotators theta_equal = self.theta[annotator_indices] theta_not_equal = (1. - theta_equal) / (nclasses - 1.) # compute posterior over psi psi_distr = np.zeros((nitems, nclasses)) for psi in xrange(nclasses): tmp = np.where(valid_annotations == psi, theta_equal, theta_not_equal) psi_distr[:,psi] = gamma[psi] * tmp.prod(1) # normalize distribution psi_distr /= psi_distr.sum(1)[:,np.newaxis] return psi_distr ##### Verify input ########################################################
[docs] def are_annotations_compatible(self, annotations): """Check if the annotations are compatible with the models' parameters. """ if not super(ModelBtLoopDesign, self).are_annotations_compatible( annotations): return False masked_annotations = np.ma.masked_equal(annotations, MISSING_VALUE) # exactly 3 annotations per row nvalid = (~masked_annotations.mask).sum(1) if not np.all(nvalid == self.nannotators_per_item): return False return True

Table Of Contents