Source code for traits_futures.background_iteration
# (C) Copyright 2018-2021 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
"""
Background task that sends results from an iteration.
"""
from traits.api import Callable, Dict, Event, HasStrictTraits, Str, Tuple
from traits_futures.base_future import BaseFuture, BaseTask
from traits_futures.i_task_specification import ITaskSpecification
#: Message sent whenever the iteration yields a result.
#: The message argument is the result generated.
GENERATED = "generated"
[docs]class IterationTask(BaseTask):
"""
Iteration to be executed in the background.
"""
def __init__(self, callable, args, kwargs):
self.callable = callable
self.args = args
self.kwargs = kwargs
[docs] def run(self):
iterable = iter(self.callable(*self.args, **self.kwargs))
while True:
if self.cancelled():
return None
try:
result = next(iterable)
except StopIteration as e:
# If the iteration returned a value, the StopIteration
# exception carries that value. Return it.
return e.value
self.send(GENERATED, result)
# Don't keep a reference around until the next iteration.
del result
[docs]class IterationFuture(BaseFuture):
"""
Foreground representation of an iteration executing in the background.
"""
#: Event fired whenever a result arrives from the background
#: iteration.
result_event = Event()
# Private methods #########################################################
def _process_generated(self, result):
self.result_event = result
[docs]@ITaskSpecification.register
class BackgroundIteration(HasStrictTraits):
"""
Object representing the background iteration to be executed.
"""
#: The callable to be executed. This should return something iterable.
callable = Callable()
#: Positional arguments to be passed to the callable.
args = Tuple()
#: Named arguments to be passed to the callable.
kwargs = Dict(Str())
[docs] def future(self, cancel):
"""
Return a Future for the background task.
Parameters
----------
cancel
Zero-argument callable, returning no useful result. The returned
future's ``cancel`` method should call this to request cancellation
of the associated background task.
Returns
-------
future : IterationFuture
Future object that can be used to monitor the status of the
background task.
"""
return IterationFuture(_cancel=cancel)
[docs] def task(self):
"""
Return a background callable for this task specification.
Returns
-------
task : IterationTask
Callable accepting arguments ``send`` and ``cancelled``. The
callable can use ``send`` to send messages and ``cancelled`` to
check whether cancellation has been requested.
"""
return IterationTask(
callable=self.callable,
args=self.args,
kwargs=self.kwargs,
)
[docs]def submit_iteration(executor, callable, *args, **kwargs):
"""
Submit an iteration to an executor.
Parameters
----------
executor : TraitsExecutor
Executor to submit the task to. This argument should always be
passed by position rather than by name. Future versions of the library
may enforce this restriction.
callable
Callable returning an iterator when called with the given arguments.
This argument should always be passed by position rather than by name.
Future versions of the library may enforce this restriction.
*args
Positional arguments to pass to the callable.
**kwargs
Named arguments to pass to the callable.
Returns
-------
future : IterationFuture
Object representing the state of the background iteration.
"""
task = BackgroundIteration(callable=callable, args=args, kwargs=kwargs)
return executor.submit(task)