Source code for pyanno.util

# Copyright (c) 2011, Enthought, Ltd.
# Authors: Pietro Berkes <pberkes@enthought.com>, Andrey Rzhetsky,
#          Bob Carpenter
# License: Modified BSD license (2-clause)

"""Utility functions."""

import numpy as np
from numpy import log
from numpy.core import getlimits
from scipy.special import gammaln
import time

import logging
logger = logging.getLogger(__name__)

#: In annotations arrays, this is the value used to indicate missing values
MISSING_VALUE = -1

#: Smallest possible floating point number, sometimes used instead of -np.inf
#: to make numerical calculations return a meaningful value
SMALLEST_FLOAT = getlimits.finfo(np.float).min


class PyannoValueError(ValueError):
    """ValueError subclass raised by pyAnno functions and methods."""
    pass

def random_categorical(distr, nsamples):
    """Return an array of samples from a categorical distribution.

    Arguments
    ---------
    distr : ndarray
        distr[i] is the probability of item i

    nsamples : int
        Number of samples to draw from the distribution

    Returns
    -------
    samples : ndarray, shape = (n_samples, )
        Samples from the distribution
    """
    assert np.allclose(distr.sum(), 1., atol=1e-8)
    cumulative = distr.cumsum()
    return cumulative.searchsorted(np.random.random(nsamples))

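A brief usage sketch (illustrative values, not part of the original module), drawing 1000 samples from a three-class distribution; roughly 60% of the samples come out as class 0: ::

    distr = np.array([0.6, 0.3, 0.1])
    samples = random_categorical(distr, 1000)
    # samples is an integer array of length 1000 with values in {0, 1, 2}
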
def ninf_to_num(x):
    """Substitute -inf with smallest floating point number."""
    is_neg_inf = np.isneginf(x)
    x[is_neg_inf] = SMALLEST_FLOAT
    return x

def dirichlet_llhood(theta, alpha):
    """Compute the log likelihood of theta under Dirichlet(alpha).

    Arguments
    ---------
    theta : ndarray
        Categorical probability distribution. theta[i] is the probability
        of item i. Elements of the array have to sum to 1.0 (not enforced
        for efficiency reasons)

    alpha : ndarray
        Parameters of the Dirichlet distribution

    Returns
    -------
    log_likelihood : float
        Log likelihood of theta given alpha
    """
    # substitute -inf with SMALLEST_FLOAT, so that 0*log(0) is 0 when necessary
    log_theta = ninf_to_num(log(theta))
    return (gammaln(alpha.sum())
            - (gammaln(alpha)).sum()
            + ((alpha - 1.) * log_theta).sum())

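The returned value is log Gamma(sum_k alpha_k) - sum_k log Gamma(alpha_k) + sum_k (alpha_k - 1) log theta_k. As an illustrative check (not part of the original module), with alpha = (1, 1, 1) the Dirichlet density is uniform over the simplex, so the log likelihood equals log Gamma(3) = log 2 for any valid theta: ::

    theta = np.array([0.2, 0.3, 0.5])
    alpha = np.ones(3)
    dirichlet_llhood(theta, alpha)   # ~= 0.6931, i.e. log(2)
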
# TODO remove default condition when x[i] == 0.
def normalize(x, dtype=float):
    """Return a normalized distribution (sums to 1.0).

    If x contains only zero elements, the returned array has all elements
    equal to 1/n, where n is the length of x.
    """
    x = np.asarray(x, dtype=dtype)
    z = x.sum()
    if z <= 0:
        x = np.ones_like(x)
        z = float(len(x))
    return x / z

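Two illustrative calls (hypothetical values, not part of the original module), the second showing the all-zero fallback: ::

    normalize([1, 2, 1])      # -> [0.25, 0.5, 0.25]
    normalize([0, 0, 0, 0])   # -> [0.25, 0.25, 0.25, 0.25]
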
def create_band_matrix(shape, diagonal_elements):
    """Create a symmetrical band matrix from a list of elements.

    Arguments
    ---------
    shape : int
        Width of the matrix

    diagonal_elements : list or array
        List of elements in the first row. If the list is shorter than
        `shape`, the last element is used to fill the remaining items.
    """
    diagonal_elements = np.asarray(diagonal_elements)

    def diag(i, j):
        x = np.absolute(i - j)
        x = np.minimum(diagonal_elements.shape[0] - 1, x).astype(int)
        return diagonal_elements[x]

    return np.fromfunction(diag, (shape, shape))

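An illustrative call (not part of the original module), building a 4x4 band matrix from three elements; entries more than two positions away from the diagonal reuse the last element: ::

    create_band_matrix(4, [1.0, 0.5, 0.1])
    # -> [[1.0, 0.5, 0.1, 0.1],
    #     [0.5, 1.0, 0.5, 0.1],
    #     [0.1, 0.5, 1.0, 0.5],
    #     [0.1, 0.1, 0.5, 1.0]]
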
# TODO clean up and simplify and rename
def compute_counts(annotations, nclasses):
    """Transform annotation data into counts format.

    At the moment, it is hard coded for 8 annotators, with 3 annotators
    active at any time.

    Arguments
    ---------
    annotations : ndarray, shape = (n_items, 8)
        Annotations array

    nclasses : int
        Number of label classes

    Returns
    -------
    data : ndarray, shape = (n_classes^3, 8)
        data[i,m] is the number of times the combination of annotators
        number m voted according to pattern i
    """
    index = np.array([[0, 1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5],
                      [4, 5, 6], [5, 6, 7], [0, 6, 7], [0, 1, 7]], int)
    m = annotations.shape[0]
    n = annotations.shape[1]
    annotations = np.asarray(annotations, dtype=int)

    if n != 8:
        raise ValueError('Number of annotators must be 8')

    # compute counts of 3-annotator patterns for 8 triplets of annotators
    data = np.zeros((nclasses ** 3, 8), dtype=int)

    # transform each triple of annotations into a code in base `nclasses`
    for i in range(m):
        ind = np.where(annotations[i, :] >= 0)
        code = (annotations[i, ind[0][0]] * (nclasses ** 2)
                + annotations[i, ind[0][1]] * nclasses
                + annotations[i, ind[0][2]])

        # o = index of possible combination of annotators in the loop design
        o = -100
        for j in range(8):
            k = 0
            for l in range(3):
                if index[j, l] == ind[0][l]:
                    k += 1
            if k == 3:
                o = j

        if o >= 0:
            data[code, o] += 1
        else:
            logger.debug(str(code) + " " + str(ind) + " = homeless code")

    return data

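An illustrative example (hypothetical data, not part of the original module): three items, each labeled by the annotator triplet (0, 1, 2), with the remaining positions set to the missing value -1: ::

    anno = -np.ones((3, 8), dtype=int)
    anno[:, :3] = [[0, 1, 1],
                   [0, 1, 1],
                   [2, 0, 1]]
    counts = compute_counts(anno, nclasses=3)
    # counts has shape (27, 8); column 0 corresponds to the triplet (0, 1, 2).
    # The pattern (0, 1, 1) has code 0*9 + 1*3 + 1 = 4, so counts[4, 0] == 2;
    # the pattern (2, 0, 1) has code 2*9 + 0*3 + 1 = 19, so counts[19, 0] == 1.
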
def labels_count(annotations, nclasses, missing_val=MISSING_VALUE):
    """Compute the total count of labels in observed annotations.

    Arguments
    ---------
    annotations : ndarray, shape = (n_items, n_annotators)
        annotations[i,j] is the annotation made by annotator j on item i

    nclasses : int
        Number of label classes in `annotations`

    missing_val : int
        Value used to indicate missing values in the annotations.
        Default is :attr:`MISSING_VALUE`

    Returns
    -------
    count : ndarray, shape = (n_classes, )
        count[k] is the number of elements of class k in `annotations`
    """
    valid = annotations != missing_val
    nobservations = valid.sum()

    if nobservations == 0:
        # no valid observations
        raise PyannoValueError('No valid annotations')

    return np.bincount(annotations[valid], minlength=nclasses)

def labels_frequency(annotations, nclasses, missing_val=MISSING_VALUE):
    """Compute the total frequency of labels in observed annotations.

    Arguments
    ---------
    annotations : ndarray, shape = (n_items, n_annotators)
        annotations[i,j] is the annotation made by annotator j on item i

    nclasses : int
        Number of label classes in `annotations`

    missing_val : int
        Value used to indicate missing values in the annotations.
        Default is :attr:`MISSING_VALUE`

    Returns
    -------
    freq : ndarray, shape = (n_classes, )
        freq[k] is the frequency of elements of class k in `annotations`,
        i.e. their count over the total number of observed (non-missing)
        elements
    """
    valid = annotations != missing_val
    nobservations = valid.sum()

    if nobservations == 0:
        # no valid observations
        raise PyannoValueError('No valid annotations')

    return (np.bincount(annotations[valid], minlength=nclasses)
            / float(nobservations))

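An illustrative example covering both labels_count and labels_frequency (hypothetical data, not part of the original module); -1 marks missing annotations, so there are 7 observed labels in total: ::

    anno = np.array([[ 0,  1,  1],
                     [ 1,  1, -1],
                     [-1,  2,  1]])
    labels_count(anno, 3)      # -> [1, 5, 1]
    labels_frequency(anno, 3)  # -> approximately [0.143, 0.714, 0.143]
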
def is_valid(annotations):
    """Return True where annotations are valid.

    An annotation is valid if it is not equal to the missing value,
    MISSING_VALUE.
    """
    return annotations != MISSING_VALUE

def majority_vote(annotations):
    """Compute an estimate of the real class by majority vote.

    In case of ties, return the class with the smallest number.

    Arguments
    ---------
    annotations : ndarray, shape = (n_items, n_annotators)
        annotations[i,j] is the annotation made by annotator j on item i

    Returns
    -------
    vote : ndarray, shape = (n_items, )
        vote[i] is the majority vote estimate for item i
    """
    nitems = annotations.shape[0]
    valid = is_valid(annotations)

    vote = np.empty((nitems,), dtype=int)
    for i in xrange(nitems):
        count = np.bincount(annotations[i, valid[i, :]])
        vote[i] = count.argmax()

    return vote

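An illustrative example (hypothetical data, not part of the original module); the ties in the last two items are resolved in favor of the smaller class label: ::

    anno = np.array([[ 0,  0,  1],
                     [ 1,  1,  1],
                     [ 2,  0, -1],
                     [ 0,  1, -1]])
    majority_vote(anno)   # -> [0, 1, 0, 0]
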
def string_wrap(st, mode):
    """Wrap a string in ANSI escape codes for colored terminal output."""
    st = str(st)

    if mode == 1:
        st = "\033[1;29m" + st + "\033[0m"
    elif mode == 2:
        st = "\033[1;34m" + st + "\033[0m"
    elif mode == 3:
        st = "\033[1;44m" + st + "\033[0m"
    elif mode == 4:
        st = "\033[1;35m" + st + "\033[0m"
    elif mode == 5:
        st = "\033[1;33;44m" + st + "\033[0m"
    elif mode == 6:
        st = "\033[1;47;34m" + st + "\033[0m"
    else:
        st = st + ' '
    return st

class benchmark(object):
    """Simple context manager to simplify benchmarking.

    Usage: ::

        with benchmark('fast computation'):
            do_something()
    """
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        print('---- start ----')
        self.start = time.time()

    def __exit__(self, ty, val, tb):
        end = time.time()
        print('---- stop ----')
        print("%s : %0.3f seconds" % (self.name, end - self.start))
        return False
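
A brief usage sketch (illustrative; the timing shown is hypothetical): ::

    with benchmark('sampling'):
        samples = random_categorical(np.array([0.5, 0.5]), 1000000)
    # ---- start ----
    # ---- stop ----
    # sampling : 0.035 seconds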
