Source code for traits_futures.background_iteration

# (C) Copyright 2018-2019 Enthought, Inc., Austin, TX
# All rights reserved.

"""
Background task that sends results from an iteration.
"""
from __future__ import absolute_import, print_function, unicode_literals

import types

from traits.api import (
    Any, Bool, Callable, Dict, Event, HasStrictTraits, HasTraits, Instance,
    on_trait_change, Property, Str, Tuple, Unicode)

from traits_futures.exception_handling import marshal_exception
from traits_futures.future_states import (
    CANCELLED, CANCELLING, EXECUTING, FAILED, COMPLETED, WAITING,
    CANCELLABLE_STATES, DONE_STATES, FutureState)

# Message types for messages from IterationBackgroundTask to IterationFuture.
# The background iteration will emit exactly one of the following
# sequences of message types, where GENERATED* indicates zero-or-more
# GENERATED messages.
#
#   [INTERRUPTED]
#   [RAISED]
#   [STARTED, GENERATED*, INTERRUPTED]
#   [STARTED, GENERATED*, RAISED]
#   [STARTED, GENERATED*, EXHAUSTED]

#: Iteration was cancelled either before it started or during the
#: iteration. No arguments.
INTERRUPTED = "interrupted"

#: Iteration started executing. No arguments.
STARTED = "started"

#: Iteration failed with an exception, or there was
#: an exception on creation of the iterator. Argument gives
#: exception information.
RAISED = "raised"

#: Iteration completed normally. No arguments.
EXHAUSTED = "exhausted"

#: Message sent whenever the iteration yields a result.
#: Argument is the result generated.
GENERATED = "generated"


[docs]class IterationBackgroundTask(object): """ Iteration to be executed in the background. """ def __init__(self, callable, args, kwargs, message_sender, cancel_event): self.callable = callable self.args = args self.kwargs = kwargs self.message_sender = message_sender self.cancel_event = cancel_event def __call__(self): with self.message_sender: if self.cancel_event.is_set(): self.send(INTERRUPTED) return self.send(STARTED) try: iterable = iter(self.callable(*self.args, **self.kwargs)) except BaseException as e: self.send(RAISED, marshal_exception(e)) del e return while True: if self.cancel_event.is_set(): message, message_args = INTERRUPTED, None break try: result = next(iterable) except StopIteration: message, message_args = EXHAUSTED, None break except BaseException as e: message, message_args = RAISED, marshal_exception(e) # Make sure we're not keeping references to anything # in the exception details. Not needed on Python 3. del e break else: self.send(GENERATED, result) # Delete now, else we'll hang on to the reference to this # result until the next iteration, which could be some # arbitrary time in the future. del result # If the iterable is a generator, close it before we send the final # message. This ensures that any cleanup in the generator function # (e.g., as a result of leaving a with block, or executing a # finally clause) occurs promptly. if isinstance(iterable, types.GeneratorType): iterable.close() # Belt and braces: also delete the reference to the iterable. del iterable self.send(message, message_args)
[docs] def send(self, message_type, message_args=None): """ Send a message to the linked IterationFuture. Sends a pair consisting of a string giving the message type along with an object providing any relevant arguments. The interpretation of the arguments depends on the message type. Parameters ---------- message_type : string Type of the message to be sent. message_args : object, optional Any arguments relevant to the message. Ideally, should be pickleable and immutable. If not provided, ``None`` is sent. """ self.message_sender.send((message_type, message_args))
# IterationFuture states. These represent the futures' current state of # knowledge of the background iteration. An iteration starts out in WAITING # state and ends with one of COMPLETED, FAILED or CANCELLED. The possible # progressions of states are: # # WAITING -> CANCELLING -> CANCELLED # WAITING -> EXECUTING -> CANCELLING -> CANCELLED # WAITING -> EXECUTING -> FAILED # WAITING -> EXECUTING -> COMPLETED # # The ``result`` trait will only be fired when the state is EXECUTING; # no results events will be fired after cancelling.
[docs]class IterationFuture(HasStrictTraits): """ Foreground representation of an iteration executing in the background. """ #: The state of the background iteration, to the best of the knowledge of #: this future. state = FutureState #: True if this task can be cancelled, else False. cancellable = Property(Bool()) #: True if we've received the final message from the background iteration, #: else False. `True` indicates either that the background iteration #: succeeded, or that it raised, or that it was cancelled. done = Property(Bool()) #: Event fired whenever a result arrives from the background #: iteration. result_event = Event(Any()) @property def exception(self): """ Information about any exception raised by the background call. Raises an ``AttributeError`` on access if no exception was raised (because the call succeeded, was cancelled, or has not yet completed). Note: this is deliberately a regular Python property rather than a Trait, to discourage users from attaching Traits listeners to it. Listen to the state or its derived traits instead. """ if self.state != FAILED: raise AttributeError("No exception has been raised for this call.") return self._exception
[docs] def cancel(self): """ Method called from the main thread to request cancellation of the background job. """ # In the interests of catching coding errors early in client # code, we're strict about what states we allow cancellation # from. Some applications may want to weaken the error below # to a warning, or just do nothing on an invalid cancellation. if not self.cancellable: raise RuntimeError("Can only cancel a queued or executing task.") self._cancel_event.set() self.state = CANCELLING
# Private traits ########################################################## #: Private event used to request cancellation of this task. Users #: should call the cancel() method instead of using this event. _cancel_event = Any() #: Exception information from the background task. _exception = Tuple(Unicode(), Unicode(), Unicode()) #: Object that receives messages from the background task. _message_receiver = Instance(HasTraits) #: Event fired when the background task is on the point of exiting. #: This is mostly used for internal bookkeeping. _exiting = Event() # Private methods ######################################################### @on_trait_change('_message_receiver:done') def _send_exiting_event(self): self._exiting = True @on_trait_change('_message_receiver:message') def _process_message(self, message): message_type, message_arg = message method_name = "_process_{}".format(message_type) getattr(self, method_name)(message_arg) def _process_interrupted(self, none): assert self.state in (CANCELLING,) self.state = CANCELLED def _process_started(self, none): assert self.state in (WAITING, CANCELLING) if self.state == WAITING: self.state = EXECUTING def _process_raised(self, exception_info): assert self.state in (WAITING, EXECUTING, CANCELLING) if self.state in (EXECUTING, WAITING): self._exception = exception_info self.state = FAILED else: # Don't record the exception if the job was already cancelled. self.state = CANCELLED def _process_exhausted(self, none): assert self.state in (EXECUTING, CANCELLING) if self.state == EXECUTING: self.state = COMPLETED else: self.state = CANCELLED def _process_generated(self, result): assert self.state in (EXECUTING, CANCELLING) # Any results arriving after a cancellation request are ignored. if self.state == EXECUTING: self.result_event = result def _get_cancellable(self): return self.state in CANCELLABLE_STATES def _get_done(self): return self.state in DONE_STATES def _state_changed(self, old_state, new_state): old_cancellable = old_state in CANCELLABLE_STATES new_cancellable = new_state in CANCELLABLE_STATES if old_cancellable != new_cancellable: self.trait_property_changed( "cancellable", old_cancellable, new_cancellable) old_done = old_state in DONE_STATES new_done = new_state in DONE_STATES if old_done != new_done: self.trait_property_changed("done", old_done, new_done)
[docs]class BackgroundIteration(HasStrictTraits): """ Object representing the background iteration to be executed. """ #: The callable to be executed. This should return something iterable. callable = Callable() #: Positional arguments to be passed to the callable. args = Tuple() #: Named arguments to be passed to the callable. kwargs = Dict(Str(), Any())
[docs] def future_and_callable( self, cancel_event, message_sender, message_receiver): """ Return a future and a linked background callable. Parameters ---------- cancel_event : threading.Event Event used to request cancellation of the background job. message_sender : MessageSender Object used by the background job to send messages to the UI. Supports the context manager protocol, and provides a 'send' method. message_receiver : MessageReceiver Object that remains in the main thread and receives messages sent by the message sender. This is a HasTraits subclass with a 'message' Event trait that can be listened to for arriving messages. Returns ------- future : IterationFuture Foreground object representing the state of the running calculation. runner : IterationBackgroundTask Callable to be executed in the background. """ future = IterationFuture( _cancel_event=cancel_event, _message_receiver=message_receiver, ) runner = IterationBackgroundTask( callable=self.callable, args=self.args, # Convert TraitsDict to a regular dict kwargs=dict(self.kwargs), message_sender=message_sender, cancel_event=cancel_event, ) return future, runner