# Source code for traits_futures.traits_executor
# (C) Copyright 2018-2020 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!
"""
Executor to submit background tasks.
"""
import concurrent.futures
import warnings
from traits.api import (
Any,
Bool,
Dict,
Enum,
HasStrictTraits,
Instance,
on_trait_change,
Property,
)
from traits_futures.background_call import submit_call
from traits_futures.background_iteration import submit_iteration
from traits_futures.background_progress import submit_progress
from traits_futures.i_parallel_context import IParallelContext
from traits_futures.wrappers import BackgroundTaskWrapper, FutureWrapper
# Executor states.
#: Executor is currently running (this is the initial state).
RUNNING = "running"
#: Executor has been requested to stop. In this state, no new
#: jobs can be submitted, and we're waiting for old ones to complete.
STOPPING = "stopping"
#: Executor is stopped.
STOPPED = "stopped"
#: Trait type representing the executor state.
ExecutorState = Enum(RUNNING, STOPPING, STOPPED)
[docs]class TraitsExecutor(HasStrictTraits):
"""
Executor to initiate and manage background tasks.
Parameters
----------
thread_pool : concurrent.futures.Executor, optional
Deprecated alias for worker_pool.
.. deprecated:: 0.2
Use ``worker_pool`` instead.
worker_pool : concurrent.futures.Executor, optional
If supplied, provides the underlying worker pool executor to use. In
this case, the creator of the TraitsExecutor is responsible for
shutting down the worker pool once it's no longer needed. If not
supplied, a new private worker pool will be created, and this object's
``stop`` method will shut down that worker pool.
max_workers : int or None, optional
Maximum number of workers for the private worker pool. This parameter
is mutually exclusive with ``worker_pool``. The default is ``None``,
which delegates the choice of number of workers to Python's
``concurrent.futures`` module.
context : IParallelContext, optional
Parallelism context, providing appropriate concurrent primitives
and worker pools for a given choice of parallelism (for example
multithreading or multiprocessing). If not given, assumes
multithreading. Note that if both ``context`` and ``worker_pool``
are given, they must be compatible.
"""
#: Current state of this executor.
state = ExecutorState
#: Derived state: true if this executor is running; False if it's
#: stopped or stopping.
running = Property(Bool())
#: Derived state: true if this executor is stopped and it's safe
#: to dispose of related resources (like the worker pool).
stopped = Property(Bool())
def __init__(
self,
thread_pool=None,
*,
worker_pool=None,
max_workers=None,
context=None,
**traits,
):
super(TraitsExecutor, self).__init__(**traits)
if thread_pool is not None:
warnings.warn(
(
"The thread_pool argument to TraitsExecutor is "
"deprecated. Use worker_pool instead."
),
category=DeprecationWarning,
stacklevel=2,
)
worker_pool = thread_pool
if context is not None:
self._context = context
own_worker_pool = worker_pool is None
if own_worker_pool:
if max_workers is None:
worker_pool = self._context.worker_pool()
else:
worker_pool = self._context.worker_pool(
max_workers=max_workers
)
elif max_workers is not None:
raise TypeError(
"at most one of 'worker_pool' and 'max_workers' "
"should be supplied"
)
self._worker_pool = worker_pool
self._own_worker_pool = own_worker_pool
[docs] def submit_call(self, callable, *args, **kwargs):
"""
Convenience function to submit a background call.
.. deprecated:: 0.2
Use the :func:`~.submit_call` function instead.
Parameters
----------
callable : an arbitrary callable
Function to execute in the background.
*args
Positional arguments to pass to that function.
**kwargs
Named arguments to pass to that function.
Returns
-------
future : CallFuture
Object representing the state of the background call.
"""
warnings.warn(
"The submit_call method is deprecated. Use the submit_call "
"convenience function instead.",
DeprecationWarning,
stacklevel=2,
)
return submit_call(self, callable, *args, **kwargs)
[docs] def submit_iteration(self, callable, *args, **kwargs):
"""
Convenience function to submit a background iteration.
.. deprecated:: 0.2
Use the :func:`~.submit_iteration` function instead.
Parameters
----------
callable : an arbitrary callable
Function executed in the background to provide the iterable.
*args
Positional arguments to pass to that function.
**kwargs
Named arguments to pass to that function.
Returns
-------
future : IterationFuture
Object representing the state of the background iteration.
"""
warnings.warn(
"The submit_iteration method is deprecated. Use the "
"submit_iteration convenience function instead.",
DeprecationWarning,
stacklevel=2,
)
return submit_iteration(self, callable, *args, **kwargs)
[docs] def submit_progress(self, callable, *args, **kwargs):
"""
Convenience function to submit a background progress call.
.. deprecated:: 0.2
Use the :func:`~.submit_progress` function instead.
Parameters
----------
callable : callable accepting a "progress" named argument
Function executed in the background to provide the iterable. This
should accept a "progress" named argument. The callable can then
call the "progress" object to report progress.
*args
Positional arguments to pass to that function.
**kwargs
Named arguments to pass to that function. These should not include
"progress".
Returns
-------
future : ProgressFuture
Object representing the state of the background task.
"""
warnings.warn(
"The submit_progress method is deprecated. Use the "
"submit_progress convenience function instead.",
DeprecationWarning,
stacklevel=2,
)
return submit_progress(self, callable, *args, **kwargs)
[docs] def submit(self, task):
"""
Submit a task to the executor, and return the corresponding future.
Parameters
----------
task : ITaskSpecification
Returns
-------
future : IFuture
Future for this task.
"""
if not self.running:
raise RuntimeError("Can't submit task unless executor is running.")
cancel_event = self._context.event()
sender, receiver = self._message_router.pipe()
try:
runner = task.background_task()
future = task.future()
future._executor_initialized(cancel_event.set)
except Exception:
self._message_router.close_pipe(sender, receiver)
raise
background_task_wrapper = BackgroundTaskWrapper(
runner, sender, cancel_event
)
wrapper = FutureWrapper(future=future, receiver=receiver)
self._worker_pool.submit(background_task_wrapper)
self._wrappers[receiver] = wrapper
return future
[docs] def stop(self):
"""
Initiate stop: cancel existing jobs and prevent new ones.
"""
if not self.running:
raise RuntimeError("Executor is not currently running.")
# For consistency, we always go through the STOPPING state,
# even if there are no jobs.
self.state = STOPPING
# Cancel any futures that aren't already cancelled.
for _, wrapper in self._wrappers.items():
future = wrapper.future
if future.cancellable:
future.cancel()
if not self._wrappers:
self._stop()
# Private traits ##########################################################
#: Parallelization context
_context = Instance(IParallelContext)
#: True if we own this context, else False.
_own_context = Bool(False)
#: concurrent.futures.Executor instance providing the worker pool.
_worker_pool = Instance(concurrent.futures.Executor)
#: True if we own this worker pool (and are therefore responsible
#: for shutting it down), else False.
_own_worker_pool = Bool()
#: Router providing message connections between background tasks
#: and foreground futures.
_message_router = Any()
#: True if we've created a message router, and need to shut it down.
_have_message_router = Bool(False)
#: Wrappers for currently-executing futures.
_wrappers = Dict(Any(), Any())
# Private methods #########################################################
def _get_running(self):
return self.state == RUNNING
def _get_stopped(self):
return self.state == STOPPED
def _state_changed(self, old_state, new_state):
old_running = old_state == RUNNING
new_running = new_state == RUNNING
if old_running != new_running:
self.trait_property_changed("running", old_running, new_running)
old_stopped = old_state == STOPPED
new_stopped = new_state == STOPPED
if old_stopped != new_stopped:
self.trait_property_changed("stopped", old_stopped, new_stopped)
def __message_router_default(self):
# Toolkit-specific message router.
router = self._context.message_router()
router.connect()
self._have_message_router = True
return router
def __context_default(self):
# By default, we use multithreading
from traits_futures.multithreading_context import MultithreadingContext
context = MultithreadingContext()
self._own_context = True
return context
@on_trait_change("_message_router:receiver_done")
def _remove_future(self, receiver):
self._wrappers.pop(receiver)
# If we're in STOPPING state and the last future has just exited,
# go to STOPPED state.
if self.state == STOPPING and not self._wrappers:
self._stop()
def _stop(self):
"""
Go to STOPPED state, and shut down the worker pool if we own it.
"""
assert self.state == STOPPING
if self._have_message_router:
self._message_router.disconnect()
self._message_router = None
if self._own_worker_pool:
self._worker_pool.shutdown()
self._worker_pool = None
if self._own_context:
self._context.close()
self._context = None
self.state = STOPPED