Issue
I'm pretty new to Python's asyncio library and have been banging my head trying to implement a keep-alive task. I want to run a CPU-intensive task and a keep-alive task concurrently. The keep-alive should run periodically until the CPU-intensive task has finished.
import asyncio
import time

async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    delay = 3
    close_time = time.time() + delay
    while True:
        if time.time() > close_time:
            break

async def keep_alive():
    print("keep alive for 1 second")  # My real use case is I want to send a heartbeat message every x seconds until cpu intensive finished
    await asyncio.sleep(1)

async def main():
    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    keep_alive_task = asyncio.create_task(keep_alive())
    print(f"Started at {time.strftime('%X')}")
    # TODO: Not sure how to achieve the expected output
    print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())
'''
Expected Output
Started at 23:55:08
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 23:55:11
'''
I have browsed through the asyncio library and tried several APIs such as await, run_coroutine_threadsafe and asyncio.gather, but couldn't get it to work.
Solution
I think you might have confused the concept of concurrency with parallelism. Let me write down what I came to understand while playing with asyncio:
Two ways of achieving concurrency

Parallel: 'physical' concurrency, where more than one line of code can be executing at the same instant.
- library: multiprocessing
- pro: can utilize multiple cores.
- con: creating processes costs resources, and communicating with them has high overhead (data is serialized with pickle). The workload has to be safe to run in parallel.

Asynchronous (async/await): 'perceived' concurrency, where no more than one line of code executes at any given time, but concurrency is achieved through context switching. The await keyword marks the points where a context switch is allowed.
- libraries: asyncio, curio, trio
- pro: can utilize one core better than synchronous code. Much more lightweight. Control flow is more predictable than with threading, because context switches ONLY happen at await.
- con: cannot utilize multiple cores, since only one piece of code runs at any given time. It cannot switch context in the middle of a heavy workload that never awaits.

Asynchronous (time division): a.k.a. threads. In CPython, threads can only execute one line of Python code at a time because of the GIL, so this shares similarities with the above.
- library: threading
- pro: can utilize one core better than synchronous code, because other code runs while a thread is waiting. Much more lightweight than processes. Since it uses time division, it keeps working even under a CPU-heavy workload (the interpreter briefly pauses the workload and executes another thread).
- con: cannot utilize multiple cores, since only one piece of code runs at any given time. Control flow is hard to predict.
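To see the time-division point in practice, here is a minimal sketch, assuming Python 3.9+ (for asyncio.to_thread). A plain busy loop that never awaits runs in a worker thread while a heartbeat coroutine keeps firing on the event loop. The names blocking_cpu_work and heartbeat are hypothetical. Under the GIL the busy loop does not get any faster; it just stops blocking the event loop.

```python
import asyncio
import time


def blocking_cpu_work(seconds: float) -> int:
    # Plain (non-async) busy loop: it never awaits, so if called directly
    # inside a coroutine it would freeze the event loop.
    end = time.perf_counter() + seconds
    iterations = 0
    while time.perf_counter() < end:
        iterations += 1
    return iterations


async def heartbeat(interval: float, beats: list) -> None:
    # Record a timestamp, then wait; runs until cancelled.
    while True:
        beats.append(time.strftime("%X"))
        await asyncio.sleep(interval)


async def main(work_seconds: float = 1.0, interval: float = 0.25):
    beats: list = []
    beat_task = asyncio.create_task(heartbeat(interval, beats))
    # asyncio.to_thread runs the function in a worker thread; the interpreter
    # switches between threads, so the heartbeat keeps firing meanwhile.
    iterations = await asyncio.to_thread(blocking_cpu_work, work_seconds)
    beat_task.cancel()
    return iterations, len(beats)


if __name__ == "__main__":
    iterations, beat_count = asyncio.run(main())
    print(f"{iterations} iterations, {beat_count} heartbeats")
```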
So any CPU-intensive workload is better parallelized.
Any I/O-bound workload (i.e. waiting) is better written asynchronously, because we don't need more cores for it anyway.
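Following that advice for the question's scenario, one option (not the only one) is to push the CPU-bound function into a separate process with concurrent.futures.ProcessPoolExecutor and await it through loop.run_in_executor, while keep_alive stays on the event loop. This is a sketch, not tested against the asker's real workload: cpu_intensive here is a hypothetical sum-of-squares stand-in, and the __main__ guard matters because on some platforms worker processes are started by re-importing the main module.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def cpu_intensive(n: int) -> int:
    # Hypothetical CPU-bound stand-in (sum of squares). It runs in a separate
    # process, so it does not compete with the event loop for the GIL.
    return sum(i * i for i in range(n))


async def keep_alive(interval: float, beats: list) -> None:
    while True:
        beats.append(time.strftime("%X"))
        await asyncio.sleep(interval)


async def main(n: int = 3_000_000, interval: float = 0.5):
    beats: list = []
    loop = asyncio.get_running_loop()
    beat_task = asyncio.create_task(keep_alive(interval, beats))
    with ProcessPoolExecutor(max_workers=1) as pool:
        # run_in_executor returns an awaitable future; while the worker
        # process computes, the event loop is free to run keep_alive.
        result = await loop.run_in_executor(pool, cpu_intensive, n)
    beat_task.cancel()
    return result, len(beats)


if __name__ == "__main__":
    total, beat_count = asyncio.run(main())
    print(f"result={total}, heartbeats={beat_count}")
```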
Code fixes
asyncio
You need to await something inside the cpu_intensive coroutine.
As shown in this SO post, we can use await asyncio.sleep(0) to add a context-switching point inside the workload. Of course this is not the ideal way to write asynchronous code, but if you need to attach such a function to async code, it's one way to do it.
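A quick way to convince yourself that switching only happens at await: two coroutines append to a shared list, run once without and once with await asyncio.sleep(0) inside their loop. The names worker and run_pair are illustrative only.

```python
import asyncio


async def worker(name: str, steps: int, log: list, yield_control: bool) -> None:
    for i in range(steps):
        log.append(f"{name}{i}")
        if yield_control:
            # Explicit switch point: lets the event loop run other ready tasks.
            await asyncio.sleep(0)


async def run_pair(yield_control: bool) -> list:
    log: list = []
    # gather schedules the two coroutines as tasks, "a" first, then "b".
    await asyncio.gather(
        worker("a", 3, log, yield_control),
        worker("b", 3, log, yield_control),
    )
    return log


print(asyncio.run(run_pair(yield_control=False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
print(asyncio.run(run_pair(yield_control=True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Without the await, "a" runs to completion before "b" gets a turn, which is exactly what happens to keep_alive in the original cpu_intensive.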
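import asyncio
import time

async def cpu_intensive():
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while True:
        if time.time() > close_time:
            break
        # Yield control to the event loop so other tasks (keep_alive) can run.
        await asyncio.sleep(0)

async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await asyncio.sleep(1)

async def main():
    print(f"Started at {time.strftime('%X')}")
    cpu_intensive_task = asyncio.create_task(cpu_intensive())
    # Keep a reference: the event loop only holds a weak reference to tasks,
    # so an unreferenced task may be garbage-collected mid-execution.
    keep_alive_task = asyncio.create_task(keep_alive())
    await cpu_intensive_task
    # The CPU work is done; stop the heartbeat explicitly.
    keep_alive_task.cancel()
    print(f"Finished at {time.strftime('%X')}")

asyncio.run(main())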
"""
Started at 04:13:09
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 04:13:12
"""
There's one more keep alive than you might expect because keep_alive prints first and then waits 1 second, so it prints at roughly 0, 1, 2 and 3 seconds.
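If the beat at t=0 is unwanted, the heartbeat can sleep first and send afterwards. Here is a sketch of a small reusable wrapper for the real use case (a heartbeat every x seconds until the work finishes). run_with_heartbeat, _heartbeat and the send callback are hypothetical names, and the work is assumed to be a coroutine that awaits regularly, like the fixed cpu_intensive.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def _heartbeat(interval: float, send: Callable[[], None]) -> None:
    # Sleep *before* sending: no beat at t=0, one beat per completed interval.
    while True:
        await asyncio.sleep(interval)
        send()


async def run_with_heartbeat(work: Awaitable[T], interval: float,
                             send: Callable[[], None]) -> T:
    beat_task = asyncio.create_task(_heartbeat(interval, send))
    try:
        return await work
    finally:
        # Stop the heartbeat whether the work finished, raised or was cancelled.
        beat_task.cancel()


async def demo() -> tuple:
    beats: list = []

    async def work() -> str:
        await asyncio.sleep(0.35)  # stand-in for real work that awaits
        return "done"

    result = await run_with_heartbeat(work(), 0.1, lambda: beats.append("beat"))
    return result, len(beats)


print(asyncio.run(demo()))
```

The try/finally is the main design point: the heartbeat never outlives the work, even when the work fails.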
Note that asyncio.sleep schedules the coroutine to be resumed later; it does not guarantee waiting exactly the given time. Think of it as: "Do whatever you want while I sleep, but make sure to call me back after X seconds."
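The "call me after X seconds" really means "after at least X seconds, and only once the event loop is free again". This sketch (timed_sleep is an illustrative name) asks for a 0.1 s sleep while a blocking time.sleep(0.5) holds the loop, so the measured sleep comes out at about half a second.

```python
import asyncio
import time


async def timed_sleep(delay: float) -> float:
    start = time.perf_counter()
    await asyncio.sleep(delay)
    # Return how long the sleep actually took.
    return time.perf_counter() - start


async def main() -> float:
    sleeper = asyncio.create_task(timed_sleep(0.1))
    await asyncio.sleep(0)  # let the sleeper start and register its timer
    time.sleep(0.5)         # blocking call: the event loop is stuck here
    return await sleeper


elapsed = asyncio.run(main())
print(f"asked for 0.1 s, actually slept {elapsed:.2f} s")
```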
P.S.
At some point you may run into asyncio's instability, hard error handling or inconsistencies and stumble upon trio like I did, so for that case I'm leaving a trio example as well.
trio
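import time

import trio

async def cpu_intensive(cancel_scope):
    print("cpu_intensive for 3 seconds")
    duration = 3
    close_time = time.time() + duration
    while time.time() <= close_time:
        # Checkpoint: lets trio switch to other tasks (e.g. keep_alive).
        await trio.sleep(0)
    # Work is done: cancel the whole nursery, which also stops keep_alive.
    # (Raising a custom exception out of the nursery and catching it with a
    # plain "except" breaks on recent trio versions, where errors from a
    # nursery arrive wrapped in an ExceptionGroup.)
    cancel_scope.cancel()

async def keep_alive():
    while True:
        print("keep alive for 1 second")
        await trio.sleep(1)

async def main():
    print(f"Started at {time.strftime('%X')}")
    async with trio.open_nursery() as nursery:
        nursery.start_soon(cpu_intensive, nursery.cancel_scope)
        nursery.start_soon(keep_alive)
    # The nursery exits normally once its cancel scope has been cancelled.
    print(f"Finished at {time.strftime('%X')}")

trio.run(main)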
'''
Output:
Started at 17:43:45
keep alive for 1 second
cpu_intensive for 3 seconds
keep alive for 1 second
keep alive for 1 second
keep alive for 1 second
Finished at 17:43:48
'''
Answered By - jupiterbjy