# Source code for encore.events.event_manager

#
#
# (C) Copyright 2011-2022 Enthought, Inc., Austin, TX
# All right reserved.
#
# This file is open source software distributed according to the terms in LICENSE.txt
#
""" This module defines an event registry, notification and filtering class.

The main class of the module is the `EventManager`.
"""

# Standard library imports.
import logging
import itertools
import bisect
import heapq
import threading
import weakref
from types import MethodType
import traceback

# Logging.
logger = logging.getLogger(__name__)

# Local imports
from .abstract_event_manager import BaseEvent, BaseEventManager


###############################################################################
# Logging Trace Function.
###############################################################################
class LoggingTracer(object):
    """ Trace callable which writes every traced action to a logger.

    Instances are meant to be installed as the trace function of an event
    manager.  Each call is logged and ``None`` is returned.

    Usage
    -----
    event_manager.set_trace(LoggingTracer())

    Parameters
    ----------
    logger : logging.Logger
        Logger which receives the trace records.  Defaults to this module's
        logger.

    level : int
        Logging level used for the trace records.

    """
    def __init__(self, logger=logger, level=logging.INFO):
        self._logger = logger
        self._level = level

    def __call__(self, name, func, *args, **kwds):
        """ Log the traced action ``name`` together with its callable and
        arguments.
        """
        template = 'event_log: %s, function: %s,\nargs=%s,\nkwds=%s'
        # Formatting is left to the logging machinery so that it only happens
        # when the record is actually emitted.
        self._logger.log(self._level, template, name, func, args, kwds)

###############################################################################
# Notifier classes for callables: lightweight weakref substitutes.
###############################################################################
class CallableNotifier(object):
    """ Notifier wrapping a general callable by strong reference.

    Calling the notifier gives back the wrapped callable, so it can be used
    wherever a weak reference style ``notifier()`` lookup is expected.
    """
    __slots__ = ['func']

    def __init__(self, func, notify=None, args=()):
        # ``notify`` and ``args`` are accepted but unused: the callable is
        # held strongly, so there is no collection event to report.
        self.func = func

    def __call__(self):
        """ Return the wrapped listener callable.
        """
        return self.func

class MethodNotifier(object):
    """ Notifier for bound methods, storing only a weak reference to the
    object the method is bound to.

    Parameters
    ----------
    meth : bound method
        The listener method.  Its function is stored strongly, its ``self``
        object weakly.

    notify : callable or None
        Optional callable invoked as ``notify(*args)`` when the bound object
        is garbage collected.

    args : tuple
        Positional arguments passed to ``notify``.
    """
    __slots__ = ['func', 'cls', 'obj', '_notify', '_args']

    def __init__(self, meth, notify=None, args=()):
        self.func = meth.__func__
        self.cls = meth.__self__.__class__
        # Always initialise the callback slots.  ``self.notify`` is registered
        # as the weakref callback unconditionally, so leaving the slots unset
        # when no callback is given would raise AttributeError once the bound
        # object is collected.
        self._notify = notify if notify else None
        self._args = args
        self.obj = weakref.ref(meth.__self__, self.notify)

    def notify(self, ref):
        """ Notify the garbage collection listener, if one was supplied.

        ``ref`` is the (dead) weak reference passed by the weakref machinery;
        it is not used.
        """
        if self._notify is not None:
            self._notify(*self._args)

    def __call__(self):
        """ Return the original listener method, or None if it no longer exists.
        """
        target = self.obj()
        if target is None:
            # Bound method whose object has been garbage collected.
            return None

        # Re-bind the stored function to the still-living object.
        return MethodType(self.func, target)

###############################################################################
# `EventInfo` Private Class.
###############################################################################
class EventInfo(object):
    """ A class which manages handling of a single event.

    Listeners are stored in ``_priority_list`` as
    ``(-priority, count, notifier)`` tuples kept in ascending order, so that
    higher priority listeners come first and ``count`` orders listeners of
    equal priority.  ``_priority_info`` maps a listener id (see ``get_id``)
    to its tuple, and ``_listener_filters`` maps a listener id to its filter
    dict.
    """
    def __init__(self, cls):
        """ Constructor.

        Parameters:
        -----------
        cls : class
            Class of the event.
        """
        self.cls = cls
        self._priority_list = [] # sorted priority list
        self._priority_info = {}
        self._listener_filters = {}
        self._filter_keys = set() # to precompute filters on event emit
        self._disable = False

        self._priority_list_lock = threading.Lock()

    def connect(self, func, filter=None, priority=0, count=0):
        """ Add a listener for the event.

        Parameters
        ----------
        func : callable
            A callable to be called when the event is emitted.  The function
            should expect one argument which is the event instance which was
            emitted.

        filter : dict
            Filters to match for before calling the listener. The listener is
            called only when the event matches all of the filters.

            Filter specification:
                - key: string which is extended name of an attribute of the
                    event instance. For example string 'source.name' will be
                    the attribute `event.source.name`
                - value: the value of the specified attribute.

        priority : int
            The priority of the listener. Higher priority listeners are called
            before lower priority listeners.

        count : int
            An integer to break a tie in priority. This is generally
            an incremental number assigned by EventManager in order of
            registration.  Listeners with equal ``priority`` and ``count``
            are kept in registration order.

        Note
        ----

        Reconnecting an already connected listener will disconnect the
        old listener. This may have ramifications in changing the filters
        and the priority.

        The filtering is added so that future optimizations can be done
        on specific events with large number of handlers. For example there
        should be a fast way to filter key events to specific listeners rather
        than iterating through all listeners.

        """
        listener_id = self.get_id(func)
        with self._priority_list_lock:
            if listener_id in self._priority_info:
                # Ensure a function is connected only once.
                # Reconnecting will update its sequence and filters.  This is
                # done under the same lock acquisition as the insertion so
                # that the replacement is atomic.
                self._remove(listener_id)
            sub = self._get_notifier(func, self._listener_deleted)
            if filter:
                self._listener_filters[listener_id] = filter
                self._filter_keys.update(filter)
            entry = (-priority, count, sub)
            self._priority_list.insert(self._insertion_index(entry), entry)
            self._priority_info[listener_id] = entry

    def disconnect(self, func):
        """ Disconnects a listener from being notified about the event.

        Parameters
        ----------
        func : callable
            The callable which was registered.

        Raises
        ------
        KeyError : if `func` is not connected.

        """
        self._disconnect(self.get_id(func))

    def _listener_deleted(self, id):
        """ Drop the listener registered under ``id`` if it is still connected.

        This is handed to the method notifiers as their garbage collection
        callback.
        """
        # NOTE(review): ``_priority_list_lock`` is a non-reentrant Lock.  If
        # this callback ever runs on a thread that already holds the lock,
        # the acquisition below would block forever -- confirm whether that
        # can happen in practice.
        with self._priority_list_lock:
            if id in self._priority_info:
                self._remove(id)

    def _disconnect(self, id):
        """ Remove the listener registered under ``id``.

        Raises KeyError if no listener is registered under ``id``.
        """
        with self._priority_list_lock:
            self._remove(id)

    def _insertion_index(self, entry):
        """ Return the index at which ``entry`` belongs in the priority list.

        Must be called with ``_priority_list_lock`` held.
        """
        entries = self._priority_list
        rank = entry[:2]
        # Probing with the two-item ``(-priority, count)`` prefix means tuple
        # comparison is decided by those two items (or by length), so the
        # notifier objects, which do not define an ordering, are never
        # compared even when priority and count tie.
        idx = bisect.bisect_left(entries, rank)
        # Place the new entry after existing entries of identical rank so
        # that ties keep their registration order.
        while idx < len(entries) and entries[idx][:2] == rank:
            idx += 1
        return idx

    def _remove(self, listener_id):
        """ Remove all bookkeeping for ``listener_id``.

        Must be called with ``_priority_list_lock`` held.  Raises KeyError if
        the id is not registered.
        """
        entry = self._priority_info[listener_id]
        entries = self._priority_list
        # Jump to the first entry of the same rank, then identify the exact
        # tuple by identity; see ``_insertion_index`` for why the notifier is
        # kept out of the comparison.
        idx = bisect.bisect_left(entries, entry[:2])
        while entries[idx] is not entry:
            idx += 1
        del self._priority_info[listener_id]
        del entries[idx]
        self._listener_filters.pop(listener_id, None)

    def get_id(self, func):
        """ Get an id as unique key for the function.

        Methods are identified by weak references to their function and to
        the object they are bound to; any other callable is its own id.
        """
        if type(func) is MethodType:
            obj = func.__self__
            if obj is None:
                # Unbound method (presumably only reachable on Python 2;
                # Python 3 method objects always carry an instance).
                return (weakref.ref(func.__func__),
                        weakref.ref(func.__self__.__class__))
            else:
                # Bound method.
                return weakref.ref(func.__func__), weakref.ref(obj)
        else:
            return func

    def _get_notifier(self, func, notify=None):
        """ Return the notifier object wrapping ``func``.

        ``notify`` is a callable to be called with the listener id when the
        bound func's object is garbage collected.
        """
        if type(func) is MethodType:
            if notify is None:
                args = ()
            else:
                args = (self.get_id(func),)
            return MethodNotifier(func, notify, args)
        else:
            return CallableNotifier(func, notify)

    def _passes_filters(self, linfo, event):
        """ Return True if ``event`` satisfies the filters of entry ``linfo``.

        Must be called with ``_priority_list_lock`` held.
        """
        listener_id = self.get_id(linfo[-1]())
        if listener_id not in self._listener_filters:
            # Unfiltered listeners receive every event.
            return True
        for key, value in self._listener_filters[listener_id].items():
            attr = event
            try:
                # Get extended attributes of the event.
                for part in key.split('.'):
                    attr = getattr(attr, part)
            except AttributeError as e:
                logger.info('Error filtering listener: %s; %s',
                            linfo, e)
                return False
            if attr != value:
                return False
        return True

    def get_listeners(self, event):
        """ Return listeners which will be called on specified event.

        If ``event`` is None, all listeners are returned.
        If ``event`` is an event, only listeners which will be called for the
        event are returned (satisfying any filters on the listeners).

        The result is a new list of ``(-priority, count, notifier)`` tuples
        in calling order.
        """
        with self._priority_list_lock:
            if event is None or not self._listener_filters:
                return self._priority_list[:]
            return [linfo for linfo in self._priority_list
                    if self._passes_filters(linfo, event)]

    def disable(self):
        """ Disable the event from generating notifications.
        """
        self._disable = True

    def enable(self):
        """ Enable the event again to generate notifications.
        """
        self._disable = False

    def is_enabled(self):
        """ Check if the event is enabled.
        """
        return not self._disable


###############################################################################
# `EventManager` Class.
###############################################################################

class EventManager(BaseEventManager):
    """ A single registry point for all application events.
    """
    # store the length of the BaseEvent's __mro__
    bmro_clip = -len(BaseEvent.__mro__)+1

    def __init__(self):
        # Maps event class -> EventInfo.
        self.event_map = {}
        # Source of the registration counter handed to EventInfo.connect.
        self.count = itertools.count()
        self._trace_func = None

    ###########################################################################
    # `EventManager` Interface
    ###########################################################################
    def register(self, cls):
        """ Register an event with the event manager.

        Calling this is not generally necessary. An event is automatically
        registered the first time a listener connects to it.

        Parameters:
        -----------
        cls : class
            The ``class`` of the event.

        Raises
        ------
        ValueError : if ``cls`` is already registered.

        """
        if cls in self.event_map:
            raise ValueError('Event {0} already registered'.format(cls))
        else:
            self.event_map[cls] = EventInfo(cls)

    def connect(self, cls, func, filter=None, priority=0):
        """ Add a listener for the event.

        Parameters
        ----------
        cls : class
            The class of events for which the listener is registered.

        func : callable
            A callable to be called when the event is emitted.  The function
            should expect one argument which is the event instance which was
            emitted.

        filter : dict
            Filters to match for before calling the listener. The listener is
            called only when the event matches all of the filters.

            Filter specification:
                - key: string which is name of an attribute of the event
                    instance.
                - value: the value of the specified attribute.

        priority : int
            The priority of the listener. Higher priority listeners are called
            before lower priority listeners.

        Note
        ----

        Reconnecting an already connected listener will disconnect the
        old listener. This may have ramifications in changing the filters
        and the priority.

        The filtering is added so that future optimizations can be done
        on specific events with large number of handlers. For example there
        should be a fast way to filter key events to specific listeners rather
        than iterating through all listeners.

        """
        if self._trace_func is not None:
            # A true-like return value from the trace function vetoes the
            # action.
            if self._trace_func('connect', self.connect,
                                (cls, func, filter, priority)):
                return
        if cls not in self.event_map:
            self.register(cls)
        self.event_map[cls].connect(func, filter, priority, next(self.count))

    def disconnect(self, cls, func):
        """ Disconnects a listener from being notified about the event.

        Parameters
        ----------
        cls : class
            The class of events for which the listener is registered.

        func : callable
            The callable which was registered for that class.

        Raises
        ------
        KeyError : if `func` is not already connected.

        """
        if self._trace_func is not None:
            if self._trace_func('disconnect', self.disconnect, (cls, func)):
                return
        self.event_map[cls].disconnect(func)

    def emit(self, event, block=True):
        """ Notifies all listeners about the event with the specified
        arguments.

        Parameters
        ----------
        event : instance of :py:class:`BaseEvent`
            The :py:class:`BaseEvent` instance to emit.

        block : bool
            Whether to block the call until the event handling is finished.
            If block is False, the event will be emitted in a separate thread
            and the thread will be returned, so you can later query its status
            or do ``join()`` on the thread.

        Note
        ----

        Listeners of superclasses of the event are also called.
        Eg. a :py:class:`BaseEvent` listener will also be notified about any
        derived class events.

        An exception raised by a listener is logged as a warning and does not
        prevent the remaining listeners from being called.

        """
        if not block:
            t = threading.Thread(target=self.emit, args=(event, True),
                                 name='Event emit: {0}'.format(event))
            t.start()
            return t

        trace_func = self._trace_func
        if trace_func is not None:
            if trace_func('emit', self.emit, (event,)):
                return

        cls = type(event)
        if not self.is_enabled(cls):
            return

        listeners = self.get_listeners(event, cls)
        event.pre_emit()
        for listener in listeners:
            if listener is None:
                # The notifier yields None once the object of a bound method
                # listener has been garbage collected; nothing to call.
                continue
            try:
                if trace_func is not None:
                    if trace_func('listen', listener, (event,)):
                        continue
                listener(event)
            except Exception as e:
                # ``Logger.warn`` is a deprecated alias which newer Python
                # versions no longer provide; ``warning`` is the supported
                # spelling.
                logger.warning('Exception {0} occurred in listener: {1} for '
                               'event: {2}:\n{3}'.format(
                                   e, listener, event,
                                   traceback.format_exc()))
            if event._handled:
                # Only enable when debugging -- very slow even if it doesn't
                # get emitted because it must still do the string formatting.
                #logger.debug('Event: {0} handled by listener: {1}'.format(
                #    event, listener))
                break
        event.post_emit()

    def get_event(self, cls=None):
        """ Returns an ``EventInfo`` instance for the event.

        Parameters
        ----------
        cls : class
            The class of the event we want the ``EventInfo`` for.  If ``cls``
            is ``None``, then all known event types are returned.

        Returns
        -------
        event_info :
            An ``EventInfo`` instance for the class, or a dictionary mapping
            classes to ``EventInfo`` instances.

        """
        if cls is None:
            return self.event_map
        else:
            return self.event_map[cls]

    def get_listeners(self, event, cls=None):
        """ Return listeners which will be called on specified event.

        Parameters
        ----------
        event : event instance or class
            The event we want to get the listeners for.  If ``event`` is
            an instance of BaseEvent(), the listeners which will be called
            for the event are returned (satisfying any filters on the
            listeners).  If ``event`` is BaseEvent subclass, all listeners
            for specified event class are returned (but no filtering is
            performed).

        cls : BaseEvent subclass
            ``cls`` argument is generally not needed, it is for internal use.
            If ``cls`` is specified as a subclass of ``BaseEvent``, then only
            listeners for the specified event class and superclasses are
            returned.

        Returns
        -------
        listeners : iterator
            Lazily yields the listener callables in calling order.  An item
            is whatever the stored notifier returns when called.

        """
        evt_map = self.event_map
        if cls is None:
            if isinstance(event, BaseEvent):
                cls = type(event)
            else:
                # A class was passed: report every listener, unfiltered.
                cls = event
                event = None
        classes = self.get_event_hierarchy(cls)
        # Each EventInfo returns an already sorted list, so merging keeps the
        # global priority order across the class hierarchy.
        listeners = heapq.merge(*[evt_map[cls].get_listeners(event)
                                  for cls in classes if cls in evt_map])
        # The last item of every entry is the notifier; calling it yields the
        # listener.
        listeners = (l[-1]() for l in listeners)
        return listeners

    def disable(self, cls):
        """ Disable the event from generating notifications.

        Parameters
        ----------
        cls : class
            The class of events which we want to disable.

        """
        if cls not in self.event_map:
            self.register(cls)
        self.event_map[cls].disable()

    def enable(self, cls):
        """ Enable the event again to generate notifications.

        Parameters
        ----------
        cls : class
            The class of events which we want to enable.

        """
        if cls not in self.event_map:
            self.register(cls)
        self.event_map[cls].enable()

    def is_enabled(self, cls):
        """ Check if the event is enabled.

        An event is considered disabled as soon as any registered class in
        its event hierarchy is disabled.

        Parameters
        ----------
        cls : class
            The class of events which we want check the status of.

        """
        for cls in self.get_event_hierarchy(cls):
            if cls in self.event_map and not self.event_map[cls].is_enabled():
                return False
        return True

    def get_event_hierarchy(self, cls):
        """ The sequence of event classes which are notified for given cls.

        This is ``cls.__mro__`` with ``bmro_clip`` applied as the slice end.
        """
        return cls.__mro__[:self.bmro_clip]

    def set_trace(self, func):
        """ Set a trace method for various actions performed.

        func is a callable which takes three arguments: name, method and args

            name - str
                An identifiable name of the method being called
                Either of 'connect', 'disconnect', 'emit' or 'listen'
                The subsequent arguments depend on this value.
            method - callable
                The method which would be called as a consequence of the
                action.  If name=='listen', this is the listener which would
                be called.
            args - tuple
                The arguments specific to each method name are specified
                below:
                    connect - cls, func, filter, priority
                    disconnect - cls, func
                    emit - event
                    listen - event

        If the trace function returns a True-like value: the corresponding
        action is not performed. For listen action, the corresponding
        listener is not called, but subsequent ones are called (depending on
        return value for their trace function calls).

        Note: Calling set_trace with None as the callable argument removes
        existing trace function set. Also, only a single trace method can be
        active at a time, calling set_trace removes the existing trace
        function.

        """
        self._trace_func = func