#------------------------------------------------------------------------------
# Copyright (c) 2012, Enthought, Inc.
# All rights reserved.
#------------------------------------------------------------------------------
from abc import ABCMeta, abstractmethod
from heapq import heappush, heappop
from itertools import count
import logging
from threading import Lock
logger = logging.getLogger(__name__)
[docs]class ScheduledTask(object):
""" An object representing a task in the scheduler.
"""
#: A sentinel object indicating that the result of the task is
#: undefined or that the task has not yet been executed.
undefined = object()
[docs] def __init__(self, callback, args, kwargs):
""" Initialize a ScheduledTask.
Parameters
----------
callback : callable
The callable to run when the task is executed.
args : tuple
The tuple of positional arguments to pass to the callback.
kwargs : dict
The dict of keyword arguments to pass to the callback.
"""
self._callback = callback
self._args = args
self._kwargs = kwargs
self._result = self.undefined
self._valid = True
self._pending = True
self._notify = None
#--------------------------------------------------------------------------
# Private API
#--------------------------------------------------------------------------
def _execute(self):
""" Execute the underlying task. This should only been called
by the scheduler loop.
"""
try:
if self._valid:
self._result = self._callback(*self._args, **self._kwargs)
if self._notify is not None:
self._notify(self._result)
finally:
self._notify = None
self._pending = False
#--------------------------------------------------------------------------
# Public API
#--------------------------------------------------------------------------
[docs] def notify(self, callback):
""" Set a callback to be run when the task is executed.
Parameters
----------
callback : callable
A callable which accepts a single argument which is the
results of the task. It will be invoked immediate after
the task is executed, on the main event loop thread.
"""
self._notify = callback
[docs] def pending(self):
""" Returns True if this task is pending execution, False
otherwise.
"""
return self._pending
[docs] def unschedule(self):
""" Unschedule the task so that it will not be executed. If
the task has already been executed, this call has no effect.
"""
self._valid = False
[docs] def result(self):
""" Returns the result of the task, or ScheduledTask.undefined
if the task has not yet been executed, was unscheduled before
execution, or raised an exception on execution.
"""
return self._result
[docs]class Application(object):
""" The application object which manages the top-level communication
protocol for serving Enaml views.
"""
__metaclass__ = ABCMeta
#: Private storage for the singleton application instance.
_instance = None
@staticmethod
[docs] def instance():
""" Get the global Application instance.
Returns
-------
result : Application or None
The global application instance, or None if one has not yet
been created.
"""
return Application._instance
[docs] def __new__(cls, *args, **kwargs):
""" Create a new Enaml Application.
There may be only one application instance in existence at any
point in time. Attempting to create a new Application when one
exists will raise an exception.
"""
if Application._instance is not None:
raise RuntimeError('An Application instance already exists')
self = super(Application, cls).__new__(cls)
Application._instance = self
return self
[docs] def __init__(self, factories):
""" Initialize an Enaml Application.
Parameters
----------
factories : iterable
An iterable of SessionFactory instances that will be used
to create the sessions for the application.
"""
self._all_factories = []
self._named_factories = {}
self._task_heap = []
self._counter = count()
self._heap_lock = Lock()
self.add_factories(factories)
#--------------------------------------------------------------------------
# Private API
#--------------------------------------------------------------------------
def _process_task(self, task):
""" Processes the given task, then dispatches the next task.
"""
try:
task._execute()
finally:
self._next_task()
def _next_task(self):
""" Pulls the next task off the heap and processes it on the
main gui thread.
"""
heap = self._task_heap
with self._heap_lock:
if heap:
priority, ignored, task = heappop(heap)
self.deferred_call(self._process_task, task)
#--------------------------------------------------------------------------
# Abstract API
#--------------------------------------------------------------------------
@abstractmethod
[docs] def start_session(self, name):
""" Start a new session of the given name.
This method will create a new session object for the requested
session type and return the new session_id. If the session name
is invalid, an exception will be raised.
Parameters
----------
name : str
The name of the session to start.
Returns
-------
result : str
The unique identifier for the created session.
"""
raise NotImplementedError
@abstractmethod
[docs] def end_session(self, session_id):
""" End the session with the given session id.
This method will close down the existing session. If the session
id is not valid, an exception will be raised.
Parameters
----------
session_id : str
The unique identifier for the session to close.
"""
raise NotImplementedError
@abstractmethod
[docs] def session(self, session_id):
""" Get the session for the given session id.
Parameters
----------
session_id : str
The unique identifier for the session to retrieve.
Returns
-------
result : Session or None
The session object with the given id, or None if the id
does not correspond to an active session.
"""
raise NotImplementedError
@abstractmethod
[docs] def sessions(self):
""" Get the currently active sessions for the application.
Returns
-------
result : list
The list of currently active sessions for the application.
"""
raise NotImplementedError
@abstractmethod
[docs] def start(self):
""" Start the application's main event loop.
"""
raise NotImplementedError
@abstractmethod
[docs] def stop(self):
""" Stop the application's main event loop.
"""
raise NotImplementedError
@abstractmethod
[docs] def deferred_call(self, callback, *args, **kwargs):
""" Invoke a callable on the next cycle of the main event loop
thread.
Parameters
----------
callback : callable
The callable object to execute at some point in the future.
*args, **kwargs
Any additional positional and keyword arguments to pass to
the callback.
"""
raise NotImplementedError
@abstractmethod
[docs] def timed_call(self, ms, callback, *args, **kwargs):
""" Invoke a callable on the main event loop thread at a
specified time in the future.
Parameters
----------
ms : int
The time to delay, in milliseconds, before executing the
callable.
callback : callable
The callable object to execute at some point in the future.
*args, **kwargs
Any additional positional and keyword arguments to pass to
the callback.
"""
raise NotImplementedError
@abstractmethod
[docs] def is_main_thread(self):
""" Indicates whether the caller is on the main gui thread.
Returns
-------
result : bool
True if called from the main gui thread. False otherwise.
"""
raise NotImplementedError
#--------------------------------------------------------------------------
# Public API
#--------------------------------------------------------------------------
[docs] def schedule(self, callback, args=None, kwargs=None, priority=0):
""" Schedule a callable to be executed on the event loop thread.
This call is thread-safe.
Parameters
----------
callback : callable
The callable object to be executed.
args : tuple, optional
The positional arguments to pass to the callable.
kwargs : dict, optional
The keyword arguments to pass to the callable.
priority : int, optional
The queue priority for the callable. Smaller values indicate
lower priority, larger values indicate higher priority. The
default priority is zero.
Returns
-------
result : ScheduledTask
A task object which can be used to unschedule the task or
retrieve the results of the callback after the task has
been executed.
"""
if args is None:
args = ()
if kwargs is None:
kwargs = {}
task = ScheduledTask(callback, args, kwargs)
heap = self._task_heap
with self._heap_lock:
needs_start = len(heap) == 0
item = (-priority, self._counter.next(), task)
heappush(heap, item)
if needs_start:
if self.is_main_thread():
self._next_task()
else:
self.deferred_call(self._next_task)
return task
[docs] def has_pending_tasks(self):
""" Get whether or not the application has pending tasks.
Returns
-------
result : bool
True if there are pending tasks. False otherwise.
"""
with self._heap_lock:
has_pending = len(self._heap) > 0
return has_pending
[docs] def add_factories(self, factories):
""" Add session factories to the application.
Parameters
----------
factories : iterable
An iterable of SessionFactory instances to add to the
application.
"""
all_factories = self._all_factories
named_factories = self._named_factories
for factory in factories:
name = factory.name
if name in named_factories:
msg = 'Multiple session factories named `%s`; ' % name
msg += 'replacing previous value.'
logger.warn(msg)
old_factory = named_factories.pop(name)
all_factories.remove(old_factory)
all_factories.append(factory)
named_factories[name] = factory
[docs] def discover(self):
""" Get a dictionary of session information for the application.
Returns
-------
result : list
A list of dicts of information about the available sessions.
"""
info = [
{'name': fact.name, 'description': fact.description}
for fact in self._all_factories
]
return info
[docs] def destroy(self):
""" Destroy this application instance.
Once an application is created, it must be destroyed before a
new application can be instantiated.
"""
for session in self.sessions():
self.end_session(session.session_id)
self._all_factories = []
self._named_factories = {}
Application._instance = None
#------------------------------------------------------------------------------
# Helper Functions
#------------------------------------------------------------------------------
[docs]def deferred_call(callback, *args, **kwargs):
""" Invoke a callable on the next cycle of the main event loop
thread.
This is a convenience function for invoking the same method on the
current application instance. If an application instance does not
exist, a RuntimeError will be raised.
Parameters
----------
callback : callable
The callable object to execute at some point in the future.
*args, **kwargs
Any additional positional and keyword arguments to pass to
the callback.
"""
app = Application.instance()
if app is None:
raise RuntimeError('Application instance does not exist')
app.deferred_call(callback, *args, **kwargs)
[docs]def timed_call(ms, callback, *args, **kwargs):
""" Invoke a callable on the main event loop thread at a specified
time in the future.
This is a convenience function for invoking the same method on the
current application instance. If an application instance does not
exist, a RuntimeError will be raised.
Parameters
----------
ms : int
The time to delay, in milliseconds, before executing the
callable.
callback : callable
The callable object to execute at some point in the future.
*args, **kwargs
Any additional positional and keyword arguments to pass to
the callback.
"""
app = Application.instance()
if app is None:
raise RuntimeError('Application instance does not exist')
app.timed_call(ms, callback, *args, **kwargs)
[docs]def is_main_thread():
""" Indicates whether the caller is on the main gui thread.
This is a convenience function for invoking the same method on the
current application instance. If an application instance does not
exist, a RuntimeError will be raised.
Returns
-------
result : bool
True if called from the main gui thread. False otherwise.
"""
app = Application.instance()
if app is None:
raise RuntimeError('Application instance does not exist')
return app.is_main_thread()
[docs]def schedule(self, callback, args=None, kwargs=None, priority=0):
""" Schedule a callable to be executed on the event loop thread.
This call is thread-safe.
This is a convenience function for invoking the same method on the
current application instance. If an application instance does not
exist, a RuntimeError will be raised.
Parameters
----------
callback : callable
The callable object to be executed.
args : tuple, optional
The positional arguments to pass to the callable.
kwargs : dict, optional
The keyword arguments to pass to the callable.
priority : int, optional
The queue priority for the callable. Smaller values indicate
lower priority, larger values indicate higher priority. The
default priority is zero.
Returns
-------
result : ScheduledTask
A task object which can be used to unschedule the task or
retrieve the results of the callback after the task has
been executed.
"""
app = Application.instance()
if app is None:
raise RuntimeError('Application instance does not exist')
return app.schedule(callback, args, kwargs, priority)