Issue
I want to download data in batches asynchronously. The data for each name is downloaded in batches, and I'd like asyncio.gather(*coroutines) to return a list of lists (a list of batches for each name). So far I have this code, but it raises an exception:
import asyncio
import datetime


async def run(names):
    """Start one coroutine for each name."""
    coroutines = [_fetch_data(name) for name in names]
    return await asyncio.gather(*coroutines)  # This fails!


async def _fetch_data(name):
    """Fetch data for a single symbol in batches."""
    start_timestamp = datetime.datetime(2021, 9, 1).timestamp()
    end_timestamp = datetime.datetime(2021, 9, 2).timestamp()
    i = 1
    while start_timestamp < end_timestamp:
        batch = f"Batch {i} for {name}"
        await asyncio.sleep(2)  # Some async API call, for example
        # If I remove the yield, it works. But I want to use the results!
        yield batch
        start_timestamp += 3600
        i += 1


async def main():
    names = ["Jack", "Jill", "Bob"]
    return await run(names)


output = asyncio.run(main())
print(output)
# I'd expect something like
# [["Batch 1 for Jack", "Batch 2 for Jack", ...], ["Batch 1 for Jill", "Batch 2 for Jill", ...], ...]
Unfortunately, this code raises an exception at asyncio.gather(*coroutines):
TypeError: An asyncio.Future, a coroutine or an awaitable is required
Isn't _fetch_data a coroutine? What is this error trying to tell me? And how can I get past it?
I'm trying to learn more about asyncio in Python and I'm quite sure I'm missing some basics here.
Solution
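No, _fetch_data is not a coroutine function. By using a yield statement inside it, you turned it into an asynchronous generator function. Calling it returns an asynchronous generator object, which is not awaitable, so asyncio.gather rejects it. A coroutine function, by contrast, returns an awaitable coroutine object when called.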
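To see the distinction concretely, the standard library's inspect module can tell the two kinds of function apart. This is only a minimal sketch; the names plain_coroutine and async_generator are made up for illustration and are not part of the question's code:

```python
import asyncio
import inspect


async def plain_coroutine():
    """No yield: calling this returns a coroutine object, which is awaitable."""
    await asyncio.sleep(0)
    return "done"


async def async_generator():
    """Contains yield: calling this returns an async generator object."""
    await asyncio.sleep(0)
    yield "item"


# Classify the functions themselves.
print(inspect.iscoroutinefunction(plain_coroutine))   # True
print(inspect.iscoroutinefunction(async_generator))   # False
print(inspect.isasyncgenfunction(async_generator))    # True

# Classify the objects you get by calling them.
coro = plain_coroutine()
print(inspect.isawaitable(coro))   # True
coro.close()  # closed unused so Python does not warn about a never-awaited coroutine

agen = async_generator()
print(inspect.isawaitable(agen))   # False, which is why gather() raises TypeError
```

The last line is the heart of the error message: gather() needs awaitables, and an async generator object is something you iterate over, not something you await.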
If you actually want it to be a generator and you want multiple async generators to be consumed concurrently, you'll need to modify your code a bit and introduce a coroutine that asynchronously consumes the generator. This is typically done via an async for loop.
Something like this will work:
...
async def run(names):
    coroutines = [fetch(name) for name in names]
    return await asyncio.gather(*coroutines)


async def fetch(name):
    return [batch async for batch in _fetch_data(name)]
...
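The async list comprehension in fetch is shorthand for an explicit async for loop. If you want to act on each batch as soon as it arrives (log it, store it, and so on), the loop form gives you a place to do that. Here is a sketch with a shortened stand-in for _fetch_data; the two batches per name and the very short sleep are assumptions chosen only to keep the demo fast:

```python
import asyncio


async def _fetch_data(name):
    """Stand-in async generator: yields two batches per name (illustrative)."""
    for i in range(1, 3):
        await asyncio.sleep(0.01)  # placeholder for the real async API call
        yield f"Batch {i} for {name}"


async def fetch(name):
    """Consume the async generator and collect its batches into a list."""
    batches = []
    async for batch in _fetch_data(name):
        # Each batch is available here as soon as it is yielded.
        batches.append(batch)
    return batches


async def run(names):
    # fetch() is a coroutine function, so gather() accepts what it returns.
    return await asyncio.gather(*(fetch(name) for name in names))


output = asyncio.run(run(["Jack", "Jill"]))
print(output)
# [['Batch 1 for Jack', 'Batch 2 for Jack'], ['Batch 1 for Jill', 'Batch 2 for Jill']]
```

gather() returns results in the order the awaitables were passed in, so the outer list lines up with the list of names.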
I don't know your actual use case, so I don't know whether using a generator is actually prudent here. Alternatively, you could sidestep the entire issue by making _fetch_data an actual coroutine function that returns a list, rather than an asynchronous generator yielding list items:
import asyncio
import datetime


async def run(names):
    return await asyncio.gather(*(_fetch_data(name) for name in names))


async def _fetch_data(name):
    start_timestamp = datetime.datetime(2021, 9, 1).timestamp()
    end_timestamp = datetime.datetime(2021, 9, 2).timestamp()
    results = []
    i = 1
    while start_timestamp < end_timestamp:
        batch = f"Batch {i} for {name}"
        await asyncio.sleep(2)  # Some async API call, for example
        results.append(batch)
        start_timestamp += 3600
        i += 1
    return results


async def main():
    names = ["Jack", "Jill", "Bob"]
    return await run(names)


output = asyncio.run(main())
print(output)
Both of these work, produce the output you expect, and are (at least with your simplified example) essentially equivalent.
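One way to check that the names really are fetched concurrently is to time the whole run: with three names, the total should be close to the time needed for one name, not three times that. Below is a rough sketch using the list-returning variant. BATCHES and DELAY are made-up constants for the demo, not part of the original code, and the exact timing will vary by machine:

```python
import asyncio
import time

BATCHES = 3   # illustrative: number of batches per name
DELAY = 0.05  # illustrative: seconds per simulated API call


async def _fetch_data(name):
    """Coroutine function that collects all batches for one name."""
    results = []
    for i in range(1, BATCHES + 1):
        await asyncio.sleep(DELAY)  # placeholder for the real async API call
        results.append(f"Batch {i} for {name}")
    return results


async def run(names):
    return await asyncio.gather(*(_fetch_data(name) for name in names))


names = ["Jack", "Jill", "Bob"]
started = time.perf_counter()
output = asyncio.run(run(names))
elapsed = time.perf_counter() - started

# If the names were fetched one after another, the run would take roughly this long.
sequential_estimate = len(names) * BATCHES * DELAY
print(f"elapsed: {elapsed:.2f}s, sequential would be about {sequential_estimate:.2f}s")
```

Within a single name the batches still run one after another, because each await has to finish before the loop continues. The concurrency is across names.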
Answered By - Daniil Fajnberg