# Source code for encore.concurrent.futures.enhanced_thread_pool_executor
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
#
# Changes:
# May 2013
# CLF: Ported to encore
# CLF: Renamed ThreadPoolExecutor to EnhancedThreadPoolExecutor
# CLF: Patched from upstream issue16284
# CLF: Patched from upstream issue11777
# CLF: Added 'initializer' and 'uninitializer' arguments to
# EnhancedThreadPoolExecutor
# CLF: Added 'name' argument to EnhancedThreadPoolExecutor
"""Implements EnhancedThreadPoolExecutor.
This builds off of concurrent.futures.thread and implements the following
changes:
* Each worker can be initialized and uninitialized with specified functions.
* 'map' works without iterating (bugs.python.org/issue11777).
* Workers do not unnecessarily retain references to work items
(bugs.python.org/issue16284).
* Workers do not use polling: http://bugs.python.org/issue11635
* Optional `name` argument to prefix executor threads' names.
* Optional `wait_on_exit` argument to let worker threads die
abruptly during jobs on interpreter exit instead of waiting to finish.
The implementation is largely copied to avoid reliance on undocumented, private
parts of the code. For example, '_threads_queues' is needed to properly
manage threads in the ThreadPoolExecutor, but this is not guaranteed to exist
in future implementations of concurrent.futures.thread.
"""
import atexit
import itertools
import queue
import threading
import weakref
import time
from concurrent.futures import _base
from .future import Future
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpreter shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False


def _python_exit():
    """Wake every registered worker at interpreter exit and wait for it.

    Flips the module-level ``_shutdown`` flag, posts one ``None`` sentinel
    per registered worker queue, then joins each worker thread.
    """
    global _shutdown
    _shutdown = True
    # Snapshot the weak dict: entries may vanish while we iterate.
    entries = list(_threads_queues.items())
    # First wake everyone up...
    for _thread, work_queue in entries:
        work_queue.put(None)
    # ...then wait for each worker to finish.
    for worker_thread, _queue in entries:
        worker_thread.join()


atexit.register(_python_exit)
class _WorkItem(object):
    """Pairs a future with the callable whose outcome will fulfil it."""

    def __init__(self, future, fn, args, kwargs):
        self.future = future  # future to complete with fn's outcome
        self.fn = fn          # callable to invoke
        self.args = args      # positional arguments for fn
        self.kwargs = kwargs  # keyword arguments for fn

    def run(self):
        """Execute ``fn`` and set its result or exception on the future.

        Does nothing if the future was cancelled before it could start
        (``set_running_or_notify_cancel`` returns False).
        """
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
            # Break a reference cycle: the exception's traceback references
            # this frame, which references self, which references the future
            # holding the exception.  Mirrors the upstream CPython fix in
            # concurrent.futures.thread._WorkItem.run.
            self = None
        else:
            self.future.set_result(result)
def _worker(executor_reference, work_queue, initialize=None,
            uninitialize=None):
    """Thread target: run work items from *work_queue* until told to stop.

    ``executor_reference`` is a weak reference to the owning executor (weak
    so idle workers do not keep a dropped executor alive).  ``initialize``
    and ``uninitialize`` are optional no-argument callables run at thread
    start and exit; exceptions they raise are logged and swallowed.
    """
    if initialize is not None:
        try:
            initialize()
        except BaseException:
            # A failing initializer must not kill the thread before it
            # enters the work loop.
            _base.LOGGER.critical('Initialize exception in worker',
                                  exc_info=True)
    try:
        while True:
            # Blocking get: no polling (see issue11635).
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            # A None item is the wake-up/shutdown sentinel.
            executor = executor_reference()
            # Exit if:
            # - The interpreter is shutting down OR
            # - The executor that owns the worker has been collected OR
            # - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notify other workers.
                work_queue.put(None)
                return
            # Drop the strong reference so the executor remains collectable
            # while this worker blocks on the queue again.
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
    finally:
        if uninitialize is not None:
            try:
                uninitialize()
            except BaseException:
                _base.LOGGER.critical('Uninitialize exception in worker',
                                      exc_info=True)
class EnhancedThreadPoolExecutor(_base.Executor):
    """Thread pool executor with worker (un)initialization hooks, named
    worker threads, non-iterating ``map`` and optional abrupt exit behaviour.
    """

    # Class-attribute for customizing the type of future used by the executor.
    # This is provided to ease customization of this class without having to
    # duplicate its internals. The factory must accept no arguments and the
    # object returned by the factory must support the concurrent.futures.Future
    # API.
    _future_factory = Future

    def __init__(self, max_workers, initializer=None, uninitializer=None,
                 name=None, wait_at_exit=True):
        """Initializes a new EnhancedThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.  Must be greater than 0.
            initializer: callable, taking no arguments, that is to be called on
                each worker thread when it is started. Exceptions in the
                initializer are logged, then ignored.
            uninitializer: callable, taking no arguments, that is to be called
                on each worker thread when shut down. Exceptions in the
                uninitializer are logged, then ignored.
            name: string giving the name for this Executor.  This name
                is used as a prefix for the names of the executor's
                worker threads. If no name is given then the executor
                class name will be used.
            wait_at_exit: whether to wait for running jobs to finish during
                interpreter exit or to abruptly kill the worker thread.
                Note: The killed job does not get a chance to do any cleanups,
                its resources (such as open files, database transactions, etc.)
                may not be released properly. See the threading module
                documentation about daemon threads for more information.

        Raises:
            ValueError: if max_workers is not strictly positive.
        """
        if max_workers <= 0:
            # A pool with no workers would accept jobs but never run them.
            raise ValueError("max_workers must be greater than 0")
        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        self._threads = set()
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        self._initializer = initializer
        self._uninitializer = uninitializer
        self._wait_at_exit = wait_at_exit
        if name is None:
            name = type(self).__name__
        self.name = name
        # Gives each worker thread a unique, 1-based name suffix.
        self._thread_counter = itertools.count(start=1)

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = self._future_factory()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless the pool is already full."""
        # When the executor gets lost, the weakref callback will wake up
        # the worker threads so they can notice and exit.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        if len(self._threads) < self._max_workers:
            thread_name = "{0}Worker-{1}".format(self.name,
                                                 next(self._thread_counter))
            t = threading.Thread(target=_worker, name=thread_name,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._uninitializer))
            t.daemon = True
            t.start()
            self._threads.add(t)
            if self._wait_at_exit:
                # Registering the thread makes the atexit handler wait for
                # it; otherwise the daemon thread dies abruptly at exit.
                _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # Wake one worker; exiting workers re-post the sentinel so it
            # propagates to all of them.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__

    def map(self, fn, *iterables, **kwargs):
        """Returns an iterator equivalent to map(fn, iter).

        Args:
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then
                there is no limit on the wait time.

        Returns:
            An iterator equivalent to: map(fn, *iterables) but the calls may
            be evaluated out-of-order.

        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        timeout = kwargs.get('timeout')
        if timeout is not None:
            end_time = timeout + time.time()

        # Submit everything up front (issue11777): work starts immediately,
        # not when the returned iterator is first advanced.
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                # Cancel anything not yet started if the consumer bails out
                # early or a result call raises.
                for future in fs:
                    future.cancel()
        return result_iterator()