Contexts and multiprocessing

Note

The multiprocessing support in Traits Futures is provisional. The API is subject to change in future releases. Feedback on the feature is welcome.

By default, the TraitsExecutor submits its background tasks to a thread pool. In some cases, for example when running several heavily CPU-bound background tasks, it may be desirable to run the background tasks in separate processes instead. For this to work, the Traits Futures code needs to know that it must work internally with multiprocessing-safe variants of the usual concurrency primitives: events, queues, worker pools and the like.

This can be achieved through use of a context, or more specifically, an object implementing the IParallelContext interface. A context provides the executor with a way of creating related and compatible concurrency constructs.

Traits Futures provides two different contexts: the MultithreadingContext and the MultiprocessingContext. By default, the executor will use a MultithreadingContext, but you can create and pass in your own context instead. The context should be closed with the close method once it’s no longer needed.

Here’s an example main function that creates an executor that uses a multiprocessing context:

def main():
    """
    Run the SquaringHelper UI with an executor backed by worker processes.

    Creates a MultiprocessingContext, builds a TraitsExecutor on top of it,
    and opens the SquaringHelper view via ``configure_traits``. On the way
    out, the executor is shut down first and the context is closed last.
    """
    context = MultiprocessingContext()
    try:
        traits_executor = TraitsExecutor(context=context)
        try:
            view = SquaringHelper(traits_executor=traits_executor)
            view.configure_traits()
        finally:
            traits_executor.shutdown()
    finally:
        # Runs even if creating or shutting down the executor raised, so
        # the context is never left open.
        context.close()

Here’s a complete TraitsUI example that makes use of a multiprocessing context.

"""
Complete example showing how to use the MultiprocessingContext to execute
background jobs in separate processes instead of separate threads.

The "jobs" in this case are slow, unreliable squaring operations. The
GUI allows multiple jobs to execute simultaneously, and shows the status
of each of the currently-running and completed jobs.

Requires TraitsUI to run, in addition to the usual Traits Futures
dependencies.
"""

import random
import time

from traits.api import (
    Button,
    Dict,
    HasStrictTraits,
    Instance,
    List,
    observe,
    Property,
    Range,
    Str,
)
from traits_futures.api import (
    CallFuture,
    CANCELLED,
    CANCELLING,
    COMPLETED,
    EXECUTING,
    FAILED,
    MultiprocessingContext,
    submit_call,
    TraitsExecutor,
    WAITING,
)
from traitsui.api import (
    HGroup,
    Item,
    TabularAdapter,
    TabularEditor,
    UItem,
    VGroup,
    View,
)


def slow_square(n, timeout=5.0):
    """
    Square ``n`` after a random delay, failing if the delay is too long.

    A delay is drawn from an exponential distribution whose mean is
    ``(n + 5) / 5`` seconds, so larger inputs tend to take longer and are
    more likely to time out. Inputs are expected to be in the range 0-100.

    If the drawn delay exceeds ``timeout``, the function sleeps for
    ``timeout`` seconds and then raises RuntimeError. Otherwise it sleeps
    for the drawn delay and returns ``n * n``.
    """
    expected_delay = (n + 5.0) / 5.0
    delay = random.expovariate(1.0 / expected_delay)
    timed_out = delay > timeout

    if not timed_out:
        time.sleep(delay)
        return n * n

    # Simulate a job that used up its whole time budget before giving up.
    time.sleep(timeout)
    raise RuntimeError("Calculation took too long.")


class JobTabularAdapter(TabularAdapter):
    """
    Tabular adapter presenting each job as one row in a single column.

    The row background is chosen from ``colors`` according to the job's
    state, and the cell text describes the state together with the job's
    result or failure message where there is one.
    """

    #: The table has a single column, titled "Job State", with id "state".
    columns = [("Job State", "state")]

    #: Mapping from job state to the RGB color used for that job's row.
    colors = Dict(
        {
            WAITING: (255, 255, 255),
            EXECUTING: (128, 128, 255),
            CANCELLING: (255, 128, 0),
            CANCELLED: (255, 0, 0),
            FAILED: (255, 192, 255),
            COMPLETED: (128, 255, 128),
        }
    )

    #: Text shown in the "state" column for the current item.
    state_text = Property(Str())

    def _get_bg_color(self):
        """Return the row color registered for the current item's state."""
        return self.colors[self.item.state]

    def _get_state_text(self):
        """
        Return the title-cased state of the current item.

        A completed job has its result appended, and a failed job has the
        second element of its ``exception`` appended.
        """
        job = self.item
        state = job.state
        label = state.title()

        if state == COMPLETED:
            return label + ": result={}".format(job.result)
        if state == FAILED:
            return label + ": {}".format(job.exception[1])
        return label


class SquaringHelper(HasStrictTraits):
    """
    UI model for submitting, cancelling and clearing slow squaring jobs.

    Each press of ``square`` submits a ``slow_square`` call for the current
    ``input`` to ``traits_executor`` and records the resulting future in
    ``current_futures``, which the view shows as a table.
    """

    #: Executor that the squaring jobs are submitted to.
    traits_executor = Instance(TraitsExecutor)

    #: Futures for the jobs submitted so far, as shown in the table.
    current_futures = List(Instance(CallFuture))

    #: Button that submits a new squaring job.
    square = Button()

    #: Button that requests cancellation of every listed job.
    cancel_all = Button()

    #: Button that drops jobs that are done from the list.
    clear_finished = Button()

    #: The integer to be squared by the next submitted job.
    input = Range(low=0, high=100)

    @observe("square")
    def _do_slow_square(self, event):
        """Submit a squaring job for the current input and list its future."""
        new_future = submit_call(
            self.traits_executor, slow_square, self.input
        )
        self.current_futures.append(new_future)

    @observe("cancel_all")
    def _cancel_all_futures(self, event):
        """Call ``cancel`` on every future in the list."""
        for job in self.current_futures:
            job.cancel()

    @observe("clear_finished")
    def _clear_finished_futures(self, event):
        """Remove every future whose ``done`` flag is set from the list."""
        # Collect first so the list is not mutated while being iterated.
        finished = [job for job in self.current_futures if job.done]
        for job in finished:
            self.current_futures.remove(job)

    def default_traits_view(self):
        """
        Build the default view: controls on the left, job table on the right.
        """
        controls = VGroup(
            Item("input"),
            UItem("square"),
            UItem("cancel_all"),
            UItem("clear_finished"),
        )
        jobs_editor = TabularEditor(
            adapter=JobTabularAdapter(),
            auto_update=True,
        )
        jobs_table = VGroup(
            UItem("current_futures", editor=jobs_editor),
        )
        return View(
            HGroup(controls, jobs_table),
            width=1024,
            height=768,
            resizable=True,
        )


def main():
    """
    Demonstrate a GUI that hands off background tasks to a separate process.

    Creates a MultiprocessingContext, builds a TraitsExecutor on top of it,
    and opens the SquaringHelper view via ``configure_traits``. On the way
    out, the executor is shut down first and the context is closed last.
    """
    context = MultiprocessingContext()
    try:
        traits_executor = TraitsExecutor(context=context)
        try:
            view = SquaringHelper(traits_executor=traits_executor)
            view.configure_traits()
        finally:
            traits_executor.shutdown()
    finally:
        # Runs even if creating or shutting down the executor raised, so
        # the context is never left open.
        context.close()


# Run the demo only when this file is executed as a script, not on import.
# NOTE(review): with process-based workers, child processes may re-import the
# main module (e.g. under the "spawn" start method), in which case this guard
# would also keep them from launching the GUI again -- confirm for
# MultiprocessingContext.
if __name__ == "__main__":
    main()