# Source code for traits_futures.background_iteration

# (C) Copyright 2018-2020 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!

"""
Background task that sends results from an iteration.
"""

from traits.api import (
    Callable,
    Dict,
    Event,
    HasStrictTraits,
    Str,
    Tuple,
)

from traits_futures.base_future import BaseFuture
from traits_futures.i_task_specification import ITaskSpecification


#: Message sent whenever the iteration yields a result.
#: The message argument is the result generated.
GENERATED = "generated"


class IterationBackgroundTask:
    """
    Iteration to be executed in the background.

    Parameters
    ----------
    callable : collections.abc.Callable
        Callable to be invoked with ``args`` and ``kwargs``. Its return
        value is passed to ``iter``, so it must be iterable.
    args : tuple
        Positional arguments to be passed to the callable.
    kwargs : dict
        Named arguments to be passed to the callable.
    """

    def __init__(self, callable, args, kwargs):
        self.callable = callable
        self.args = args
        self.kwargs = kwargs

    def __call__(self, send, cancelled):
        """
        Run the iteration, sending each generated result.

        Parameters
        ----------
        send : collections.abc.Callable
            Called as ``send(GENERATED, result)`` for every result
            produced by the iteration.
        cancelled : collections.abc.Callable
            Called with no arguments before each step of the iteration;
            a true return value stops the iteration.

        Returns
        -------
        object
            ``None`` if the iteration stopped because ``cancelled()``
            returned true; otherwise the value carried by the
            ``StopIteration`` that ended the iteration.
        """
        iterable = iter(self.callable(*self.args, **self.kwargs))

        while True:
            # Check for cancellation before advancing, so that no further
            # result is computed or sent once cancellation is requested.
            if cancelled():
                return None

            try:
                result = next(iterable)
            except StopIteration as e:
                # If the iteration returned a value, the StopIteration
                # exception carries that value. Return it.
                return e.value

            send(GENERATED, result)
            # Don't keep a reference around until the next iteration.
            del result
class IterationFuture(BaseFuture):
    """
    Foreground representation of an iteration executing in the
    background.
    """

    #: Event fired whenever a result arrives from the background
    #: iteration. The event value is the result that was generated.
    result_event = Event()

    # Private methods #########################################################

    def _process_generated(self, result):
        """
        Fire ``result_event`` with the given result.

        NOTE(review): presumably invoked by ``BaseFuture`` when a
        ``GENERATED`` message arrives from the background task; the
        dispatch mechanism is not visible in this module -- confirm.

        Parameters
        ----------
        result : object
            The result to publish through ``result_event``.
        """
        self.result_event = result
@ITaskSpecification.register
class BackgroundIteration(HasStrictTraits):
    """
    Object representing the background iteration to be executed.
    """

    #: The callable to be executed. This should return something iterable.
    callable = Callable()

    #: Positional arguments to be passed to the callable.
    args = Tuple()

    #: Named arguments to be passed to the callable.
    kwargs = Dict(Str())

    def future(self):
        """
        Return a Future for the background task.

        Returns
        -------
        future : IterationFuture
            Future object that can be used to monitor the status of the
            background task.
        """
        return IterationFuture()

    def background_task(self):
        """
        Return a background callable for this task specification.

        Returns
        -------
        collections.abc.Callable
            Callable accepting arguments ``send`` and ``cancelled``. The
            callable can use ``send`` to send messages and ``cancelled`` to
            check whether cancellation has been requested.
        """
        return IterationBackgroundTask(
            callable=self.callable,
            args=self.args,
            # Hand the task its own copy of the keyword arguments rather
            # than the dict held by this specification object.
            kwargs=self.kwargs.copy(),
        )
def submit_iteration(executor, callable, *args, **kwargs):
    """
    Convenience function to submit a background iteration to an executor.

    Parameters
    ----------
    executor : TraitsExecutor
        Executor to submit the task to.
    callable : collections.abc.Callable
        Callable returning an iterator when called with the given arguments.
    *args
        Positional arguments to pass to the callable.
    **kwargs
        Named arguments to pass to the callable.

    Returns
    -------
    future : IterationFuture
        Object representing the state of the background iteration.
    """
    task = BackgroundIteration(callable=callable, args=args, kwargs=kwargs)
    return executor.submit(task)