Source code for traits_futures.base_future

# (C) Copyright 2018-2024 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!

"""
Base class providing common pieces of the Future machinery.
"""

import abc
import logging

from traits.api import (
    Any,
    Bool,
    Callable,
    Enum,
    HasStrictTraits,
    observe,
    Property,
    Str,
    Tuple,
)

from traits_futures.exception_handling import marshal_exception
from traits_futures.future_states import (
    CANCELLABLE_STATES,
    CANCELLED,
    CANCELLING,
    COMPLETED,
    DONE_STATES,
    EXECUTING,
    FAILED,
    FutureState,
    WAITING,
)
from traits_futures.i_task_specification import IFuture

logger = logging.getLogger(__name__)

# Messages sent by the BaseTask, and interpreted by BaseFuture.

#: Custom message from the future. The argument is a pair
#: (message_type, message_args); the message type and message args
#: are interpreted by the future.
SENT = "sent"

#: Control message sent when the callable is abandoned before execution.
ABANDONED = "abandoned"

#: Control message sent before we start to process the target callable.
#: The argument is always ``None``.
STARTED = "started"

#: Control message sent when an exception was raised by the background
#: callable. The argument is a tuple containing exception information.
RAISED = "raised"

#: Control message sent to indicate that the background callable succeeded
#: and returned a result. The argument is that result.
RETURNED = "returned"

#: Message types that indicate a "final" message. After a message of this
#: type is received, no more messages will be received.
FINAL_MESSAGES = {ABANDONED, RAISED, RETURNED}

# The BaseFuture class maintains an internal state. That internal state maps to
# the user-facing state, but is more fine-grained, allowing the class to keep
# track of the internal consistency and invariants. For example, the
# user-facing CANCELLING state doesn't indicate whether the task has started
# or not; if it *has* already started, then a "start" message from the
# background task is invalid; otherwise, it's valid. So we split this into
# two internal states: _CANCELLING_BEFORE_STARTED and _CANCELLING_AFTER_STARTED
# to allow us to detect an invalid _task_started call.


#: Internal state attained when cancellation is requested before the task
#: starts.
_CANCELLING_BEFORE_STARTED = "cancelling_before_started"

#: Internal state attained when cancellation is requested after the task
#: starts.
_CANCELLING_AFTER_STARTED = "cancelling_after_started"

#: Internal state corresponding to a task that was abandoned due to
#: cancellation.
_CANCELLED_ABANDONED = "cancelled_abandoned"

#: Internal state corresponding to a task that failed after cancellation.
_CANCELLED_FAILED = "cancelled_failed"

#: Internal state corresponding to a task that completed after cancellation.
_CANCELLED_COMPLETED = "cancelled_completed"

#: Mapping from each internal state to the corresponding user-visible state.
_INTERNAL_STATE_TO_STATE = {
    WAITING: WAITING,
    EXECUTING: EXECUTING,
    COMPLETED: COMPLETED,
    FAILED: FAILED,
    _CANCELLING_BEFORE_STARTED: CANCELLING,
    _CANCELLING_AFTER_STARTED: CANCELLING,
    _CANCELLED_ABANDONED: CANCELLED,
    _CANCELLED_COMPLETED: CANCELLED,
    _CANCELLED_FAILED: CANCELLED,
}

#: Internal states corresponding to completed futures.
_DONE_INTERNAL_STATES = {
    internal_state
    for internal_state, state in _INTERNAL_STATE_TO_STATE.items()
    if state in DONE_STATES
}

#: Internal states corresponding to cancellable futures.
_CANCELLABLE_INTERNAL_STATES = {
    internal_state
    for internal_state, state in _INTERNAL_STATE_TO_STATE.items()
    if state in CANCELLABLE_STATES
}


[docs] class TaskCancelled(Exception): """ Exception raised within the background task on cancellation. """
class _StateTransitionError(Exception): """ Exception used to indicate a bad state transition. Users should never see this exception. It indicates an error in the sequence of operations received by the future from the background task, from the executor, or from the user. """
[docs] @IFuture.register class BaseFuture(HasStrictTraits): """ Convenience base class for the various flavours of Future. """ # IFuture interface #######################################################
[docs] def cancel(self): """ Request cancellation of the background task. A task in ``WAITING`` or ``EXECUTING`` state will immediately be moved to ``CANCELLING`` state. If the task is not in ``WAITING`` or ``EXECUTING`` state, this function does nothing. .. versionchanged:: 0.3.0 This method no longer raises for a task that isn't cancellable. In previous versions, :exc:`RuntimeError` was raised. Returns ------- cancelled : bool True if the task was cancelled, False if the task was not cancellable. """ if self.state in {WAITING, EXECUTING}: self._user_cancelled() logger.debug(f"{self} cancelled") return True else: logger.debug(f"{self} not cancellable; state is {self.state}") return False
[docs] def receive(self, message): """ Receive and process a message from the task associated to this future. This method is primarily for use by the executors, but may also be of use in testing. Parameters ---------- message : object The message received from the associated task. Returns ------- final : bool True if the received message should be the last one ever received from the paired task. """ message_type, message_arg = message method_name = "_task_{}".format(message_type) getattr(self, method_name)(message_arg) return message_type in FINAL_MESSAGES
# BaseFuture interface #################################################### #: The state of the background task, to the best of the knowledge of #: this future. One of the six constants ``WAITING``, ``EXECUTING``, #: ``COMPLETED``, ``FAILED``, ``CANCELLING`` or ``CANCELLED``. state = Property(FutureState) #: True if cancellation of the background task can be requested, else #: False. Cancellation of the background task can be requested only if #: the future's ``state`` is either ``WAITING`` or ``EXECUTING``. cancellable = Property(Bool()) #: True when communications from the background task are complete. #: At that point, no further state changes can occur for this future. #: This trait has value True if the ``state`` is one of ``COMPLETED``, #: ``FAILED``, or ``CANCELLED``. It's safe to listen to this trait #: for changes: it will always fire exactly once, and when it fires #: its value will be consistent with that of the ``state`` trait. done = Property(Bool()) @property def result(self): """ Result of the background task. This attribute is only available if the state of the future is ``COMPLETED``. If the future has not reached the ``COMPLETED`` state, any attempt to access this attribute will raise an ``AttributeError``. Returns ------- result : object The result obtained from the background task. Raises ------ AttributeError If the task is still executing, or was cancelled, or raised an exception instead of returning a result. """ if self.state != COMPLETED: raise AttributeError( "No result available. Task has not yet completed, " "or was cancelled, or failed with an exception. " "Task state is {}".format(self.state) ) return self._result @property def exception(self): """ Information about any exception raised by the background task. This attribute is only available if the state of this future is ``FAILED``. If the future has not reached the ``FAILED`` state, any attempt to access this attribute will raise an ``AttributeError.`` Returns ------- exc_info : tuple Tuple containing exception information in string form: (exception type, exception value, formatted traceback). Raises ------ AttributeError If the task is still executing, or was cancelled, or completed without raising an exception. """ if self.state != FAILED: raise AttributeError( "No exception information available. Task has " "not yet completed, or was cancelled, or completed " "without an exception. " "Task state is {}".format(self.state) ) return self._exception
[docs] def dispatch(self, message): """ Dispatch a message arriving from the associated BaseTask. This is a convenience function, and may be safely overridden by subclasses that want to use a different dispatch mechanism. For a message type ``msgtype``, it looks for a method called ``_process_<msgtype>`` and dispatches the message arguments to that method. Subclasses then only need to provide the appropriate ``_process_<msgtype>`` methods. Parameters ---------- message : object Message sent by the background task. The default implementation of this method expects the message to be in the form ``(message_type, message_args)`` with ``message_type`` a string. """ message_type, message_arg = message method_name = "_process_{}".format(message_type) getattr(self, method_name)(message_arg)
# State transitions ####################################################### # These methods represent state transitions in response to external events. def _task_sent(self, message): """ Automate dispatch of different types of message. Delegates the actual work to the :meth:`dispatch` method, which can be overridden by subclasses. Messages received after cancellation are ignored. Parameters ---------- message : object Message from the background task. """ if self._internal_state == _CANCELLING_AFTER_STARTED: # Ignore messages that arrive after a cancellation request. return elif self._internal_state == EXECUTING: self.dispatch(message) else: raise _StateTransitionError( "Unexpected custom message in internal state {!r}".format( self._internal_state ) ) def _task_abandoned(self, none): """ Update state when the background task is abandoned due to cancellation. Internal state: * _CANCELLING_BEFORE_STARTED -> _CANCELLED_ABANDONED Parameters ---------- none : NoneType This parameter is unused. """ if self._internal_state == _CANCELLING_BEFORE_STARTED: self._cancel = None self._internal_state = _CANCELLED_ABANDONED else: raise _StateTransitionError( "Unexpected 'started' message in internal state {!r}".format( self._internal_state ) ) def _task_started(self, none): """ Update state when the background task has started processing. Internal state: * WAITING -> EXECUTING * _CANCELLING_BEFORE_STARTED -> _CANCELLED_AFTER_STARTED Parameters ---------- none : NoneType This parameter is unused. """ if self._internal_state == WAITING: self._internal_state = EXECUTING elif self._internal_state == _CANCELLING_BEFORE_STARTED: self._internal_state = _CANCELLING_AFTER_STARTED else: raise _StateTransitionError( "Unexpected 'started' message in internal state {!r}".format( self._internal_state ) ) def _task_returned(self, result): """ Update state when background task reports completing successfully. Internal state: * EXECUTING -> COMPLETED * _CANCELLING_AFTER_STARTED -> _CANCELLED_COMPLETED Parameters ---------- result : any The object returned by the background task. """ if self._internal_state == EXECUTING: self._cancel = None self._result = result self._internal_state = COMPLETED elif self._internal_state == _CANCELLING_AFTER_STARTED: self._cancel = None self._result = result self._internal_state = _CANCELLED_COMPLETED else: raise _StateTransitionError( "Unexpected 'returned' message in internal state {!r}".format( self._internal_state ) ) def _task_raised(self, exception_info): """ Update state when the background task reports completing with an error. Internal state: * EXECUTING -> FAILED * _CANCELLING_AFTER_STARTED -> _CANCELLED_FAILED Parameters ---------- exception_info : tuple Tuple containing exception information in string form: (exception type, exception value, formatted traceback). """ if self._internal_state == EXECUTING: self._cancel = None self._exception = exception_info self._internal_state = FAILED elif self._internal_state == _CANCELLING_AFTER_STARTED: self._cancel = None self._exception = exception_info self._internal_state = _CANCELLED_FAILED else: raise _StateTransitionError( "Unexpected 'raised' message in internal state {!r}".format( self._internal_state ) ) def _user_cancelled(self): """ Update state when the user requests cancellation. A future in ``WAITING`` or ``EXECUTING`` state moves to ``CANCELLING`` state. Internal state: * WAITING -> _CANCELLING_BEFORE_STARTED * EXECUTING -> _CANCELLING_AFTER_STARTED """ if self._internal_state == WAITING: self._cancel() self._internal_state = _CANCELLING_BEFORE_STARTED elif self._internal_state == EXECUTING: self._cancel() self._internal_state = _CANCELLING_AFTER_STARTED else: raise _StateTransitionError( "Unexpected 'cancelled' message in internal state {!r}".format( self._internal_state ) ) # Private traits ########################################################## #: Callback called (with no arguments) when user requests cancellation. #: This is reset to ``None`` once cancellation is impossible. _cancel = Callable(allow_none=True) #: The internal state of the future. _internal_state = Enum(WAITING, list(_INTERNAL_STATE_TO_STATE)) #: Result from the background task. _result = Any() #: Exception information from the background task. _exception = Tuple(Str(), Str(), Str()) # Private methods ######################################################### def _get_state(self): """Property getter for the "state" trait.""" return _INTERNAL_STATE_TO_STATE[self._internal_state] def _get_cancellable(self): """Property getter for the "cancellable" trait.""" return self._internal_state in _CANCELLABLE_INTERNAL_STATES def _get_done(self): """Property getter for the "done" trait.""" return self._internal_state in _DONE_INTERNAL_STATES @observe("_internal_state") def _update_property_traits(self, event): """Trait change handler for the "_internal_state" trait.""" old_internal_state, new_internal_state = event.old, event.new old_state = _INTERNAL_STATE_TO_STATE[old_internal_state] new_state = _INTERNAL_STATE_TO_STATE[new_internal_state] if old_state != new_state: self.trait_property_changed("state", old_state, new_state) old_cancellable = old_internal_state in _CANCELLABLE_INTERNAL_STATES new_cancellable = new_internal_state in _CANCELLABLE_INTERNAL_STATES if old_cancellable != new_cancellable: self.trait_property_changed( "cancellable", old_cancellable, new_cancellable ) old_done = old_internal_state in _DONE_INTERNAL_STATES new_done = new_internal_state in _DONE_INTERNAL_STATES if old_done != new_done: self.trait_property_changed("done", old_done, new_done)
[docs] class BaseTask(abc.ABC): """ Mixin for background task classes, making those classes callable. This class provides a callable wrapper allowing subclasses to easily provide a background callable task. Subclasses should override the ``run`` method to customize what should happen when the task runs. This class's ``__call__`` implementation will take care of sending standard control messages telling the future that the task has started, completed, or raised, and delegate to the ``run`` method for execution of the background task and sending of any custom messages. """
[docs] @abc.abstractmethod def run(self): """ Run the body of the background task. Returns ------- any : object May return any object. That object will be delivered to the future's ``result`` attribute. """
[docs] def send(self, message_type, message_arg=None): """ Send a message to the associated future. Parameters ---------- message_type : str The type of the message. This is used by ``BaseFuture`` when dispatching messages to appropriate handlers. message_arg : object, optional Message argument, if any. If not given, ``None`` is used. """ self.__send((SENT, (message_type, message_arg)))
[docs] def cancelled(self): """ Determine whether the user has requested cancellation. Returns True if the user has requested cancellation via the associated future's ``cancel`` method, and False otherwise. Returns ------- bool """ return self.__cancelled()
def __call__(self, send, cancelled): self.__send = send self.__cancelled = cancelled try: if cancelled(): send((ABANDONED, None)) return send((STARTED, None)) try: result = self.run() except BaseException as e: send((RAISED, marshal_exception(e))) else: send((RETURNED, result)) finally: del self.__cancelled del self.__send