# Copyright (c) 2011, Enthought, Ltd.
# Authors: Pietro Berkes <pberkes@enthought.com>, Andrey Rzhetsky
# License: Modified BSD license (2-clause)
"""This module defines model B-with-theta.
pyAnno includes another implementation of B-with-theta,
:py:mod:`pyanno.modelBt_loopdesign`, which is optimized for a loop design
where each item is annotated by 3 out of 8 annotators.
"""
import numpy as np
import scipy.optimize
import scipy.stats
from traits.api import Int, Array
from pyanno.abstract_model import AbstractModel
from pyanno.sampling import optimize_step_size, sample_distribution
from pyanno.util import (random_categorical,
                         SMALLEST_FLOAT, labels_frequency,
                         is_valid)
import logging
logger = logging.getLogger(__name__)
class ModelBt(AbstractModel):
"""Implementation of Model B-with-theta from (Rzhetsky et al., 2009).
The model assumes the existence of "true" underlying labels for each item,
which are drawn from a categorical distribution, gamma. Annotators report
these labels with some noise, according to their accuracy, theta.
This model is closely related to :class:`~ModelB`, but, crucially,
the noise distribution is described by a small number of parameters (one
per annotator), which makes their estimation efficient and less sensitive
to local optima.
The model parameters are:
- `gamma[k]` is the probability of label k
- `theta[j]` parametrizes the probability that annotator `j` reports label
`k'` given ground truth, `k`. More specifically,
`P( annotator j chooses k' | real label = k)` is
      `theta[j]` for k' = k, or `(1 - theta[j]) / (nclasses - 1)` if `k' != k`.
See the documentation for a more detailed description of the model.
For a version of this model optimized for the loop design described
in (Rzhetsky et al., 2009), see :class:`~ModelBtLoopDesign`.
**Reference**
* Rzhetsky A., Shatkay, H., and Wilbur, W.J. (2009). "How to get the most
from your curation effort", PLoS Computational Biology, 5(5).
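
    **Example**

    A minimal usage sketch (the numbers of classes and annotators are
    arbitrary):

    >>> model = ModelBt.create_initial_state(3, 5)
    >>> annotations = model.generate_annotations(100)
    >>> model.mle(annotations)
    >>> posterior = model.infer_labels(annotations)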
"""
######## Model traits
# number of label classes
nclasses = Int
# number of annotators
nannotators = Int
# gamma[k] is prior probability of class k
gamma = Array(dtype=float, shape=(None,))
    # theta[j] parametrizes P(annotator j chooses k' | real label = k)
theta = Array(dtype=float, shape=(None,))
def __init__(self, nclasses, nannotators,
gamma, theta, **traits):
"""Create an instance of ModelB.
Parameters
----------
nclasses : int
Number of possible annotation classes
nannotators : int
Number of annotators
gamma : ndarray, shape = (n_classes, )
gamma[k] is the prior probability of label class k
theta : ndarray, shape = (n_annotators, )
theta[j] parametrizes the accuracy of annotator j. Specifically,
`P( annotator j chooses k' | real label = k)` is
            `theta[j]` for k' = k, or `(1 - theta[j]) / (nclasses - 1)`
            if `k' != k`.
"""
self.nclasses = nclasses
self.nannotators = nannotators
self.gamma = gamma
self.theta = theta
super(ModelBt, self).__init__(**traits)
##### Model and data generation methods ###################################
@staticmethod
    def create_initial_state(nclasses, nannotators, gamma=None, theta=None):
"""Factory method returning a model with random initial parameters.
It is often more convenient to use this factory method over the
constructor, as one does not need to specify the initial model
parameters.
The parameters theta and gamma, controlling accuracy and prevalence,
are initialized at random as follows:
:math:`\\theta_j \sim \mathrm{Uniform}(0.6, 0.95)`
:math:`\gamma \sim \mathrm{Dirichlet}(2.0)`
Arguments
---------
nclasses : int
Number of label classes
nannotators : int
Number of annotators
        gamma : ndarray, shape = (n_classes,), optional
            gamma[k] is the prior probability of label class k. If None,
            gamma is drawn at random as described above.
        theta : ndarray, shape = (n_annotators,), optional
            theta[j] parametrizes the accuracy of annotator j. Specifically,
            `P( annotator j chooses k' | real label = k)` is
            `theta[j]` for k' = k, or `(1 - theta[j]) / (nclasses - 1)`
            if `k' != k`. If None, theta is drawn at random as described
            above.
Returns
-------
model : :class:`~ModelBt`
Instance of ModelBt
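
        Examples
        --------
        A minimal sketch; unspecified parameters are drawn at random:

        >>> model = ModelBt.create_initial_state(4, 6)
        >>> model.gamma.shape, model.theta.shape
        ((4,), (6,))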
"""
if gamma is None:
gamma = ModelBt._random_gamma(nclasses)
if theta is None:
theta = ModelBt._random_theta(nannotators)
model = ModelBt(nclasses, nannotators, gamma, theta)
return model
@staticmethod
def _random_gamma(nclasses):
beta = 2.*np.ones((nclasses,))
return np.random.dirichlet(beta)
@staticmethod
def _random_theta(nannotators):
return np.random.uniform(low=0.6, high=0.95,
size=(nannotators,))
    def generate_labels(self, nitems):
"""Generate random labels from the model."""
return random_categorical(self.gamma, nitems)
    def generate_annotations_from_labels(self, labels):
"""Generate random annotations from the model, given labels
The method samples random annotations from the conditional probability
distribution of annotations, :math:`x_i^j`
given labels, :math:`y_i`.
Arguments
----------
labels : ndarray, shape = (n_items,), dtype = int
Set of "true" labels
Returns
-------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
"""
theta = self.theta
nitems = labels.shape[0]
annotations = np.empty((nitems, self.nannotators), dtype=int)
        for j in range(self.nannotators):
            for i in range(nitems):
distr = self._theta_to_categorical(theta[j], labels[i])
annotations[i,j] = random_categorical(distr, 1)
return annotations
    def generate_annotations(self, nitems):
"""Generate a random annotation set from the model.
Sample a random set of annotations from the probability distribution
        defined by the current model parameters:
        1) Label classes are generated from the prior distribution, gamma
2) Annotations are generated from the conditional distribution of
annotations given classes, parametrized by theta
Arguments
---------
nitems : int
Number of items to sample
Returns
-------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
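
        Examples
        --------
        A minimal sketch:

        >>> model = ModelBt.create_initial_state(3, 5)
        >>> anno = model.generate_annotations(10)
        >>> anno.shape
        (10, 5)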
"""
labels = self.generate_labels(nitems)
return self.generate_annotations_from_labels(labels)
def _theta_to_categorical(self, theta, psi):
"""Returns P( v_i = psi | theta_i ) as a distribution."""
distr = np.empty((self.nclasses,))
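        # spread the residual probability mass (1 - theta) uniformly over
        # the nclasses - 1 incorrect labels; the true label psi gets theta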
distr.fill((1.-theta)/(self.nclasses-1.))
distr[psi] = theta
assert np.allclose(distr.sum(), 1.)
return distr
##### Parameters estimation methods #######################################
    def mle(self, annotations, estimate_gamma=True):
"""Computes maximum likelihood estimate (MLE) of parameters.
Estimate the parameters :attr:`theta` and :attr:`gamma` from a set of
observed annotations using maximum likelihood estimation.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
estimate_gamma : bool
If True, the parameters :attr:`gamma` are estimated by the empirical
class frequency. If False, :attr:`gamma` is left unchanged.
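
        Examples
        --------
        A minimal sketch, fitting annotations generated by the model itself:

        >>> model = ModelBt.create_initial_state(3, 5)
        >>> anno = model.generate_annotations(500)
        >>> model.mle(anno)  # updates model.gamma and model.theta in place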
"""
self._raise_if_incompatible(annotations)
# mask missing annotations
missing_mask_nclasses = self._missing_mask(annotations)
# wrap log likelihood function to give it to optimize.fmin
_llhood = self._log_likelihood_core
def _wrap_llhood(params):
gamma, theta = self._vector_to_params(params)
# minimize *negative* likelihood
return - _llhood(annotations, gamma, theta, missing_mask_nclasses)
self._parameter_estimation(_wrap_llhood, annotations,
estimate_gamma=estimate_gamma)
    def map(self, annotations, estimate_gamma=True):
"""Computes maximum a posteriori (MAP) estimate of parameters.
Estimate the parameters :attr:`theta` and :attr:`gamma` from a set of
observed annotations using maximum a posteriori estimation.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
estimate_gamma : bool
If True, the parameters :attr:`gamma` are estimated by the empirical
class frequency. If False, :attr:`gamma` is left unchanged.
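
        Examples
        --------
        Same call pattern as :meth:`mle`; the difference is the Beta(2, 1)
        prior placed on theta:

        >>> model = ModelBt.create_initial_state(3, 5)
        >>> anno = model.generate_annotations(500)
        >>> model.map(anno)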
"""
self._raise_if_incompatible(annotations)
# mask missing annotations
missing_mask_nclasses = self._missing_mask(annotations)
# wrap objective function to give it to optimize.fmin
_llhood = self._log_likelihood_core
_log_prior = self._log_prior
def _wrap_llhood(params):
gamma, theta = self._vector_to_params(params)
# minimize *negative* posterior probability of parameters
return - (_llhood(annotations, gamma, theta, missing_mask_nclasses)
+ _log_prior(theta))
self._parameter_estimation(_wrap_llhood, annotations,
estimate_gamma=estimate_gamma)
def _parameter_estimation(self, objective, annotations,
estimate_gamma=True):
params_start = self._random_initial_parameters(annotations,
estimate_gamma)
logger.info('Start parameters optimization...')
# TODO: use gradient, constrained optimization
params_best = scipy.optimize.fmin(objective,
params_start,
xtol=1e-4, ftol=1e-4,
disp=False, maxiter=10000)
logger.info('Parameters optimization finished')
# parse arguments and update
self.gamma, self.theta = self._vector_to_params(params_best)
def _random_initial_parameters(self, annotations, estimate_gamma):
if estimate_gamma:
# estimate gamma from observed annotations
gamma = labels_frequency(annotations, self.nclasses)
else:
gamma = ModelBt._random_gamma(self.nclasses)
theta = ModelBt._random_theta(self.nannotators)
return self._params_to_vector(gamma, theta)
def _params_to_vector(self, gamma, theta):
"""Convert the tuple (gamma, theta) to a parameters vector.
Used to interface with the optimization routines.
"""
return np.r_[gamma[:-1], theta]
def _vector_to_params(self, params):
"""Convert a parameters vector to (gamma, theta) tuple.
Used to interface with the optimization routines.
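
        Since gamma sums to one, only its first nclasses - 1 entries are
        stored in the vector; the last entry is reconstructed as one minus
        their sum. E.g., with nclasses = 3, params = [0.2, 0.3, t0, t1]
        yields gamma = [0.2, 0.3, 0.5] and theta = [t0, t1].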
"""
nclasses = self.nclasses
gamma = np.zeros((nclasses,))
gamma[:nclasses-1] = params[:nclasses-1]
gamma[-1] = 1. - gamma[:nclasses-1].sum()
theta = params[nclasses-1:]
return gamma, theta
##### Model likelihood methods ############################################
    def log_likelihood(self, annotations):
"""Compute the log likelihood of a set of annotations given the model.
Returns :math:`\log P(\mathbf{x} | \gamma, \\theta)`,
where :math:`\mathbf{x}` is the array of annotations.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
Returns
-------
log_lhood : float
log likelihood of `annotations`
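
        Examples
        --------
        The result is the log of a probability, hence non-positive:

        >>> model = ModelBt.create_initial_state(3, 5)
        >>> anno = model.generate_annotations(100)
        >>> model.log_likelihood(anno) <= 0.
        True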
"""
self._raise_if_incompatible(annotations)
# mask missing annotations
missing_mask_nclasses = self._missing_mask(annotations)
return self._log_likelihood_core(annotations,
self.gamma, self.theta,
missing_mask_nclasses)
def _log_likelihood_core(self, annotations,
gamma, theta,
missing_mask_nclasses):
# check boundary conditions
if (min(min(gamma), min(theta)) < 0.
or max(max(gamma), max(theta)) > 1.):
#return -np.inf
return SMALLEST_FLOAT
unnorm_category = self._compute_category(
annotations, gamma, theta,
missing_mask_nclasses=missing_mask_nclasses,
normalize=False
)
llhood = np.log(unnorm_category.sum(1)).sum()
if np.isnan(llhood):
llhood = SMALLEST_FLOAT
return llhood
def _compute_category(self, annotations, gamma, theta,
missing_mask_nclasses=None, normalize=True):
"""Compute P(category[i] = k | model, annotations).
Arguments
---------
annotations : ndarray
Array of annotations
gamma : ndarray
Gamma parameters
theta : ndarray
Theta parameters
missing_mask_nclasses : ndarray, shape=(nitems, nannotators, n_classes)
Mask with True at missing values, tiled in the third dimension.
If None, it is computed, but it can be specified to speed-up
computations.
normalize : bool
If False, do not normalize the distribution.
Returns
-------
category : ndarray, shape = (n_items, n_classes)
category[i,k] is the (unnormalized) probability of class k for
item i
"""
nitems, nannotators = annotations.shape
# compute mask of invalid entries in annotations if necessary
if missing_mask_nclasses is None:
missing_mask_nclasses = self._missing_mask(annotations)
accuracy = self._accuracy_tensor(theta)
# unnorm_category is P(category[i] = k | model, data), unnormalized
unnorm_category = np.tile(gamma.copy(), (nitems, 1))
# mask missing annotations
annotators = np.arange(nannotators)[None, :]
tmp = np.ma.masked_array(accuracy[annotators, :, annotations],
mask=missing_mask_nclasses)
unnorm_category *= tmp.prod(1)
if normalize:
return unnorm_category / unnorm_category.sum(1)[:,None]
return unnorm_category
def _accuracy_tensor(self, theta):
"""Return the accuracy tensor.
theta[j,k,k'] = P( annotator j emits k' | real class is k)
"""
nannotators = self.nannotators
nclasses = self.nclasses
accuracy = np.empty((nannotators, nclasses, nclasses))
for j in range(nannotators):
for k in range(nclasses):
accuracy[j,k,:] = self._theta_to_categorical(theta[j], k)
return accuracy
def _missing_mask(self, annotations):
missing_mask = ~ is_valid(annotations)
missing_mask_nclasses = np.tile(missing_mask[:, :, None],
(1, 1, self.nclasses))
return missing_mask_nclasses
def _log_prior(self, theta=None):
"""Compute log probability of prior on the theta parameters."""
if theta is None:
theta = self.theta
log_prob = scipy.stats.beta._logpdf(theta, 2., 1.).sum()
return log_prob
##### Sampling posterior over parameters ##################################
    def sample_posterior_over_accuracy(self, annotations, nsamples,
                                       burn_in_samples=100,
                                       thin_samples=5,
                                       target_rejection_rate=0.3,
                                       rejection_rate_tolerance=0.2,
                                       step_optimization_nsamples=500,
                                       adjust_step_every=100):
"""Return samples from posterior distribution over theta given data.
Samples are drawn using a variant of a Metropolis-Hasting Markov Chain
Monte Carlo (MCMC) algorithm. Sampling proceeds in two phases:
1) *step size estimation phase*: first, the step size in the
MCMC algorithm is adjusted to achieve a given rejection rate.
2) *sampling phase*: second, samples are collected using the
step size from phase 1.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
nsamples : int
Number of samples to return (i.e., burn-in and thinning samples
are not included)
burn_in_samples : int
Discard the first `burn_in_samples` during the initial burn-in
phase, where the Monte Carlo chain converges to the posterior
thin_samples : int
Only return one every `thin_samples` samples in order to reduce
the auto-correlation in the sampling chain. This is called
"thinning" in MCMC parlance.
target_rejection_rate : float
target rejection rate for the step size estimation phase
rejection_rate_tolerance : float
the step size estimation phase is ended when the rejection rate for
all parameters is within `rejection_rate_tolerance` from
`target_rejection_rate`
step_optimization_nsamples : int
number of samples to draw in the step size estimation phase
adjust_step_every : int
number of samples after which the step size is adjusted during
            the step size estimation phase
Returns
-------
samples : ndarray, shape = (n_samples, n_annotators)
samples[i,:] is one sample from the posterior distribution over the
parameters `theta`
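
        Examples
        --------
        A minimal sketch (sampling can take a while for large data sets):

        >>> model = ModelBt.create_initial_state(3, 5)
        >>> anno = model.generate_annotations(100)
        >>> samples = model.sample_posterior_over_accuracy(anno, 200)
        >>> samples.shape
        (200, 5)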
"""
self._raise_if_incompatible(annotations)
nsamples = self._compute_total_nsamples(nsamples,
burn_in_samples,
thin_samples)
# mask missing annotations
missing_mask_nclasses = self._missing_mask(annotations)
# wrap objective function to give it to optimize_step_size and
# sample_distribution
_llhood = self._log_likelihood_core
_log_prior = self._log_prior
gamma = self.gamma
def _wrap_llhood(params, args):
theta = params
return (_llhood(annotations, gamma, theta, missing_mask_nclasses)
+ _log_prior(theta))
# TODO this save-reset is rather ugly, refactor: create copy of
# model and sample over it
# save internal parameters to reset at the end of sampling
save_params = (self.gamma, self.theta)
try:
# compute optimal step size for given target rejection rate
params_start = self.theta.copy()
params_upper = np.ones((self.nannotators,))
params_lower = np.zeros((self.nannotators,))
step = optimize_step_size(_wrap_llhood, params_start, None,
params_lower, params_upper,
step_optimization_nsamples,
adjust_step_every,
target_rejection_rate,
rejection_rate_tolerance)
# draw samples from posterior distribution over theta
samples = sample_distribution(_wrap_llhood, params_start, None,
step, nsamples,
params_lower, params_upper)
return self._post_process_samples(samples, burn_in_samples,
thin_samples)
finally:
# reset parameters
self.gamma, self.theta = save_params
##### Posterior distributions #############################################
    def infer_labels(self, annotations):
"""Infer posterior distribution over label classes.
Compute the posterior distribution over label classes given observed
        annotations, :math:`P( \mathbf{y} | \mathbf{x}, \gamma, \\theta)`.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
Returns
-------
posterior : ndarray, shape = (n_items, n_classes)
posterior[i,k] is the posterior probability of class k given the
annotation observed in item i.
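
        Examples
        --------
        Each row of the result is a normalized distribution over classes:

        >>> import numpy as np
        >>> model = ModelBt.create_initial_state(3, 5)
        >>> anno = model.generate_annotations(100)
        >>> posterior = model.infer_labels(anno)
        >>> np.allclose(posterior.sum(1), 1.)
        True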
"""
self._raise_if_incompatible(annotations)
category = self._compute_category(annotations,
self.gamma,
self.theta)
return category