Source code for encore.concurrent.futures.serializing_asynchronizer
#
# (C) Copyright 2011-2022 Enthought, Inc., Austin, TX
# All right reserved.
#
# This file is open source software distributed according to the terms in
# LICENSE.txt
#
from collections import OrderedDict
from encore.concurrent.futures.abc_work_scheduler import ABCWorkScheduler
[docs]class SerializingAsynchronizer(ABCWorkScheduler):
"""Provides Asynchronizer functionality for multiple operations.
The SerializingAsynchronizer provides the same guarantees as the
:class:`~encore.concurrent.futures.asynchronizer.Asynchronizer` for
multiple different operations. For any submitted callable, requests
to submit a new operation while an operation of **the same**
callable is underway, the new operation is stored overwriting the
prior. Different submitted callables are executed serially in the
order in which they were originally submitted.
For example if long-running callable ``C`` is submitted, then
callable ``A``, then callable ``B``, then callable ``A`` again (all
while ``C`` is running), then the order of execution will be ``C``,
``A``, ``B``.
.. warning::
This is an experimental API and is subject to change.
"""
def __init__(self, executor, name=None, callback=None):
super(SerializingAsynchronizer, self).__init__(
executor, name, callback)
# Ordered dictionary containing tuples (operation, args, kwargs)
# representing pending operations. The items are keyed by the
# operation. Operations are executed in the order they were
# entered in the dict.
self._pending_operations = OrderedDict()
###########################################################################
# Private methods.
###########################################################################
def _get_next_operation(self):
""" Return the next operation to schedule or None.
"""
if len(self._pending_operations) == 0:
return None
_, pending = self._pending_operations.popitem(last=False)
return pending
def _add_pending_operation(self, operation, args, kwargs):
""" Add a new pending operation for scheduling.
"""
self._pending_operations[operation] = operation, args, kwargs