traits_futures.multiprocessing_router module

Implementations of the IMessageSender, IMessageReceiver and IMessageRouter interfaces for tasks executed in a background process.

Overview of the implementation

When the router is started (via the start method), it sets up the following machinery:

  • A process-safe message queue shared between processes (MultiprocessingRouter._process_message_queue). The queue lives in its own manager server process (the manager is MultiprocessingRouter.manager); the main process and the worker processes communicate with it through proxy objects.

  • A thread-safe local message queue (MultiprocessingRouter._local_message_queue) in the main process.

  • A long-running thread (MultiprocessingRouter._monitor_thread), running in the main process, that continually monitors the process message queue and immediately transfers any messages that arrive to the local message queue.

  • An IPingee instance that’s pinged by the monitor thread whenever a message is transferred from the process message queue to the local message queue, alerting the GUI that there’s a message to process and route.

When a worker process uses the sender to send a message, the following steps occur:

  • the sender places the message onto the process message queue (using its local proxy for that message queue)

  • the monitor thread receives the message (using its local proxy for the process message queue) and places the message onto the local message queue. It also pings the pingee.

  • assuming a running GUI event loop, the pingee receives the ping and executes the MultiprocessingRouter._route_message callback

  • the _route_message callback pulls the next message from the local message queue, inspects it to determine which receiver it should be sent to, and sends it to that receiver
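The last two steps can be sketched with the standard library alone. This is a simplified model, not the library's code: the `(connection_id, message)` tuple format, the `routing_table` dictionary, and the choice to silently drop messages for unknown connections are all assumptions made for illustration.

```python
import queue

# Stand-in for MultiprocessingRouter._local_message_queue.
local_queue = queue.Queue()

# Hypothetical routing table: connection id -> callable that delivers
# a message to the matching receiver. Here each "receiver" is a list.
received = {0: [], 1: []}
routing_table = {cid: received[cid].append for cid in received}


def route_message():
    """Pull the next message from the local queue and deliver it."""
    # Assumed wire format: each queue item is (connection_id, message).
    connection_id, message = local_queue.get()
    try:
        deliver = routing_table[connection_id]
    except KeyError:
        # Assumption for this sketch: messages for a closed pipe are dropped.
        return
    deliver(message)


# Simulate the monitor thread transferring three messages, with one ping
# (and so one route_message call) per transferred message.
for item in [(0, "started"), (1, "progress 50%"), (0, "done")]:
    local_queue.put(item)
    route_message()
```

Because each ping triggers exactly one call to the routing callback, the local queue never holds more messages than there are pending pings.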

class traits_futures.multiprocessing_router.MultiprocessingReceiver

Bases: traits.has_traits.HasStrictTraits

Implementation of the IMessageReceiver interface for the case where the sender will be in a background process.

connection_id = Int()

Connection id, matching that of the paired sender.

message = Event(Any())

Event fired when a message is received from the paired sender.

class traits_futures.multiprocessing_router.MultiprocessingRouter(**traits)

Bases: traits.has_traits.HasRequiredTraits

Implementation of the IMessageRouter interface for the case where the sender will be in a background process.

Parameters
  • gui_context (IGuiContext) – GUI context to use for interactions with the GUI event loop.

  • manager (multiprocessing.Manager) – Manager to be used for creating the shared-process queue.

close_pipe(receiver)

Close the receiver end of a pipe produced by pipe.

Removes the receiver from the routing table, so that no new messages can reach that receiver.

Not thread-safe. Must always be called in the main thread.

Parameters

receiver (MultiprocessingReceiver) – Receiver half of the pair returned by the pipe method.

Raises

RuntimeError – If the router is not currently running.

gui_context

GUI context to use for interactions with the GUI event loop.

manager

Manager, used to create message queues.

pipe()

Create a (sender, receiver) pair for sending and receiving messages.

The sender will be passed to the background task and used to send messages, while the receiver remains in the foreground.

Not thread-safe. Must always be called in the main thread.

Returns

  • sender (MultiprocessingSender) – Object to be passed to the background task.

  • receiver (MultiprocessingReceiver) – Object kept in the foreground, which reacts to messages.

Raises

RuntimeError – If the router is not currently running.

start()

Start routing messages.

This method must be called before any call to pipe or close_pipe can be made.

Not thread-safe. Must always be called in the main thread.

Raises

RuntimeError – If the router has already been started.

stop()

Stop routing messages.

This method should be called in the main thread after all pipes are finished with. Calls to pipe or close_pipe are not permitted after this method has been called.

Logs a warning if there are unclosed pipes.

Not thread-safe. Must always be called in the main thread.

Raises

RuntimeError – If the router is not running.
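The lifecycle rules documented for start, pipe, close_pipe and stop can be modelled with a small toy class. This is only a sketch of the documented contract: `ToyRouter`, its attributes, and the dictionary used as a receiver are hypothetical, and the toy `pipe` returns a bare connection id where the real method returns a MultiprocessingSender.

```python
import itertools
import logging

logger = logging.getLogger(__name__)


class ToyRouter:
    """Hypothetical model of the documented router lifecycle."""

    def __init__(self):
        self._running = False
        self._receivers = {}  # routing table: connection id -> receiver
        self._ids = itertools.count()

    def start(self):
        if self._running:
            raise RuntimeError("router is already running")
        self._running = True

    def pipe(self):
        if not self._running:
            raise RuntimeError("router is not running")
        connection_id = next(self._ids)
        receiver = {"connection_id": connection_id}
        self._receivers[connection_id] = receiver
        return connection_id, receiver

    def close_pipe(self, receiver):
        if not self._running:
            raise RuntimeError("router is not running")
        # Remove the receiver from the routing table.
        del self._receivers[receiver["connection_id"]]

    def stop(self):
        if not self._running:
            raise RuntimeError("router is not running")
        if self._receivers:
            logger.warning("%d unclosed pipes", len(self._receivers))
        self._receivers.clear()
        self._running = False


router = ToyRouter()
router.start()
sender_id, receiver = router.pipe()
router.close_pipe(receiver)
router.stop()

# Calling pipe after stop violates the contract and raises.
try:
    router.pipe()
except RuntimeError as exc:
    error = str(exc)
```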

class traits_futures.multiprocessing_router.MultiprocessingSender(connection_id, message_queue)

Bases: object

Object allowing the worker to send messages.

This class will be instantiated in the main thread, and the instance passed to the worker process to allow the worker to communicate back to the main thread.

Parameters
  • connection_id (int) – Id of the matching receiver; used for message routing.

  • message_queue (multiprocessing.Queue) – Process-safe queue for passing messages to the foreground.

send(message)

Send a message to the router.

Not thread-safe. The ‘start’, ‘send’ and ‘stop’ methods should all be called from the same thread.

Parameters

message (object) – Typically this will be immutable, small, and pickleable.

Raises

RuntimeError – If the sender has not been started, or has already been stopped.

start()

Do any setup necessary to prepare for sending messages.

This method must be called before any messages can be sent using the send method.

Not thread-safe. The ‘start’, ‘send’ and ‘stop’ methods should all be called from the same thread.

Raises

RuntimeError – If the sender has previously been started.

stop()

Do any teardown.

After this method has been called, no more messages can be sent.

Not thread-safe. The ‘start’, ‘send’ and ‘stop’ methods should all be called from the same thread.

Raises

RuntimeError – If the sender has not been started, or has already been stopped.
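A worker would typically bracket its sends between start and stop, all from one thread. The sketch below models that pattern with a hypothetical `ToySender`; the three-state flag, the `background_task` helper, and the `(connection_id, message)` tuple placed on the queue are assumptions, not the library's implementation.

```python
import queue


class ToySender:
    """Hypothetical model of the documented sender lifecycle."""

    def __init__(self, connection_id, message_queue):
        self.connection_id = connection_id
        self.message_queue = message_queue
        self._state = "initial"  # initial -> open -> closed

    def start(self):
        if self._state != "initial":
            raise RuntimeError("sender has previously been started")
        self._state = "open"

    def send(self, message):
        if self._state != "open":
            raise RuntimeError("sender must be started and not yet stopped")
        # Assumed wire format: tag each message with the connection id
        # so that the router can find the matching receiver.
        self.message_queue.put((self.connection_id, message))

    def stop(self):
        if self._state != "open":
            raise RuntimeError("sender must be started and not yet stopped")
        self._state = "closed"


def background_task(sender):
    """Worker-side pattern: start, send and stop from a single thread."""
    sender.start()
    try:
        sender.send("working")
        sender.send("finished")
    finally:
        sender.stop()


message_queue = queue.Queue()  # stand-in for the process-safe queue
sender = ToySender(connection_id=7, message_queue=message_queue)
background_task(sender)
sent = [message_queue.get_nowait() for _ in range(2)]
```

The try/finally ensures that stop runs even if the task body raises partway through.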

traits_futures.multiprocessing_router.monitor_queue(process_queue, local_queue, pingee)

Move incoming child process messages to the local queue.

Monitors the process queue for incoming messages, and transfers those messages to the local queue. For each message transferred, pings the event loop using the pingee to notify it that there’s a message to be processed.

To stop the thread, put None onto the process_queue.

Parameters
  • process_queue (multiprocessing.Queue) – Queue to listen to for messages.

  • local_queue (queue.Queue) – Queue to transfer those messages to.

  • pingee (IPingee) – Recipient for pings, used to notify the GUI thread that there’s a message pending.
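A minimal sketch of the transfer loop described above, including shutdown via the None sentinel. It uses a plain `queue.Queue` in place of the manager's queue proxy, and `CountingPingee` is a hypothetical stand-in that assumes the pingee exposes a `ping()` method; `monitor_queue_sketch` is not the library's function.

```python
import queue
import threading


class CountingPingee:
    """Hypothetical stand-in for an IPingee; it only counts pings."""

    def __init__(self):
        self.pings = 0

    def ping(self):
        self.pings += 1


def monitor_queue_sketch(process_queue, local_queue, pingee):
    """Transfer messages until the None sentinel arrives."""
    while True:
        message = process_queue.get()  # blocks until something arrives
        if message is None:
            break
        local_queue.put(message)
        pingee.ping()  # one ping per transferred message


process_queue = queue.Queue()  # stand-in for the manager's queue proxy
local_queue = queue.Queue()
pingee = CountingPingee()

thread = threading.Thread(
    target=monitor_queue_sketch,
    args=(process_queue, local_queue, pingee),
)
thread.start()

process_queue.put("first")
process_queue.put("second")
process_queue.put(None)  # sentinel: ask the monitor to stop
thread.join(timeout=5)

# The thread has exited, so the local queue can be drained safely.
transferred = [local_queue.get_nowait() for _ in range(local_queue.qsize())]
```

Using a sentinel rather than a flag means the blocking `get` needs no timeout: the monitor wakes only when there is real work or a stop request.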