Issue
I'm replacing part of an existing program. That original program uses threads. There's this particular class which inherits from threading.Thread
which functionality I need to replace but I need to keep the interface the same.
The functionality I'm integrating is packaged in a library which uses asyncio
a lot.
The original calls to the class I'm replacing go something like this:
network = Network()
network.start()
network.fetch_something() # crashes!
network.stop()
I've gotten to a point where my replacing class inherits from threading.Thread
too and I can connect, from within the run
method to my backends via the client library:
class Network(threading.Thread):
def __init__(self):
self._loop = asyncio.new_event_loop()
self._client = Client() # this is the library
def run(self):
self._loop.run_until_complete(self.__connect()) # works dandy, implementation not shown
self._loop.run_forever()
def fetch_something(self):
return self._loop.run_until_complete(self._client.fetch_something())
Running this code throws an exception:
RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one
I sort of get what's going on here. In the run
method things worked out because the same thread running the event loop was the caller. In the other case an other thread was the caller hence the problem.
As you might have noticed I was hoping the problem would have been solved by using the same event loop. Alas, that didn't work out.
I really want to keep the interface exactly as it is otherwise I'm refactoring for the remainder of the year. I could relatively easily pass arguments to the constructor of the Network
class. I've tried passing in an event loop created on the main thread but the result was the same.
(Note that this is the opposite problem this author has: Call coroutine within Thread)
Solution
When scheduling a coroutine from a different thread, you must use asyncio.run_coroutine_threadsafe
. For example:
def fetch_something(self):
future = asyncio.run_coroutine_threadsafe(
self._client.fetch_something(), loop)
return future.result()
run_coroutine_threadsafe
schedules the coroutine with the event loop in a thread-safe way and returns a concurrent.futures.Future
. You can use the returned future to simply wait for the result as shown above, but you can also pass it to other functions, poll whether the result has arrived, or implement timeouts.
When combining threads and asyncio, remember to make sure that all interfacing with the event loop from other threads (even to call something as simple as loop.stop
to implement Network.stop
) is done using loop.call_soon_threadsafe
and asyncio.run_coroutine_threadsafe
.
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.