Issue
I have one DB connection and many сoroutines to request data. I make the minimal concept, and need help with correct understanding the way of realization.
import asyncio
db_queeu = asyncio.Queue()
async def db_pipe():
while True:
data = await db_queeu.get()
print("DB got", data)
# here process data and return result to requested exec_in_db
async def exec_in_db(query, timeout):
await asyncio.sleep(timeout)
await db_queeu.put(query)
# here I want got result from db_pipe
async def main():
asyncio.create_task(db_pipe())
await asyncio.gather(exec_in_db("Loong query", 4), exec_in_db("Fast query", 1))
print("Listener starts")
if __name__ == "__main__":
asyncio.run(main())
Solution
The good example:
import asyncio
import uuid
import datetime
db_state = {}
db_input = asyncio.Queue()
async def db_executer():
while True:
data = await db_input.get()
db_state[data["key"]]["result"] = data["query"] + "***"
db_state[data["key"]]["event_done"].set()
async def exec_in_db(query, timeout):
key = str(uuid.uuid4())
db_state[key] = {
"event_done": asyncio.Event(),
}
await asyncio.sleep(timeout)
print(db_state)
await db_input.put(
{
"key": key,
"query": query,
}
)
await db_state[key]["event_done"].wait()
print("result:", db_state[key]["result"])
db_state.pop(key)
async def main():
asyncio.create_task(db_executer())
await asyncio.gather(exec_in_db("Loong query", 4), exec_in_db("Fast query", 1))
print("Listener starts")
if __name__ == "__main__":
start_at = datetime.datetime.now()
delta = datetime.timedelta(minutes=1)
print("Start at:", start_at)
asyncio.run(main())
print("Finish at:", datetime.datetime.now())
Answered By - Gulaev Valentin
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.