Source code for traits_futures.traits_executor
# (C) Copyright 2018-2019 Enthought, Inc., Austin, TX
# All rights reserved.
"""
Main-thread executor for submission of background tasks.
"""
from __future__ import absolute_import, print_function, unicode_literals
import concurrent.futures
import threading
from traits.api import (
Bool, Enum, HasStrictTraits, HasTraits, Instance, on_trait_change,
Property, Set)
from traits_futures.background_call import BackgroundCall
from traits_futures.background_iteration import BackgroundIteration
from traits_futures.background_progress import BackgroundProgress
from traits_futures.toolkit_support import message_router_class
# Executor states.
#: Executor is currently running (this is the initial state).
RUNNING = "running"
#: Executor has been requested to stop. In this state, no new
#: jobs can be submitted, and we're waiting for old ones to complete.
STOPPING = "stopping"
#: Executor is stopped.
STOPPED = "stopped"
#: Trait type representing the executor state.
ExecutorState = Enum(RUNNING, STOPPING, STOPPED)
[docs]class TraitsExecutor(HasStrictTraits):
"""
Executor to initiate and manage background tasks.
"""
#: Current state of this executor.
state = ExecutorState
#: Derived state: true if this executor is running; False if it's
#: stopped or stopping.
running = Property(Bool())
#: Derived state: true if this executor is stopped and it's safe
#: to dispose of related resources (like the thread pool).
stopped = Property(Bool())
def __init__(self, thread_pool=None, **traits):
super(TraitsExecutor, self).__init__(**traits)
if thread_pool is None:
self._thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=4)
self._own_thread_pool = True
else:
self._thread_pool = thread_pool
self._own_thread_pool = False
[docs] def submit_call(self, callable, *args, **kwargs):
"""
Convenience function to submit a background call.
Parameters
----------
callable : an arbitrary callable
Function to execute in the background.
*args
Positional arguments to pass to that function.
**kwargs
Named arguments to pass to that function.
Returns
-------
future : CallFuture
Object representing the state of the background call.
"""
task = BackgroundCall(
callable=callable,
args=args,
kwargs=kwargs,
)
return self.submit(task)
[docs] def submit_iteration(self, callable, *args, **kwargs):
"""
Convenience function to submit a background iteration.
Parameters
----------
callable : an arbitrary callable
Function executed in the background to provide the iterable.
*args
Positional arguments to pass to that function.
**kwargs
Named arguments to pass to that function.
Returns
-------
future : IterationFuture
Object representing the state of the background iteration.
"""
task = BackgroundIteration(
callable=callable,
args=args,
kwargs=kwargs,
)
return self.submit(task)
[docs] def submit_progress(self, callable, *args, **kwargs):
"""
Convenience function to submit a background progress call.
Parameters
----------
callable : callable accepting a "progress" named argument
Function executed in the background to provide the iterable. This
should accept a "progress" named argument. The callable can then
call the "progress" object to report progress.
*args
Positional arguments to pass to that function.
**kwargs
Named arguments to pass to that function. These should not include
"progress".
Returns
-------
future : ProgressFuture
Object representing the state of the background task.
"""
task = BackgroundProgress(
callable=callable,
args=args,
kwargs=kwargs,
)
return self.submit(task)
[docs] def submit(self, task):
"""
Submit a task to the executor, and return the corresponding future.
Parameters
----------
task : BackgroundCall, BackgroundIteration or BackgroundProgress
The task to be executed.
Returns
-------
future : CallFuture, IterationFuture or ProgressFuture
Future for this task.
"""
if not self.running:
raise RuntimeError("Can't submit task unless executor is running.")
sender, receiver = self._message_router.pipe()
future, runner = task.future_and_callable(
cancel_event=threading.Event(),
message_sender=sender,
message_receiver=receiver,
)
self._thread_pool.submit(runner)
self._futures.add(future)
return future
[docs] def stop(self):
"""
Initiate stop: cancel existing jobs and prevent new ones.
"""
if not self.running:
raise RuntimeError("Executor is not currently running.")
# For consistency, we always go through the STOPPING state,
# even if there are no jobs.
self.state = STOPPING
# Cancel any futures that aren't already cancelled.
for future in self._futures:
if future.cancellable:
future.cancel()
if not self._futures:
self._stop()
# Private traits ##########################################################
#: concurrent.futures.Executor instance providing the thread pool.
_thread_pool = Instance(concurrent.futures.Executor)
#: True if we own this thread pool (and are therefore responsible
#: for shutting it down), else False.
_own_thread_pool = Bool()
#: Router providing message connections between background tasks
#: and foreground futures.
_message_router = Instance(HasTraits)
#: Currently executing futures.
_futures = Set()
# Private methods #########################################################
def _get_running(self):
return self.state == RUNNING
def _get_stopped(self):
return self.state == STOPPED
def _state_changed(self, old_state, new_state):
old_running = old_state == RUNNING
new_running = new_state == RUNNING
if old_running != new_running:
self.trait_property_changed("running", old_running, new_running)
old_stopped = old_state == STOPPED
new_stopped = new_state == STOPPED
if old_stopped != new_stopped:
self.trait_property_changed("stopped", old_stopped, new_stopped)
def __message_router_default(self):
class_ = message_router_class()
return class_()
@on_trait_change('_futures:_exiting')
def _remove_future(self, future, name, new):
self._futures.remove(future)
# If we're in STOPPING state and the last future has just exited,
# go to STOPPED state.
if self.state == STOPPING and not self._futures:
self._stop()
def _stop(self):
"""
Go to STOPPED state, and shut down the thread pool if we own it.
"""
assert self.state == STOPPING
if self._own_thread_pool:
self._thread_pool.shutdown()
self._thread_pool = None
self.state = STOPPED