Issue
There is a task to process a number of long HTTP requests (20-30 seconds each). As soon as a reply is received or a timeout occurs, the request has to be executed again, without waiting for the rest.
My code:
import asyncio

import aiohttp


async def get(key):
    url = f"https://mysite?key={key}"
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=20) as response:
            return await response.json()


coroutines = [get(key1), get(key2), get(key3)]


async def main():
    for task in asyncio.as_completed(coroutines):
        resp = await task
        print(f'resp: {resp}')
Can I somehow update the contents of the asyncio.as_completed(coroutines) iterator when one of the tasks finishes, without stopping the other tasks in it?
As a result, I would like to have an infinite loop that repeats each request as soon as it completes. The contents of the coroutines list are not constant: at any time new keys may appear, or existing ones may become obsolete. I may have chosen the wrong way to do this.
Solution
Yes, you probably chose the wrong way - this is much easier to design with the Producer-Consumer pattern.
What you did at resp = await task can essentially be replaced by a separate task listening for incoming data - the consumer. The producers will be multiple tasks, each listening to an individual URL, with a while loop keeping every task running non-stop.
To connect these two kinds of tasks, a Queue is commonly used.
I strongly suggest reading the official documentation about it - I know it's indeed a boring task! But it will help you in the long run.
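Before the full code, here is a minimal, self-contained sketch of the pattern - the names and the 1-second sleep are just illustrative stand-ins for your long HTTP requests:

import asyncio


async def producer(queue: asyncio.Queue, name: str):
    """Pretend-fetcher: each cycle stands in for one long HTTP request."""
    n = 0
    while True:
        await asyncio.sleep(1)  # imagine a 20-30 second request here
        await queue.put((name, n))
        n += 1


async def consumer(queue: asyncio.Queue):
    """Processes results the moment any producer finishes a cycle."""
    while True:
        name, n = await queue.get()
        print(f"{name} produced {n}")


async def main():
    queue = asyncio.Queue()
    tasks = [asyncio.create_task(producer(queue, f"producer-{i}")) for i in range(3)]
    tasks.append(asyncio.create_task(consumer(queue)))

    await asyncio.sleep(3.5)  # let a few cycles through, then shut down
    for task in tasks:
        task.cancel()


asyncio.run(main())

The full example below applies exactly this structure to your use case, plus a small manager class so producers can be added and removed at any time.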
client.py
"""
Demo codes for https://stackoverflow.com/questions/71417266
"""
import asyncio
from typing import Dict
import aiohttp
async def process_response_queue(queue: asyncio.Queue):
"""
Get json response data from queue.
Effectively consumer.
Args:
queue: Queue for receiving url & json response pair
"""
print("Processor started")
while True:
url_from, data = await queue.get()
# do what you want here
print(f"Received {data} from {url_from}")
class TaskManager:
"""
Manage data fetching tasks
"""
def __init__(self):
self.queue = asyncio.Queue()
self.tasks: Dict[str, asyncio.Task] = {}
async def get_repeat(self, url, timeout=20):
"""
Repeatedly fetch json response from given url and put into queue.
Effectively producer.
Args:
url: URL to fetch from
timeout: Time until timeout
"""
print(f"Task for {url} started")
try:
async with aiohttp.ClientSession() as session:
while True:
async with session.get(url, timeout=timeout) as resp:
await self.queue.put((url, await resp.json()))
finally:
del self.tasks[url]
print(f"Task for {url} canceled")
def start_processor(self):
"""
Starts the processor.
"""
self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))
def start_new_task(self, url):
"""
Create new task from url.
Args:
url: URL to fetch from.
"""
self.tasks[url] = asyncio.create_task(self.get_repeat(url))
def stop_task(self, url):
"""
Stop existing task associated with url.
Args:
url: URL associated with task.
Raises:
KeyError: If no task associated with given url exists.
"""
self.tasks[url].cancel()
def close(self):
"""
Cancels all tasks
"""
for task in self.tasks.values():
task.cancel()
async def main():
"""
Starter code
"""
task_manager = TaskManager()
task_manager.start_processor()
for n in range(5):
task_manager.start_new_task(f"http://127.0.0.1:5000/json?key={n}")
# wait 10 sec
await asyncio.sleep(10)
# cancel 1 task
task_manager.stop_task("http://127.0.0.1:5000/json?key=3")
# wait 20 sec
await asyncio.sleep(20)
# stop all
task_manager.close()
if __name__ == '__main__':
asyncio.run(main())
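One caveat: in get_repeat above, a timed-out request raises asyncio.TimeoutError, which escapes the while loop and ends that producer task. Since you want the request re-executed after a timeout as well, you may want to catch it inside the loop. A sketch of the modified loop body (same session, url, timeout and queue as above):

        async with aiohttp.ClientSession() as session:
            while True:
                try:
                    async with session.get(url, timeout=timeout) as resp:
                        await self.queue.put((url, await resp.json()))
                except asyncio.TimeoutError:
                    # request took too long - loop around and fire it again
                    print(f"Timeout for {url}, retrying")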
server.py
"""
Demo codes for https://stackoverflow.com/questions/71417266
"""
import trio
from quart import request, jsonify
from quart_trio import QuartTrio
app = QuartTrio("Very named Much app")
@app.get("/json")
async def send_json():
"""
Sleeps 5 + n seconds before returning response.
Returns:
json response
"""
key = int(request.args["key"])
await trio.sleep(5 + key)
return jsonify({"key": key})
trio.run(app.run_task)
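To try the demo yourself you will need aiohttp installed for the client, and trio, quart and quart-trio for the server; start server.py first, then run client.py in a separate terminal.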
I just used Trio for the server part because I love Trio - take the server part as a mere example, since you seem to have your own server.
Example output:
Task for http://127.0.0.1:5000/json?key=0 started
Task for http://127.0.0.1:5000/json?key=1 started
Task for http://127.0.0.1:5000/json?key=2 started
Task for http://127.0.0.1:5000/json?key=3 started
Task for http://127.0.0.1:5000/json?key=4 started
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 3} from http://127.0.0.1:5000/json?key=3
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Task for http://127.0.0.1:5000/json?key=3 canceled
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Received {'key': 1} from http://127.0.0.1:5000/json?key=1
Received {'key': 0} from http://127.0.0.1:5000/json?key=0
Received {'key': 4} from http://127.0.0.1:5000/json?key=4
Received {'key': 2} from http://127.0.0.1:5000/json?key=2
Task for http://127.0.0.1:5000/json?key=0 canceled
Task for http://127.0.0.1:5000/json?key=1 canceled
Task for http://127.0.0.1:5000/json?key=2 canceled
Task for http://127.0.0.1:5000/json?key=4 canceled
Process finished with exit code 0
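Note how key=3 stops appearing after its task is canceled at the 10-second mark, while the other keys keep cycling; key=0 comes back most often because the demo server answers it fastest (5 seconds, versus up to 9 for key=4).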
And if you didn't already, feel free to check out the Stack Overflow Tour!
Answered By - jupiterbjy