Issue
Or, in other words, why does this code never print Hello, world:
import asyncio
import threading
import time

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)

loop = None

def serve():
    async def _serve():
        server = await asyncio.get_running_loop().create_server(EchoProtocol, '::1', 2000)
        await server.serve_forever()

    global loop
    loop = asyncio.new_event_loop()
    loop.run_until_complete(_serve())

async def hello():
    print("Hello, world", flush=True)

def main():
    t = threading.Thread(target=serve)
    t.start()
    time.sleep(0.1)
    loop.call_soon_threadsafe(hello)
    t.join()

main()
This sets up a TCP echo server on port 2000, which works as intended. But the call to call_soon_threadsafe() doesn't seem to actually insert the new coroutine into the event loop running in the thread.
Solution
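Your problem is simply that call_soon_threadsafe takes a regular function, not an async function. Calling an async function only creates a coroutine object; its body does not run until something awaits it or wraps it in a task, so the print never happens.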
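As a minimal sketch of that difference, the snippet below schedules both an async function and a plain callable on a loop running in another thread. The events list and the bare loop thread are illustrative stand-ins, not part of the code in the question.

```python
import asyncio
import threading

events = []  # illustrative: records which callbacks actually ran

async def hello():
    events.append("hello ran")

loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever)
t.start()

# The loop calls hello(), receives a coroutine object, and discards it.
# The body never executes (Python may warn that it was never awaited).
loop.call_soon_threadsafe(hello)

# A plain callable is executed by the loop as expected.
loop.call_soon_threadsafe(events.append, "plain callback ran")

# Stop the loop from the main thread and clean up.
loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()

print(events)
```

Only the plain callback leaves a trace in events; the async function's body is never entered.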
The function is called in the other thread as part of the scheduling of the asyncio loop and, if needed, it can call asyncio.get_running_loop and then interact with the loop.
From there it is possible to call loop.create_task to add new coroutines to be executed in the running loop, but not asyncio.run or loop.run_until_complete.
In other words, the task can be created in the synchronous function added as a callback from the other thread, but you can't await it there. For a few tasks, since the loop is already running the server, "fire and forget" will just work:
async def hello_async():
    await asyncio.sleep(2)
    print("async hello")

def hello():
    print("Hello, world")
    loop = asyncio.get_running_loop()
    task = loop.create_task(hello_async())
    return
(With these two functions in your listing, hello replacing the original one, both print statements will run. Note that "hello" has changed from "async def" to just "def".)
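One caveat with fire and forget: the asyncio documentation notes that the event loop keeps only weak references to tasks, so it recommends holding a reference until the task finishes. A self-contained sketch of the same pattern with that precaution follows; the background_tasks set, the output list and the done event are hypothetical names added for illustration, and the echo server is left out to keep it short.

```python
import asyncio
import threading

background_tasks = set()   # hypothetical: strong references to pending tasks
output = []                # illustrative: stands in for the print calls
done = threading.Event()   # lets the main thread know when to stop the loop

async def hello_async():
    await asyncio.sleep(0.1)
    output.append("async hello")
    done.set()

def hello():
    # Runs inside the loop's thread, so get_running_loop() works here.
    output.append("Hello, world")
    task = asyncio.get_running_loop().create_task(hello_async())
    background_tasks.add(task)
    # Drop the reference once the task has finished.
    task.add_done_callback(background_tasks.discard)

loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever)
t.start()

loop.call_soon_threadsafe(hello)

done.wait(timeout=5)
loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()

print(output)
```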
If you need something more elaborate, ensuring tasks from a foreign thread are executed and retrieving their results, it is best to have a permanent task checking a global structure where newly created tasks are added, like:
import asyncio
import threading
import time

class EchoProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)

loop = None

def serve():
    async def _serve():
        server = await asyncio.get_running_loop().create_server(EchoProtocol, '::1', 2000)
        await server.serve_forever()

    global loop
    loop = asyncio.new_event_loop()
    # Permanent task that collects tasks registered by callbacks
    loop.create_task(foreign_task_worker())
    loop.run_until_complete(_serve())

task_registry = []

async def foreign_task_worker():
    while True:
        if task_registry:
            # Copy the registry and then empty it in place, so that
            # tasks appended later are picked up on the next pass
            tasks = task_registry[:]
            task_registry[:] = []
            # gather must be awaited to obtain the results
            results = await asyncio.gather(*tasks)
            # do something with results
            ...
        await asyncio.sleep(0.1)

async def hello_async():
    await asyncio.sleep(2)
    print("async hello")

def hello():
    # Runs in the loop's thread, scheduled by call_soon_threadsafe
    print("Hello, world", flush=True)
    loop = asyncio.get_running_loop()
    task_registry.append(loop.create_task(hello_async()))

def main():
    t = threading.Thread(target=serve)
    t.start()
    time.sleep(0.1)
    loop.call_soon_threadsafe(hello)
    t.join()

main()
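If all you need is the result of a single coroutine back in the calling thread, the standard library also offers asyncio.run_coroutine_threadsafe, which is a different route from the registry shown above. The sketch below assumes a bare loop running in a worker thread and a hello_async that returns a value instead of printing; both are simplifications for illustration.

```python
import asyncio
import threading

async def hello_async():
    await asyncio.sleep(0.1)
    return "async hello"

loop = asyncio.new_event_loop()
t = threading.Thread(target=loop.run_forever)
t.start()

# Schedules the coroutine on `loop` in a thread-safe way and returns a
# concurrent.futures.Future that can be used from this thread.
future = asyncio.run_coroutine_threadsafe(hello_async(), loop)

# Blocks the calling thread (not the loop) until the coroutine finishes.
result = future.result(timeout=5)
print(result)

loop.call_soon_threadsafe(loop.stop)
t.join()
loop.close()
```

The trade-off is that future.result() blocks the calling thread, whereas the registry approach keeps result handling inside the loop.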
Note that asyncio won't complain (outside of its debug mode) if you call loop.create_task with a coroutine directly from the main thread, and it will even work most of the time. However, the inner workings of the loop itself are not thread-safe, and eventually doing this will add a task at a point where it wrecks the loop's internal state.
Answered By - jsbueno