Contexts and multiprocessing¶
Note
The multiprocessing support in Traits Futures is provisional. The API is subject to change in future releases. Feedback on the feature is welcome.
By default, the TraitsExecutor
submits its background tasks to a thread pool.
In some cases, for example in the case of multiple heavily CPU-bound background
tasks, it may be desirable to run the background tasks in separate processes
instead. For this to work, the Traits Futures code needs to know that it has to
work internally with multiprocessing-safe variants of the usual concurrency
primitives: events, queues, worker pools and the like.
This can be achieved through use of a context, or more specifically,
an object implementing the IParallelContext
interface. A context provides
the executor with a way of creating related and compatible
concurrency constructs.
Traits Futures provides two different contexts: the MultithreadingContext
and the MultiprocessingContext
. By default, the executor will use a
MultithreadingContext
, but you can create and pass in your own context
instead. The context should be closed with the close
method once it’s
no longer needed.
Here’s an example main
function that creates an executor that uses
a multiprocessing context:
def main():
context = MultiprocessingContext()
traits_executor = TraitsExecutor(context=context)
try:
view = SquaringHelper(traits_executor=traits_executor)
view.configure_traits()
finally:
traits_executor.shutdown()
context.close()
Here’s a complete TraitsUI example
that makes use of this.
"""
Complete example showing how to use the MultiprocessingContext to execute
background jobs in separate processes instead of separate threads.
The "jobs" in this case are slow, unreliable squaring operations. The
GUI allows multiple jobs to execute simultaneously, and shows the status
of each of the currently-running and completed jobs.
Requires TraitsUI to run, in addition to the usual Traits Futures
dependencies.
"""
import random
import time
from traits.api import (
Button,
Dict,
HasStrictTraits,
Instance,
List,
observe,
Property,
Range,
Str,
)
from traits_futures.api import (
CallFuture,
CANCELLED,
CANCELLING,
COMPLETED,
EXECUTING,
FAILED,
MultiprocessingContext,
submit_call,
TraitsExecutor,
WAITING,
)
from traitsui.api import (
HGroup,
Item,
TabularAdapter,
TabularEditor,
UItem,
VGroup,
View,
)
def slow_square(n, timeout=5.0):
"""
Compute the square of an integer, slowly and unreliably.
The input should be in the range 0-100. The larger
the input, the longer the expected time to complete the operation,
and the higher the likelihood of timeout.
"""
mean_time = (n + 5.0) / 5.0
sleep_time = random.expovariate(1.0 / mean_time)
if sleep_time > timeout:
time.sleep(timeout)
raise RuntimeError("Calculation took too long.")
else:
time.sleep(sleep_time)
return n * n
class JobTabularAdapter(TabularAdapter):
columns = [
("Job State", "state"),
]
#: Row colors for the table.
colors = Dict(
{
CANCELLED: (255, 0, 0),
CANCELLING: (255, 128, 0),
EXECUTING: (128, 128, 255),
FAILED: (255, 192, 255),
COMPLETED: (128, 255, 128),
WAITING: (255, 255, 255),
}
)
#: Text to be displayed for the state column.
state_text = Property(Str())
def _get_bg_color(self):
return self.colors[self.item.state]
def _get_state_text(self):
job = self.item
state = job.state
state_text = state.title()
if state == COMPLETED:
state_text += ": result={}".format(job.result)
elif state == FAILED:
state_text += ": {}".format(job.exception[1])
return state_text
class SquaringHelper(HasStrictTraits):
#: The Traits executor for the background jobs.
traits_executor = Instance(TraitsExecutor)
#: List of the submitted jobs, for display purposes.
current_futures = List(Instance(CallFuture))
#: Start a new squaring operation.
square = Button()
#: Cancel all currently executing jobs.
cancel_all = Button()
#: Clear completed jobs from the list of current jobs.
clear_finished = Button()
#: Value that we'll square.
input = Range(low=0, high=100)
@observe("square")
def _do_slow_square(self, event):
future = submit_call(self.traits_executor, slow_square, self.input)
self.current_futures.append(future)
@observe("cancel_all")
def _cancel_all_futures(self, event):
for future in self.current_futures:
future.cancel()
@observe("clear_finished")
def _clear_finished_futures(self, event):
for future in list(self.current_futures):
if future.done:
self.current_futures.remove(future)
def default_traits_view(self):
return View(
HGroup(
VGroup(
Item("input"),
UItem("square"),
UItem("cancel_all"),
UItem("clear_finished"),
),
VGroup(
UItem(
"current_futures",
editor=TabularEditor(
adapter=JobTabularAdapter(),
auto_update=True,
),
),
),
),
width=1024,
height=768,
resizable=True,
)
def main():
"""
Demonstrate a GUI that hands off background tasks to a separate process.
"""
context = MultiprocessingContext()
traits_executor = TraitsExecutor(context=context)
try:
view = SquaringHelper(traits_executor=traits_executor)
view.configure_traits()
finally:
traits_executor.shutdown()
context.close()
if __name__ == "__main__":
main()