Issue
I am currently running some endless tasks using asyncio.wait.
I need a special function to run when all the others are on await.
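import asyncio

# Placeholder: main_tasks is a list of async functions defined elsewhere
# (e.g. endless web-socket readers).
main_tasks = []

async def special_function():
    while True:
        # Does some work, then passes control back to the event loop
        # so main_tasks can run if they are no longer waiting.
        await asyncio.sleep(0)

async def handler():
    # asyncio.wait() needs Tasks, not bare coroutines (required since Python 3.11).
    tasks = [asyncio.create_task(task()) for task in main_tasks]
    # Adding the task that I want to run when all main_tasks are awaiting:
    tasks.append(asyncio.create_task(special_function()))
    await asyncio.wait(tasks)

asyncio.run(handler())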
How can I get special_function to run only when all main_tasks are on await?
Edit:
What I mean by "all main_tasks are on await": none of the main_tasks is ready to continue, e.g. they are in asyncio.sleep(100) or are I/O bound and still waiting for data.
Therefore the main_tasks cannot continue, and the event loop runs special_function while the tasks are in this state, NOT on every iteration of the event loop.
Edit 2:
My use case:
The main_tasks update a data structure with new data from web-sockets.
The special_function transfers that data to another process upon an update signal from that process (multiprocessing with shared variables and data structures).
The data must be as up to date as possible when it is transferred; there cannot be pending updates from main_tasks.
This is why I only want to run special_function when none of the main_tasks has new data available to be processed (i.e. all are waiting on await).
Solution
I tried to write a test for the 'task not ready to run' condition, but I think asyncio does not expose that detail of the scheduler. The developers have clearly stated that they want to keep the freedom to change asyncio internals without breaking backward compatibility.
In asyncio.Task there is this comment (note: _step() runs the task coroutine until the next await):
# An important invariant maintained while a Task not done:
#
# - Either _fut_waiter is None, and _step() is scheduled;
# - or _fut_waiter is some Future, and _step() is *not* scheduled.
But that internal variable is not in the API, of course.
You can get some limited access to _fut_waiter by reading the output of repr(task), but that format does not seem reliable either, so I would not depend on something like this:
PENDINGMSG = 'wait_for=<Future pending '
# Fragile: relies on the undocumented repr() format of asyncio.Task.
if all(PENDINGMSG in repr(t) for t in monitored_tasks):
    do_something()
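To see what such a check actually observes, here is a small experiment. It assumes CPython's current Task repr format, which is undocumented and may change; the helper all_waiting() and the demo() coroutine are hypothetical names, and an asyncio.Event stands in for a task waiting on I/O. It is meant for exploration, not for production code.

```python
import asyncio

PENDINGMSG = 'wait_for=<Future pending '

def all_waiting(tasks):
    # Fragile: depends on the undocumented repr() format of asyncio.Task (CPython).
    return all(PENDINGMSG in repr(t) for t in tasks)

async def demo():
    event = asyncio.Event()                # stand-in for "waiting for data"
    task = asyncio.create_task(event.wait())
    before = all_waiting([task])           # task is scheduled but has not run yet
    await asyncio.sleep(0)                 # let the task run until it awaits the Event
    blocked = all_waiting([task])          # task is now suspended on a pending Future
    event.set()
    await task
    return before, blocked

print(asyncio.run(demo()))
```

On recent CPython versions this should return (False, True): a task that is ready to run but not yet started has no wait_for entry in its repr, while a task suspended on the Event's internal Future does.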
Anyway, I think you are trying to be too perfect. You want to know whether there is new data in other tasks. What if the data is in asyncio buffers? In kernel buffers? In the network card's receive buffer? You can never know whether new data will arrive in the next millisecond.
My suggestion: write all updates to a single queue. Check that queue as the only source of updates. If the queue is empty, publish the last state.
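A minimal sketch of that suggestion, assuming each main_task can push (key, value) pairs into one shared asyncio.Queue. The reader(), drain() and publisher() coroutines, the requests queue standing in for the other process's update signal, and the sent list standing in for the multiprocessing transfer are all hypothetical placeholders, not part of the original code.

```python
import asyncio

async def reader(name, updates, values):
    # Hypothetical stand-in for one main_task reading from a web-socket.
    for value in values:
        await asyncio.sleep(0.01)          # simulate waiting for network data
        await updates.put((name, value))   # every update goes into the single queue

def drain(updates, state):
    # Apply all updates already in the queue; stop as soon as it is empty.
    while True:
        try:
            key, value = updates.get_nowait()
        except asyncio.QueueEmpty:
            return state
        state[key] = value

async def publisher(updates, requests, sent):
    state = {}
    while True:
        await requests.get()           # hypothetical "update signal" from the other process
        drain(updates, state)          # queue is empty now, so publish the last state
        sent.append(dict(state))       # stand-in for transferring to the other process

async def main():
    updates, requests, sent = asyncio.Queue(), asyncio.Queue(), []
    readers = [
        asyncio.create_task(reader("a", updates, [1, 2, 3])),
        asyncio.create_task(reader("b", updates, [10, 20])),
    ]
    pub = asyncio.create_task(publisher(updates, requests, sent))
    await asyncio.gather(*readers)     # wait until both readers have finished
    await requests.put(None)           # the other process asks for the data
    await asyncio.sleep(0)             # let the publisher handle the request
    pub.cancel()
    return sent

print(asyncio.run(main()))
```

The "is there pending data?" question becomes a documented, cheap updates.get_nowait() / asyncio.QueueEmpty check instead of scheduler introspection. Data still sitting in socket or kernel buffers is not covered, but as argued above, no approach can cover that.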
Answered By - VPfB