Issue
I recently started studying the asyncio
library with the goal of replacing a big thread based application with async.
Reading the asyncio
documentantion I stumbled upon an example where
the create_task
is being used. Because I'm stuck for now with python 3.6, I changed the create_task
call with
the ensure_future
, yielding the current code:
# Python 3.6
import asyncio
import time
async def say_after(delay, what):
print(f"start {what}") # Added for better vizualization of what is happening
await asyncio.sleep(delay)
print(what)
async def main():
task1 = asyncio.ensure_future(
say_after(1, 'hello'))
task2 = asyncio.ensure_future(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
And with the output:
started at 15:23:11
start hello
start world
hello
world
finished at 15:23:13
From what I can understand, the event loop:
- First starts the task
task1
; - After hitting the
asyncio.sleep
it changes the context to the secondtask2
; and - When the first sleep is over, it changes to
task1
and equally the same thing occurs with thetask2
when its sleep call is over.
With all of that said, one of the requirements of my application is that we have some blocking calls that would needed to be converted to a coroutine.
I created this mock code for testing the run_in_executor
function with this goal:
# Python 3.6
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def normal_operations():
print("start blocking")
time.sleep(1)
print("ended blocking")
async def async_operation():
print("start non blocking")
await asyncio.sleep(2)
print("ended non blocking")
async def main():
loop = asyncio.get_event_loop()
print(f"started at {time.strftime('%X')}")
with ThreadPoolExecutor() as pool:
task1 = asyncio.ensure_future(
loop.run_in_executor(pool, normal_operations)
)
task2 = asyncio.ensure_future(
async_operation()
)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()
I expected that the output would be similar to the first example, but when I ran this code the output was:
started at 15:28:06
start blocking
ended blocking
start non blocking
ended non blocking
finished at 15:28:09
The two functions ran in order, not like the first example where the start print
calls were called one before the other.
I'm not sure what I'm doing wrong, my guess is that the run_in_executor
function does not truly create an async call, or maybe I just implemented it wrong, I don't know.
Solution
Ok, I think got the error.
I was awaiting the non-blocking operation outside the ThreadPoolExecutor
and in the __exit__
dunder a shutdown
function is being called with the wait=True
parameter, so basically the executor was blocking my code.
Fixed code:
async def main():
loop = asyncio.get_event_loop()
print(f"started at {time.strftime('%X')}")
pool = ThreadPoolExecutor()
task1 = asyncio.ensure_future(loop.run_in_executor(pool, normal_operations))
task2 = asyncio.ensure_future(async_operation())
await task1
await task2
print(f"finished at {time.strftime('%X')}")
pool.shutdown(wait=True)
With the expected output:
started at 16:20:48
start non blocking
start blocking
ended blocking
ended non blocking
finished at 16:20:50
Answered By - Rodrigo Ce Moretto
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.