Issue
I have a situation where I need to stop the tasks in the async function as soon as the condition is matched, whether in task_1, task_2 or task_3, and also check that the result is not None. If the result is None, I'd like to proceed until the function gets the first result which is not None. Here is the minimal reproducible code I have so far:
import asyncio


async def task_1(_id):
    _ids = ["230327-12717", "230221-28276", "230214-06090"]
    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_1"


async def task_2(_id):
    _ids = ["230502-14191", "230425-17005", "230327-14434"]
    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_2"


async def task_3(_id):
    _ids = ["230404-23786", "230221-25729", "230221-28276"]
    for i in _ids:
        if i == _id:
            return f"Found {_id} in task_3"


async def main(_id):
    tasks = [task_1(_id), task_2(_id), task_3(_id)]
    finished, unfinished = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    print(finished)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main("230502-14191"))
which gives output:
{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>, <Task finished name='Task-3' coro=<task_3() done, defined at /home/zerox/pipedrive_CRM/minimal.py:21> result=None>, <Task finished name='Task-4' coro=<task_1() done, defined at /home/zerox/pipedrive_CRM/minimal.py:5> result=None>}
And my expected output should be something like this:
{<Task finished name='Task-2' coro=<task_2() done, defined at /home/zerox/pipedrive_CRM/minimal.py:13> result='Found 230502-14191 in task_2'>}
How can I achieve that?
Solution
This seems like an XY Problem, so I'll try and interpret what you actually want to accomplish.
Whenever you are dealing with multiple tasks that need to be coordinated in some way, it is a good idea to start with the available synchronization primitives.
In this case, you want the search for your specific ID to stop in all tasks, as soon as possible, after it has been found in one of them.
An Event object can be very useful for such a situation. The main coroutine schedules the "searching" tasks and then simply waits for the event to be set. The tasks all carry a reference to that event object, and once the ID is found by one of the tasks, that task sets the event. The main coroutine then simply cancels all the tasks and cleans up.
Demo:
from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> None:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            print(f"Found {id_} in {name}")
            found.set()


async def task_1(id_: str, found: Event) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()
    for task in tasks:
        task.cancel()
    await gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    run(main("230502-14191"))
Output:
task_1 checking 230327-12717
task_2 checking 230502-14191
Found 230502-14191 in task_2
task_3 checking 230404-23786
task_1 checking 230221-28276
A few things to note:
- Some await within the for-loop is necessary for this simple example to ensure that a context switch can happen. Otherwise the tasks will simply execute sequentially. That is why I added the await asyncio.sleep(0).
- As you can see, even though the ID was found by task 2, both task 1 and task 3 each still had one more go. There is no way to prevent that because the event loop itself decides which coroutine to switch to at any given moment, and await found.wait() in our main coroutine only guarantees that it will block until the event is set, not that it receives control right away. But you'll notice that none of the tasks actually finished, i.e. none of them went through all their IDs, because they were cancelled before they could.
- I am using asyncio.gather with return_exceptions=True because I do not want any of the CancelledErrors to propagate up to the main coroutine.
- Technically you don't need the gather at all because the coroutines will be cancelled regardless, but it is bad form to leave dangling un-awaited tasks lying around. You should always await your tasks at some point, and gather seems like the easiest solution here.
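To make the first note concrete, here is a minimal sketch (not part of the original demo) that runs two coroutines with and without a sleep(0) inside the loop. The names worker, run_workers and yield_control are made up for this illustration.

```python
import asyncio


async def worker(name: str, log: list[str], yield_control: bool) -> None:
    """Append three entries to the shared log, optionally yielding in between."""
    for step in range(3):
        if yield_control:
            # Hand control back to the event loop so other tasks can run.
            await asyncio.sleep(0)
        log.append(f"{name}:{step}")


async def run_workers(yield_control: bool) -> list[str]:
    """Run two workers concurrently and return the order in which they logged."""
    log: list[str] = []
    await asyncio.gather(
        worker("a", log, yield_control),
        worker("b", log, yield_control),
    )
    return log


if __name__ == "__main__":
    # Without an await in the loop, worker "a" finishes before "b" starts.
    print(asyncio.run(run_workers(yield_control=False)))
    # With sleep(0), the two workers take turns.
    print(asyncio.run(run_workers(yield_control=True)))
```

Without the await, a task never gives up control until it returns, so there is no point at which cancelling it could take effect.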
PS: Returning results from asyncio.gather
If you want the task_ functions to return something, gather will collect those results for you too:
from asyncio import Event, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], found: Event, name: str) -> str:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            found.set()
            return f"Found {id_} in {name}"


async def task_1(id_: str, found: Event) -> str:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    return await _search(id_, ids, found, "task_1")


async def task_2(id_: str, found: Event) -> str:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    return await _search(id_, ids, found, "task_2")


async def task_3(id_: str, found: Event) -> str:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    return await _search(id_, ids, found, "task_3")


async def main(id_: str) -> None:
    found = Event()
    tasks = [
        create_task(task_1(id_, found)),
        create_task(task_2(id_, found)),
        create_task(task_3(id_, found)),
    ]
    await found.wait()
    for task in tasks:
        task.cancel()
    results = await gather(*tasks, return_exceptions=True)
    for result in results:
        # CancelledError derives from BaseException (Python 3.8+), not Exception.
        if not isinstance(result, BaseException):
            print(result)


if __name__ == "__main__":
    run(main("230502-14191"))
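When filtering the gathered results, keep in mind that since Python 3.8 asyncio.CancelledError derives from BaseException rather than Exception, so the isinstance check has to test against BaseException to skip cancelled tasks. The following small sketch shows what gather hands back in that case. The coroutines finishes_quickly and never_finishes are placeholders invented for this example.

```python
import asyncio


async def finishes_quickly() -> str:
    return "done"


async def never_finishes() -> str:
    await asyncio.sleep(3600)
    return "unreachable"


async def collect() -> list[object]:
    tasks = [
        asyncio.create_task(finishes_quickly()),
        asyncio.create_task(never_finishes()),
    ]
    await asyncio.sleep(0)  # let both tasks run up to their first await
    for task in tasks:
        task.cancel()  # has no effect on a task that is already done
    # Cancelled tasks show up as CancelledError instances in the result list.
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    for result in asyncio.run(collect()):
        kind = "exception" if isinstance(result, BaseException) else "value"
        print(kind, repr(result))
```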
PPS: Use a Queue instead of an Event
Alternatively you can set up a Queue to put the result in and simply await that result in the main coroutine. That may be a more elegant solution altogether, but I suppose that is a matter of preference:
from asyncio import Queue, create_task, gather, run, sleep
from collections.abc import Iterable


async def _search(id_: str, ids: Iterable[str], q: Queue, name: str) -> None:
    for i in ids:
        await sleep(0)
        print(f"{name} checking {i}")
        if i == id_:
            q.put_nowait(f"Found {id_} in {name}")


async def task_1(id_: str, q: Queue) -> None:
    ids = ["230327-12717", "230221-28276", "230214-06090"]
    await _search(id_, ids, q, "task_1")


async def task_2(id_: str, q: Queue) -> None:
    ids = ["230502-14191", "230425-17005", "230327-14434"]
    await _search(id_, ids, q, "task_2")


async def task_3(id_: str, q: Queue) -> None:
    ids = ["230404-23786", "230221-25729", "230221-28276"]
    await _search(id_, ids, q, "task_3")


async def main(id_: str) -> None:
    result_queue = Queue()
    tasks = [
        create_task(task_1(id_, result_queue)),
        create_task(task_2(id_, result_queue)),
        create_task(task_3(id_, result_queue)),
    ]
    result = await result_queue.get()
    for task in tasks:
        task.cancel()
    await gather(*tasks, return_exceptions=True)
    print(result)


if __name__ == "__main__":
    run(main("230502-14191"))
Awaiting Queue.get will block until there is an item in the queue, then return that item. So this accomplishes essentially the same as the Event from before, but it also gives you the result right away.
You should still clean up (i.e. cancel and await) the other tasks though.
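If you would rather keep the task functions returning None when nothing is found, as in the question, one more possible variant is to iterate over asyncio.as_completed and stop at the first result that is not None. This is only a sketch under that assumption and not something the code above relies on. The helpers lookup and first_non_none are hypothetical names.

```python
import asyncio
from collections.abc import Coroutine, Iterable
from typing import Any, Optional


async def lookup(id_: str, ids: Iterable[str], name: str) -> Optional[str]:
    """Return a message if id_ is among ids, otherwise None."""
    for i in ids:
        await asyncio.sleep(0)  # allow other tasks to run between checks
        if i == id_:
            return f"Found {id_} in {name}"
    return None


async def first_non_none(
    coros: Iterable[Coroutine[Any, Any, Optional[str]]],
) -> Optional[str]:
    """Return the first non-None result, then cancel whatever is still running."""
    tasks = [asyncio.create_task(coro) for coro in coros]
    try:
        for next_done in asyncio.as_completed(tasks):
            result = await next_done
            if result is not None:
                return result
        return None  # every task finished without a match
    finally:
        for task in tasks:
            task.cancel()
        # Await the tasks so that none is left dangling.
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    target = "230502-14191"
    searches = [
        lookup(target, ["230327-12717", "230221-28276", "230214-06090"], "task_1"),
        lookup(target, ["230502-14191", "230425-17005", "230327-14434"], "task_2"),
        lookup(target, ["230404-23786", "230221-25729", "230221-28276"], "task_3"),
    ]
    print(asyncio.run(first_non_none(searches)))
```

The cleanup in the finally block follows the same cancel-then-gather pattern as the Event and Queue versions.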
Answered By - Daniil Fajnberg