Issue
I have a function that makes an HTTP request and then returns the response. I'd like this function to be able to run in blocking or non-blocking mode depending on a parameter. Is this possible at all in Python? The pseudo-code I would imagine would be something like:
def maybe_async(asynchronous):
if asynchronous:
# We assume there's an event loop running and we can await
return await send_async_http_request()
else:
# Perform a normal synchronous call
return send_http_request()
This throws a SyntaxError
and I'd like to find a way to rephrase it so that it rather throws RuntimeError: no running event loop
at runtime if asynchronous=True
but no event loop is running.
There's been two comments saying I just have to remove the await
keyword from maybe_async
, however I believe if we want to post-process the response then this is not relevant. Here is a more specific use case: say I want to hand to end-users a function that gathers all events IDs from the GitHub API, and that does so in a blocking or non-blocking mode depending on user input. Here's what I would like to do:
import aiohttp
import requests
async def get_async(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
def get_sync(url):
return requests.get(url).json()
def get(url, asynchronous):
if asynchronous:
return get_async(url) # This is the suggested edit where
# I removed the await keyword
else:
return get_sync(url)
def get_github_events_ids(asynchronous=False):
url = 'https://api.github.com/events'
body = get(url, asynchronous)
return [event['id'] for event in body]
But obviously running get_github_events_ids(True)
raises TypeError: 'coroutine' object is not iterable
.
My question is: is there any code design other than duplicating all functions that allows to choose between sync and async?
Solution
The problem is that under asyncio, functions like get_github_events_ids
must either themselves be async or return an object obtained by calling (but not awaiting) an async function. This is to allow them to suspend execution while waiting for the results to arrive.
You could create two versions of each function, one that awaits and the other that runs code, but that would result in a lot of code duplication. There is a better way, but it requires a little bit of magic.
First, internally the code must always use async because that's the only way to propagate suspensions in the async case. But in the sync case, it can just wrap the object returned by the sync call in something that can be trivially awaited (because the coroutine will just return the result of the sync call), and we can do that awaiting at top level. We'll call that operation "awaitify". At top-level in asynchronous case we can return the coroutine object to the caller which will await it, and in the synchronous case we can just drive the coroutine to completion, an operation we'll aptly call "drive".
Here is what get_github_events_ids
would look like:
def get_github_events_ids(asynchronous=False):
coro = _get_github_events_ids_impl(asynchronous)
if asynchronous:
return coro # let the caller await
else:
return drive(coro) # get sync result from "coroutine"
The implementation will look like it's always async:
async def _get_github_events_ids_impl(asynchronous):
url = 'https://api.github.com/events'
body = await awaitify(get(url, asynchronous))
return [event['id'] for event in body]
# "get", "get_sync", and "get_async" remain exactly as
# in the question
Now we just need to define the awaitify
and drive
magic functions:
def awaitify(obj):
if isinstance(obj, types.CoroutineType):
return obj # nothing to do, we're already async
# return an async shim that will just return the object
async def _identity():
return obj
return _identity()
def drive(coro):
# coro is not really async, so we don't need an event loop or
# anything, we just drive the coroutine object to completion.
# Don't try this at home!
while True:
try:
coro.send(None)
except StopIteration as done:
return done.value
To test it, just run it both ways:
if __name__ == '__main__':
# test sync
print(get_github_events_ids(False))
# test async
print(asyncio.run(get_github_events_ids(True)))
Answered By - user4815162342
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.