Issue
Is it possible to use asyncio.Queue with a webserver like Quart to communicate between the producer and consumer?
Here is what I am trying to do:
import asyncio
import logging
import os

from quart import Quart, request

app = Quart(__name__)
logger = logging.getLogger(__name__)

queue = asyncio.Queue()
producers = []
consumers = []


async def producer(mesg):
    print(f'produced {mesg}')
    await queue.put(mesg)
    await asyncio.sleep(1)  # do some work


async def consumer():
    while True:
        token = await queue.get()
        await asyncio.sleep(1)  # do some work
        queue.task_done()
        print(f'consumed {token}')


@app.route('/route', methods=['POST'])
async def index():
    mesg = await request.get_data()
    try:
        # one producer task and one consumer task are created per request
        p = asyncio.create_task(producer(mesg))
        producers.append(p)
        c = asyncio.create_task(consumer())
        consumers.append(c)
        return f"published message {mesg}", 200
    except Exception:
        logger.exception("Failed to publish message %s!", mesg)
        return f"Failed to publish message: {mesg}", 400


if __name__ == '__main__':
    PORT = int(os.getenv('PORT')) if os.getenv('PORT') else 8050
    app.run(host='0.0.0.0', port=PORT, debug=True)
This works fine, but I am not sure whether it is good practice, because I do not know where in my code to perform the steps below.
# make sure all the producers have completed
await asyncio.gather(*producers)
# wait for the remaining tasks to be processed
await queue.join()
# cancel the consumers, which are now idle
for c in consumers:
    c.cancel()
EDIT-1:
I have tried using @app.after_serving, with some logger.debug statements.
@app.after_serving
async def shutdown():
    logger.debug("Shutting down...")
    logger.debug("waiting for producers to finish...")
    await asyncio.gather(*producers)
    logger.debug("waiting for tasks to complete...")
    await queue.join()
    logger.debug("cancelling consumers...")
    for c in consumers:
        c.cancel()
But the debug statements are not printed when hypercorn is shutting down gracefully, so I am not sure whether the shutdown function decorated with @app.after_serving is actually called during a shutdown.
Here is the output from hypercorn during shutdown:
appserver_1 | 2020-05-29 15:55:14,200 - base_events.py:1490 - create_server - INFO - <Server sockets=(<asyncio.TransportSocket fd=14, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8080)>,)> is serving
appserver_1 | Running on 0.0.0.0:8080 over http (CTRL + C to quit)
Gracefully stopping... (press Ctrl+C again to force)
I am using kill -SIGTERM <PID> to signal a graceful shutdown to the process.
Solution
I would place the cleanup code in a shutdown function registered with after_serving:
@app.after_serving
async def shutdown():
    # make sure all the producers have completed
    await asyncio.gather(*producers)
    # wait for the remaining tasks to be processed
    await queue.join()
    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()
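To see the ordering of those three steps in isolation, here is a minimal sketch that uses only asyncio, with no web server involved. The `main` function, the `consumed` list, and the shortened sleep times are assumptions made for illustration and are not part of the code in the question.

```python
import asyncio


async def producer(queue, mesg):
    await queue.put(mesg)
    await asyncio.sleep(0.01)  # stand-in for real work


async def consumer(queue, consumed):
    while True:
        token = await queue.get()
        await asyncio.sleep(0.01)  # stand-in for real work
        consumed.append(token)
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    consumed = []  # hypothetical record of processed messages
    producers = [asyncio.create_task(producer(queue, n)) for n in range(5)]
    consumers = [asyncio.create_task(consumer(queue, consumed)) for _ in range(2)]

    # 1. make sure all the producers have completed
    await asyncio.gather(*producers)
    # 2. wait until every queued item has been marked done
    await queue.join()
    # 3. cancel the consumers, which are now idle in queue.get()
    for c in consumers:
        c.cancel()
    # let the cancellations be delivered before returning
    await asyncio.gather(*consumers, return_exceptions=True)
    return consumed, consumers


if __name__ == "__main__":
    consumed, _ = asyncio.run(main())
    print(sorted(consumed))
```

The final `gather` with `return_exceptions=True` is an addition to the three steps: `cancel()` only requests cancellation, so awaiting the tasks afterwards gives them a chance to actually finish.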
As for the globals, I tend to store them on the app directly so that they can be accessed via the current_app proxy. Please note, though, that this (and your solution) only works for a single process (worker). If you want to use multiple workers (or, equivalently, multiple hosts), you will need a third-party store for this information, e.g. Redis.
Answered By - pgjones