# Copyright (c) 2011, Enthought, Ltd.
# Authors: Pietro Berkes <pberkes@enthought.com>, Andrey Rzhetsky
# License: Modified BSD license (2-clause)
"""This module defines model B-with-theta, optimized for a loop design.
The implementation assumes that there are a total or 8 annotators. Each item is
annotated by a triplet of annotators, according to the loop design described
in Rzhetsky et al., 2009.
E.g., for 16 items the loop design looks like this (`A` indicates a label,
`*` indicates a missing value): ::
A A A * * * * *
A A A * * * * *
* A A A * * * *
* A A A * * * *
* * A A A * * *
* * A A A * * *
* * * A A A * *
* * * A A A * *
* * * * A A A *
* * * * A A A *
* * * * * A A A
* * * * * A A A
A * * * * * A A
A * * * * * A A
A A * * * * * A
A A * * * * * A
"""
import numpy as np
import scipy.optimize
import scipy.stats
from traits.api import Int, Array
from pyanno.abstract_model import AbstractModel
from pyanno.sampling import optimize_step_size, sample_distribution
from pyanno.util import (random_categorical, compute_counts,
SMALLEST_FLOAT, MISSING_VALUE, labels_frequency,
is_valid, ninf_to_num)
import logging
logger = logging.getLogger(__name__)
# map of `n` to list of all possible triplets of `n` elements
_triplet_combinations = {}
def _get_triplet_combinations(n):
"""Return array of all possible combinations of n elements in triplets.
"""
if not _triplet_combinations.has_key(n):
_triplet_combinations[n] = (
np.array([i for i in np.ndindex(n,n,n)]) )
return _triplet_combinations[n]
[docs]class ModelBtLoopDesign(AbstractModel):
"""Implementation of Model B-with-theta from (Rzhetsky et al., 2009).
The model assumes the existence of "true" underlying labels for each item,
which are drawn from a categorical distribution, gamma. Annotators report
this labels with some noise.
This model is closely related to :class:`~ModelB`, but, crucially,
the noise distribution is described by a small number of parameters (one
per annotator), which makes their estimation efficient and less sensitive
to local optima.
The model parameters are:
- gamma[k] is the probability of label k
- theta[j] parametrized the probability that annotator j reports label k'.
More specifically, P( annotator j chooses k' | real label = k) is
theta[j] for k' = k, or (1 - theta[j]) / sum(theta) if k' != k .
This implementation is optimized for he loop design introduced in
(Rzhetsky et al., 2009), which assumes that each item is annotated by 3
out of 8 annotators. For a more general implementation, see
:class:`~ModelBt`
See the documentation for a more detailed description of the model.
**Reference**
* Rzhetsky A., Shatkay, H., and Wilbur, W.J. (2009). "How to get the most
from your curation effort", PLoS Computational Biology, 5(5).
"""
nclasses = Int
nannotators = Int(8)
# number of annotators rating each item in the loop design
nannotators_per_item = Int(3)
gamma = Array(dtype=float, shape=(None,))
theta = Array(dtype=float, shape=(None,))
def __init__(self, nclasses, gamma, theta, **traits):
"""Create an instance of ModelB.
Arguments
----------
nclasses : int
Number of possible annotation classes
nannotators : int
Number of annotators
gamma : ndarray, shape = (n_classes, )
gamma[k] is the prior probability of label class k
theta : ndarray, shape = (n_annotators, )
theta[j] parametrizes the accuracy of annotator j. Specifically,
`P( annotator j chooses k' | real label = k)` is
`theta[j]` for k' = k, or `(1 - theta[j]) / sum(theta)`
if `k' != k `.
"""
self.nclasses = nclasses
self.gamma = gamma
self.theta = theta
super(ModelBtLoopDesign, self).__init__(**traits)
##### Model and data generation methods ###################################
@staticmethod
[docs] def create_initial_state(nclasses, gamma=None, theta=None):
"""Factory method returning a model with random initial parameters.
It is often more convenient to use this factory method over the
constructor, as one does not need to specify the initial model
parameters.
The parameters theta and gamma, controlling accuracy and prevalence,
are initialized at random as follows:
:math:`\\theta_j \sim \mathrm{Uniform}(0.6, 0.95)`
:math:`\gamma \sim \mathrm{Dirichlet}(2.0)`
Arguments
---------
nclasses : int
number of categories
gamma : nparray
An array of floats with size that holds the probability of each
annotation value. Default is None
theta : nparray
An array of floats that the parameters of P( v_i | psi ) (one for
each annotator)
Returns
-------
model : :class:`~ModelBtLoopDesign`
Instance of ModelBtLoopDesign
"""
if gamma is None:
gamma = ModelBtLoopDesign._random_gamma(nclasses)
if theta is None:
nannotators = 8
theta = ModelBtLoopDesign._random_theta(nannotators)
model = ModelBtLoopDesign(nclasses, gamma, theta)
return model
@staticmethod
def _random_gamma(nclasses):
beta = 2.*np.ones((nclasses,))
return np.random.dirichlet(beta)
@staticmethod
def _random_theta(nannotators):
return np.random.uniform(low=0.6, high=0.95,
size=(nannotators,))
[docs] def generate_labels(self, nitems):
"""Generate random labels from the model."""
return random_categorical(self.gamma, nitems)
[docs] def generate_annotations_from_labels(self, labels):
"""Generate random annotations from the model, given labels
The method samples random annotations from the conditional probability
distribution of annotations, :math:`x_i^j`
given labels, :math:`y_i`.
Arguments
----------
labels : ndarray, shape = (n_items,), dtype = int
Set of "true" labels
Returns
-------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
"""
theta = self.theta
nannotators = self.nannotators
nitems = labels.shape[0]
nitems_per_loop = np.ceil(float(nitems) / nannotators)
annotations = np.empty((nitems, nannotators), dtype=int)
for j in xrange(nannotators):
for i in xrange(nitems):
distr = self._theta_to_categorical(theta[j], labels[i])
annotations[i,j] = random_categorical(distr, 1)
# mask annotation value according to loop design
for l in xrange(nannotators):
label_idx = np.arange(l+self.nannotators_per_item, l+nannotators) % 8
annotations[l*nitems_per_loop:(l+1)*nitems_per_loop,
label_idx] = MISSING_VALUE
return annotations
[docs] def generate_annotations(self, nitems):
"""Generate a random annotation set from the model.
Sample a random set of annotations from the probability distribution
defined the current model parameters:
1) Label classes are generated from the prior distribution, pi
2) Annotations are generated from the conditional distribution of
annotations given classes, parametrized by theta
Arguments
---------
nitems : int
Number of items to sample
Returns
-------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
"""
labels = self.generate_labels(nitems)
return self.generate_annotations_from_labels(labels)
def _theta_to_categorical(self, theta, psi):
"""Returns P( v_i = psi | theta_i ) as a distribution."""
distr = np.empty((self.nclasses,))
distr.fill((1.-theta)/(self.nclasses-1.))
distr[psi] = theta
assert np.allclose(distr.sum(), 1.)
return distr
##### Parameters estimation methods #######################################
[docs] def mle(self, annotations, estimate_gamma=True):
"""Computes maximum likelihood estimate (MLE) of parameters.
Estimate the parameters :attr:`theta` and :attr:`gamma` from a set of
observed annotations using maximum likelihood estimation.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
estimate_gamma : bool
If True, the parameters :attr:`gamma` are estimated by the empirical
class frequency. If False, :attr:`gamma` is left unchanged.
"""
self._raise_if_incompatible(annotations)
# wrap log likelihood function to give it to optimize.fmin
_llhood_counts = self._log_likelihood_counts
def _wrap_llhood(params, counts):
self.gamma, self.theta = self._vector_to_params(params)
# minimize *negative* likelihood
return - _llhood_counts(counts)
self._parameter_estimation(_wrap_llhood, annotations,
estimate_gamma=estimate_gamma)
[docs] def map(self, annotations, estimate_gamma=True):
"""Computes maximum a posteriori (MAP) estimate of parameters.
Estimate the parameters :attr:`theta` and :attr:`gamma` from a set of
observed annotations using maximum a posteriori estimation.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
estimate_gamma : bool
If True, the parameters :attr:`gamma` are estimated by the empirical
class frequency. If False, :attr:`gamma` is left unchanged.
"""
self._raise_if_incompatible(annotations)
# wrap log likelihood function to give it to optimize.fmin
_llhood_counts = self._log_likelihood_counts
_log_prior = self._log_prior
def _wrap_llhood(params, counts):
self.gamma, self.theta = self._vector_to_params(params)
# minimize *negative* posterior probability of parameters
return - (_llhood_counts(counts) + _log_prior())
self._parameter_estimation(_wrap_llhood, annotations,
estimate_gamma=estimate_gamma)
def _parameter_estimation(self, objective, annotations,
estimate_gamma=True):
counts = compute_counts(annotations, self.nclasses)
params_start = self._random_initial_parameters(annotations,
estimate_gamma)
logger.info('Start parameters optimization...')
# TODO: use gradient, constrained optimization
params_best = scipy.optimize.fmin(objective,
params_start,
args=(counts,),
xtol=1e-4, ftol=1e-4,
disp=False, maxiter=10000)
logger.info('Parameters optimization finished')
# parse arguments and update
self.gamma, self.theta = self._vector_to_params(params_best)
def _random_initial_parameters(self, annotations, estimate_gamma):
if estimate_gamma:
# estimate gamma from observed annotations
gamma = labels_frequency(annotations, self.nclasses)
else:
gamma = ModelBtLoopDesign._random_gamma(self.nclasses)
theta = ModelBtLoopDesign._random_theta(self.nannotators)
return self._params_to_vector(gamma, theta)
def _params_to_vector(self, gamma, theta):
"""Convert the tuple (gamma, theta) to a parameters vector.
Used to interface with the optimization routines.
"""
return np.r_[gamma[:-1], theta]
def _vector_to_params(self, params):
"""Convert a parameters vector to (gamma, theta) tuple.
Used to interface with the optimization routines.
"""
nclasses = self.nclasses
gamma = np.zeros((nclasses,))
gamma[:nclasses-1] = params[:nclasses-1]
gamma[-1] = 1. - gamma[:nclasses-1].sum()
theta = params[nclasses-1:]
return gamma, theta
##### Model likelihood methods ############################################
[docs] def log_likelihood(self, annotations):
"""Compute the log likelihood of a set of annotations given the model.
Returns :math:`\log P(\mathbf{x} | \gamma, \\theta)`,
where :math:`\mathbf{x}` is the array of annotations.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
Returns
-------
log_lhood : float
log likelihood of `annotations`
"""
self._raise_if_incompatible(annotations)
counts = compute_counts(annotations, self.nclasses)
return self._log_likelihood_counts(counts)
def _log_likelihood_counts(self, counts):
"""Compute the log likelihood of annotations given the model.
This method assumes the data is in counts format.
"""
# TODO: check if it's possible to replace these constraints with bounded optimization
# check boundary conditions
if (min(min(self.gamma), min(self.theta)) < 0.
or max(max(self.gamma), max(self.theta)) > 1.):
#return np.inf
return SMALLEST_FLOAT
llhood = 0.
# loop over the 8 combinations of annotators
for i in range(8):
# extract the theta parameters for this triplet
triplet_indices = np.arange(i, i+3) % self.nannotators
triplet_indices.sort()
theta_triplet = self.theta[triplet_indices]
# compute the likelihood for the triplet
llhood += self._log_likelihood_triplet(counts[:,i],
theta_triplet)
return llhood
def _log_likelihood_triplet(self, counts_triplet, theta_triplet):
"""Compute the log likelihood of data for one triplet of annotators.
Input:
counts_triplet -- count data for one combination of annotators
theta_triplet -- theta parameters of the current triplet
"""
# log \prod_n P(v_{ijk}^{n} | params)
# = \sum_n log P(v_{ijk}^{n} | params)
# = \sum_v_{ijk} count(v_{ijk}) log P( v_{ijk} | params )
#
# where n is n-th annotation of triplet {ijk}]
# compute P( v_{ijk} | params )
pf = self._pattern_frequencies(theta_triplet)
log_pf = ninf_to_num(np.log(pf))
l = (counts_triplet * log_pf).sum()
return l
def _pattern_frequencies(self, theta_triplet):
"""Compute vector of P(v_{ijk}|params) for each combination of v_{ijk}.
"""
gamma = self.gamma
nclasses = self.nclasses
# list of all possible combinations of v_i, v_j, v_k elements
v_ijk_combinations = _get_triplet_combinations(nclasses)
# P( v_{ijk} | params ) = \sum_psi P( v_{ijk} | psi, params ) P( psi )
pf = 0.
not_theta = (1.-theta_triplet) / (nclasses-1.)
p_v_ijk_given_psi = np.empty_like(v_ijk_combinations, dtype=float)
for psi in range(nclasses):
for j in range(3):
p_v_ijk_given_psi[:,j] = np.where(v_ijk_combinations[:,j]==psi,
theta_triplet[j],
not_theta[j])
pf += p_v_ijk_given_psi.prod(1) * gamma[psi]
return pf
def _log_prior(self):
"""Compute log probability of prior on the theta parameters."""
log_prob = scipy.stats.beta._logpdf(self.theta, 2., 1.).sum()
return log_prob
##### Sampling posterior over parameters ##################################
[docs] def sample_posterior_over_accuracy(self, annotations, nsamples,
burn_in_samples = 100,
thin_samples = 5,
target_rejection_rate = 0.3,
rejection_rate_tolerance = 0.2,
step_optimization_nsamples = 500,
adjust_step_every = 100):
"""Return samples from posterior distribution over theta given data.
Samples are drawn using a variant of a Metropolis-Hasting Markov Chain
Monte Carlo (MCMC) algorithm. Sampling proceeds in two phases:
1) *step size estimation phase*: first, the step size in the
MCMC algorithm is adjusted to achieve a given rejection rate.
2) *sampling phase*: second, samples are collected using the
step size from phase 1.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
nsamples : int
Number of samples to return (i.e., burn-in and thinning samples
are not included)
burn_in_samples : int
Discard the first `burn_in_samples` during the initial burn-in
phase, where the Monte Carlo chain converges to the posterior
thin_samples : int
Only return one every `thin_samples` samples in order to reduce
the auto-correlation in the sampling chain. This is called
"thinning" in MCMC parlance.
target_rejection_rate : float
target rejection rate for the step size estimation phase
rejection_rate_tolerance : float
the step size estimation phase is ended when the rejection rate for
all parameters is within `rejection_rate_tolerance` from
`target_rejection_rate`
step_optimization_nsamples : int
number of samples to draw in the step size estimation phase
adjust_step_every : int
number of samples after which the step size is adjusted during
the step size estimation pahse
Returns
-------
samples : ndarray, shape = (n_samples, n_annotators)
samples[i,:] is one sample from the posterior distribution over the
parameters `theta`
"""
self._raise_if_incompatible(annotations)
nsamples = self._compute_total_nsamples(nsamples,
burn_in_samples,
thin_samples)
# optimize step size
counts = compute_counts(annotations, self.nclasses)
# wrap log likelihood function to give it to optimize_step_size and
# sample_distribution
_llhood_counts = self._log_likelihood_counts
_log_prior = self._log_prior
def _wrap_llhood(params, counts):
self.theta = params
return _llhood_counts(counts) + _log_prior()
# TODO this save-reset is rather ugly, refactor: create copy of
# model and sample over it
# save internal parameters to reset at the end of sampling
save_params = (self.gamma, self.theta)
try:
# compute optimal step size for given target rejection rate
params_start = self.theta.copy()
params_upper = np.ones((self.nannotators,))
params_lower = np.zeros((self.nannotators,))
step = optimize_step_size(_wrap_llhood, params_start, counts,
params_lower, params_upper,
step_optimization_nsamples,
adjust_step_every,
target_rejection_rate,
rejection_rate_tolerance)
# draw samples from posterior distribution over theta
samples = sample_distribution(_wrap_llhood, params_start, counts,
step, nsamples,
params_lower, params_upper)
return self._post_process_samples(samples, burn_in_samples,
thin_samples)
finally:
# reset parameters
self.gamma, self.theta = save_params
##### Posterior distributions #############################################
[docs] def infer_labels(self, annotations):
"""Infer posterior distribution over label classes.
Compute the posterior distribution over label classes given observed
annotations, :math:`P( \mathbf{y} | \mathbf{x}, \\theta, \omega)`.
Arguments
----------
annotations : ndarray, shape = (n_items, n_annotators)
annotations[i,j] is the annotation of annotator j for item i
Returns
-------
posterior : ndarray, shape = (n_items, n_classes)
posterior[i,k] is the posterior probability of class k given the
annotation observed in item i.
"""
self._raise_if_incompatible(annotations)
nitems = annotations.shape[0]
gamma = self.gamma
nclasses = self.nclasses
# get indices of annotators active in each row
valid_entries = is_valid(annotations).nonzero()
annotator_indices = np.reshape(valid_entries[1],
(nitems, self.nannotators_per_item))
valid_annotations = annotations[valid_entries]
valid_annotations = np.reshape(valid_annotations,
(nitems, self.nannotators_per_item))
# thetas of active annotators
theta_equal = self.theta[annotator_indices]
theta_not_equal = (1. - theta_equal) / (nclasses - 1.)
# compute posterior over psi
psi_distr = np.zeros((nitems, nclasses))
for psi in xrange(nclasses):
tmp = np.where(valid_annotations == psi,
theta_equal, theta_not_equal)
psi_distr[:,psi] = gamma[psi] * tmp.prod(1)
# normalize distribution
psi_distr /= psi_distr.sum(1)[:,np.newaxis]
return psi_distr
##### Verify input ########################################################
[docs] def are_annotations_compatible(self, annotations):
"""Check if the annotations are compatible with the models' parameters.
"""
if not super(ModelBtLoopDesign, self).are_annotations_compatible(
annotations):
return False
masked_annotations = np.ma.masked_equal(annotations, MISSING_VALUE)
# exactly 3 annotations per row
nvalid = (~masked_annotations.mask).sum(1)
if not np.all(nvalid == self.nannotators_per_item):
return False
return True