Source code for traits_futures.asyncio.event_loop

# (C) Copyright 2018-2021 Enthought, Inc., Austin, TX
# All rights reserved.
#
# This software is provided without warranty under the terms of the BSD
# license included in LICENSE.txt and may be redistributed only under
# the conditions described in the aforementioned license. The license
# is also available online at http://www.enthought.com/licenses/BSD.txt
#
# Thanks for using Enthought open source!

"""
IEventLoop implementation for the main-thread asyncio event loop.
"""
import asyncio

from traits_futures.asyncio.event_loop_helper import EventLoopHelper
from traits_futures.asyncio.pingee import Pingee
from traits_futures.i_event_loop import IEventLoop


[docs]@IEventLoop.register class AsyncioEventLoop: """ IEventLoop implementation for the main-thread asyncio event loop. """ def __init__(self): self._event_loop = asyncio.get_event_loop()
[docs] def pingee(self, on_ping): """ Return a new pingee. Parameters ---------- on_ping Zero-argument callable, called on the main thread (under a running event loop) as a result of each ping sent. The return value of the callable is ignored. Returns ------- pingee : IPingee """ return Pingee(on_ping=on_ping, event_loop=self._event_loop)
[docs] def helper(self): """ Return a new event loop helper. Returns ------- event_loop_helper : IEventLoopHelper """ return EventLoopHelper(event_loop=self._event_loop)