Issue
I'm trying to asynchronously send an HTTP POST using the asyncio and aiohttp libraries, but I want to make the request to the server immediately. In the code below, the producer and consumer continuously communicate through a queue. When the consumer encounters data that satisfies a specific condition (in this case, even number condition), it attempts to send a POST request to the server.
There are two methods in the code: # METHOD 1 and # METHOD 2. However, both have some issues. With # METHOD 1, the functionality works correctly, but there's a problem where the consumer gets blocked until it receives the result of the POST (the producer is not blocked, but the consumer is). What I want is for the consumer to continue processing data produced by the producer independently, even if a POST to the server is pending.
So, I attempted # METHOD 2, but there's an issue here too. # METHOD 2 creates an independent task for processing the POST request using create_task. The problem is that create_task only schedules the task in the event loop; it does not guarantee immediate execution.
In conclusion, I want to immediately execute sending the POST request to the server (just send), but handle the verification of the server's response to the POST asynchronously using create_task. If this is not possible with the aiohttp library, I may need to explore other libraries. Is there a way to achieve what I want?
import asyncio
import aiohttp
import random
async def my_send_post(session, url, headers, params):
resp = await session.post(url=url, headers=headers, json=params)
resp = await resp.json()
print(resp)
return
async def producer(queue):
x = 0
while True:
# produce an item
print(f'producing {x}')
# simulate i/o operation using sleep
await asyncio.sleep(1)
# put the item in the queue
await queue.put(x)
x += 1
async def consumer(queue):
session = aiohttp.ClientSession()
url = 'https://api.testurl.com/'
params = {}
headers = request_headers(params) # just get headers using other library
while True:
# wait for an item from the producer
item = await queue.get()
if item %2 == 0:
# METHOD 1
resp = await session.post(url=url, headers=headers, json=params)
resp = await resp.json()
print(resp)
# METHOD 2
asyncio.create_task(my_send_post(session=session, url=url, headers=headers, params=params))
else:
print(f"consuming odd number{item}")
async def main():
queue = asyncio.Queue()
await asyncio.gather(producer(queue), consumer(queue))
if __name__ == "__main__":
asyncio.run(main())`
Solution
The problem is that create_task only schedules the task in the event loop; it does not guarantee immediate execution.
There isn't going to be any way to guarantee this, unless you know all the variables in you application, but you should be able to get close enough.
First thing to understand is that create_task() schedules the task to be run in the next iteration of the loop (it obviously doesn't execute anything immediately as there is no await
in order to yield to the event loop).
Next thing to understand is that asyncio.sleep(0)
can be used to yield to the event loop for 1 iteration.
Then it's just a question of what tasks are currently running. If there are only the 2 tasks running, then we know that upon yielding to the event loop, the new task will be the next to run, so we can be sure that it is executed immediately.
If there are other tasks running, then it may take a little longer (i.e. it needs to complete one step of any remaining tasks in the current iteration, and then any tasks in the next iteration that were scheduled before it. But, we know it'll be run just before the current task resumes).
That's a very detailed explanation, but basically, you probably just want to do:
t = asyncio.create_task(...)
await asyncio.sleep(0)
Also, on a side-note, you should retain references to the tasks and await
on them at some point, otherwise you risk missing exceptions (you may get a warning about this when using dev mode (python -X dev
)). If you want to just throw tasks to the background without hassle, I suggest using aiojobs (although there might be something in asyncio itself to handle this soon).
Answered By - Sam Bull
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.