Issue
Let's say I have an asyncio coroutine which fetches some data and returns it, like this:
async def fetch_data(*args):
    result = await some_io()
    return result
Basically, this coroutine is called from a chain of coroutines, and the initial coroutine is run by creating a task. But what if, for testing purposes, I want to run just this one coroutine when executing a file directly:
if __name__ == '__main__':
    result = await fetch_data(*args)
    print(result)
Obviously I can't do this, because I'm trying to await a coroutine from outside a coroutine function.
So the question is: is there a correct way to get the result of a coroutine without calling it from another coroutine function?
I could create a Future object for the result and await it, but maybe there is a simpler and clearer way?
Solution
You will need to create an event loop to run your coroutine:
import asyncio

async def async_func():
    return "hello"

# Create a fresh event loop, run the coroutine to completion, then close the loop.
loop = asyncio.new_event_loop()
result = loop.run_until_complete(async_func())
loop.close()
print(result)
Or as a function:
def run_coroutine(f, *args, **kwargs):
    # Use a new loop per call, so the helper still works when called more than once.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(f(*args, **kwargs))
    finally:
        loop.close()
Use like this:
print(run_coroutine(async_func))
Or:
assert "expected" == run_coroutine(fetch_data, "param1", "param2")
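On Python 3.7 or later, asyncio.run wraps the same steps (create a loop, run the coroutine, close the loop) in a single call, so a helper is often unnecessary. A minimal sketch, in which the body of fetch_data is only a placeholder for the real I/O and main is a hypothetical entry point:

```python
import asyncio


async def fetch_data(*args):
    # Placeholder for real I/O (an assumption for this sketch):
    # asyncio.sleep(0) just yields control to the event loop once.
    await asyncio.sleep(0)
    return args


def main():
    # asyncio.run creates a new event loop, runs the coroutine
    # until it finishes, closes the loop, and returns the result.
    return asyncio.run(fetch_data("param1", "param2"))


if __name__ == "__main__":
    print(main())
```

Note that asyncio.run cannot be called while another event loop is already running in the same thread, so it suits a script entry point or a test rather than code that is itself called from a coroutine.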
Answered By - Udi