Source code for traits_futures.background_progress
# (C) Copyright 2018-2019 Enthought, Inc., Austin, TX
# All rights reserved.
"""
Support for a progress-reporting background call.
The code in this module supports an arbitrary callable that accepts a
"progress" named argument, and can use that argument to submit progress
information.
Every progress submission also marks a point where the callable can
be cancelled.
"""
from __future__ import absolute_import, print_function, unicode_literals
from traits.api import (
Any, Bool, Callable, Dict, Event, HasStrictTraits, HasTraits, Instance,
on_trait_change, Property, Str, Tuple, Unicode)
from traits_futures.exception_handling import marshal_exception
from traits_futures.future_states import (
CANCELLED, CANCELLING, EXECUTING, FAILED, COMPLETED, WAITING,
CANCELLABLE_STATES, DONE_STATES, FutureState)
# Message types for messages from ProgressBackgroundTask
# to ProgressFuture.
#: Task was cancelled before it started. No arguments.
INTERRUPTED = "interrupted"
#: Task started executing. No arguments.
STARTED = "started"
#: Task failed with an exception. Argument gives exception information.
RAISED = "raised"
#: Task succeeded and returned a result. Argument is the result.
RETURNED = "returned"
#: Task sends progress. Argument is a single object giving progress
#: information. This module does not interpret the contents of the argument.
PROGRESS = "progress"
class _ProgressCancelled(Exception):
"""
Exception raised when progress reporting is interrupted by
task cancellation.
"""
[docs]class ProgressReporter(object):
"""
Object used by the target callable to report progress.
"""
def __init__(self, message_sender, cancel_event):
self.message_sender = message_sender
self.cancel_event = cancel_event
[docs] def report(self, progress_info):
"""
Send progress information to the linked future.
The ``progress_info`` object will eventually be sent to the
corresponding future's ``progress`` event trait.
Parameters
----------
progress_info : object
An arbitrary object representing progress. Ideally, this
should be immutable and pickleable.
"""
if self.cancel_event.is_set():
raise _ProgressCancelled("Task was cancelled")
self.message_sender.send((PROGRESS, progress_info))
[docs]class ProgressBackgroundTask(object):
"""
Background portion of a progress background task.
This provides the callable that will be submitted to the thread pool, and
sends messages to communicate with the ProgressFuture.
"""
def __init__(self, callable, args, kwargs, message_sender, cancel_event):
self.callable = callable
self.args = args
self.kwargs = kwargs
self.message_sender = message_sender
self.cancel_event = cancel_event
def __call__(self):
progress = ProgressReporter(
message_sender=self.message_sender,
cancel_event=self.cancel_event,
)
self.kwargs["progress"] = progress.report
with self.message_sender:
if self.cancel_event.is_set():
self.send(INTERRUPTED)
return
self.send(STARTED)
try:
result = self.callable(*self.args, **self.kwargs)
except _ProgressCancelled:
self.send(INTERRUPTED)
except BaseException as e:
self.send(RAISED, marshal_exception(e))
del e
else:
self.send(RETURNED, result)
[docs] def send(self, message_type, message_args=None):
"""
Send a message to the linked future.
Sends a pair consisting of a string giving the message type along with
an object providing any relevant arguments. The interpretation of the
arguments depends on the message type.
Parameters
----------
message_type : string
Type of the message to be sent.
message_args : object, optional
Any arguments relevant to the message. Ideally, should be
pickleable and immutable. If not provided, ``None`` is sent.
"""
self.message_sender.send((message_type, message_args))
[docs]class ProgressFuture(HasStrictTraits):
"""
Object representing the front-end handle to a ProgressBackgroundTask.
"""
#: The state of the background task, to the best of the knowledge of
#: this future.
state = FutureState
#: True if this task can be cancelled, else False.
cancellable = Property(Bool())
#: True if we've received the final message from the background task,
#: else False. `True` indicates either that the background task
#: succeeded, or that it raised, or that it was cancelled.
done = Property(Bool())
#: Event fired whenever a progress message arrives from the background.
progress = Event(Any())
@property
def result(self):
"""
Result of the background task. Raises an ``Attributerror`` on access if
no result is available (because the background task failed, was
cancelled, or has not yet completed).
Note: this is deliberately a regular Python property rather than a
Trait, to discourage users from attaching Traits listeners to
it. Listen to the state or its derived traits instead.
"""
if self.state != COMPLETED:
raise AttributeError("No result available for this call.")
return self._result
@property
def exception(self):
"""
Information about any exception raised by the background call. Raises
an ``AttributeError`` on access if no exception was raised (because the
call succeeded, was cancelled, or has not yet completed).
Note: this is deliberately a regular Python property rather than a
Trait, to discourage users from attaching Traits listeners to
it. Listen to the state or its derived traits instead.
"""
if self.state != FAILED:
raise AttributeError("No exception has been raised for this call.")
return self._exception
[docs] def cancel(self):
"""
Method that can be called from the main thread to
indicate that the task should be cancelled (provided
it hasn't already started running).
"""
# In the interests of catching coding errors early in client
# code, we're strict about what states we allow cancellation
# from. Some applications may want to weaken the error below
# to a warning, or just do nothing on an invalid cancellation.
if not self.cancellable:
raise RuntimeError(
"Can only cancel a queued or executing task. "
"Task state is {!r}".format(self.state))
self._cancel_event.set()
self.state = CANCELLING
# Private traits ##########################################################
#: Private event used to request cancellation of this task. Users
#: should call the cancel() method instead of using this event.
_cancel_event = Any()
#: Result from the background task.
_result = Any()
#: Exception information from the background task.
_exception = Tuple(Unicode(), Unicode(), Unicode())
#: Object that receives messages from the background task.
_message_receiver = Instance(HasTraits)
#: Event fired when the background task is on the point of exiting.
#: This is mostly used for internal bookkeeping.
_exiting = Event()
# Private methods #########################################################
@on_trait_change('_message_receiver:done')
def _send_exiting_event(self):
self._exiting = True
@on_trait_change('_message_receiver:message')
def _process_message(self, message):
message_type, message_arg = message
method_name = "_process_{}".format(message_type)
getattr(self, method_name)(message_arg)
def _process_interrupted(self, none):
assert self.state in (CANCELLING,)
self.state = CANCELLED
def _process_started(self, none):
assert self.state in (WAITING, CANCELLING)
if self.state == WAITING:
self.state = EXECUTING
def _process_raised(self, exception_info):
assert self.state in (EXECUTING, CANCELLING)
if self.state == EXECUTING:
self._exception = exception_info
self.state = FAILED
else:
self.state = CANCELLED
def _process_returned(self, result):
assert self.state in (EXECUTING, CANCELLING)
if self.state == EXECUTING:
self._result = result
self.state = COMPLETED
else:
self.state = CANCELLED
def _process_progress(self, progress_info):
assert self.state in (EXECUTING, CANCELLING)
if self.state == EXECUTING:
self.progress = progress_info
def _get_cancellable(self):
return self.state in CANCELLABLE_STATES
def _get_done(self):
return self.state in DONE_STATES
def _state_changed(self, old_state, new_state):
old_cancellable = old_state in CANCELLABLE_STATES
new_cancellable = new_state in CANCELLABLE_STATES
if old_cancellable != new_cancellable:
self.trait_property_changed(
"cancellable", old_cancellable, new_cancellable)
old_done = old_state in DONE_STATES
new_done = new_state in DONE_STATES
if old_done != new_done:
self.trait_property_changed("done", old_done, new_done)
[docs]class BackgroundProgress(HasStrictTraits):
"""
Object representing the background task to be executed.
"""
#: The callable to be executed.
callable = Callable()
#: Positional arguments to be passed to the callable.
args = Tuple()
#: Named arguments to be passed to the callable.
kwargs = Dict(Str(), Any())
[docs] def future_and_callable(
self, cancel_event, message_sender, message_receiver):
"""
Return a future and a linked background callable.
Parameters
----------
cancel_event : threading.Event
Event used to request cancellation of the background task.
message_sender : MessageSender
Object used by the background task to send messages to the
UI. Supports the context manager protocol, and provides a
'send' method.
message_receiver : MessageReceiver
Object that remains in the main thread and receives messages sent
by the message sender. This is a HasTraits subclass with
a 'message' Event trait that can be listened to for arriving
messages.
Returns
-------
future : ProgressFuture
Foreground object representing the state of the running
calculation.
runner : ProgressBackgroundTask
Callable to be executed in the background.
"""
if "progress" in self.kwargs:
raise TypeError("progress may not be passed as a named argument")
future = ProgressFuture(
_cancel_event=cancel_event,
_message_receiver=message_receiver,
)
runner = ProgressBackgroundTask(
callable=self.callable,
args=self.args,
# Convert TraitsDict to a regular dict
kwargs=dict(self.kwargs),
message_sender=message_sender,
cancel_event=cancel_event,
)
return future, runner