Issue
I am trying to build a rudimentary crypto trader using a Kafka consumer/producer to ingest websocket data.
My issue is that once I get the consumed message, I want to run some code that executes trades after analyzing the message. In this case, in the function auto_trade, I want to print '2nd anything', but it is blocked by the awaited kafkaconsumer function. The simplified code looks like the following:
'''
async def kafkaconsumer(some_topic, trade = False):
    #does some for loop to return a message and sends the message to a function to storevalues#
    await returns_values(first_msg, rec_msg, last_msg)

async def returns_values(first_msg, rec_msg, last_msg):
    await asyncio.sleep(.01)
    return [first_msg[1], rec_msg[1],rec_msg[2],rec_msg[3], rec_msg[4], rec_msg[5], rec_msg[6], last_msg[0]]

async def auto_trade(ticker):
    print('anything')
    reader = await kafkaconsumer(some_topic = 'ETHUSD', trade = True)
    print('2nd anything') #this doesn't get printed out, instead I just continuously receive messages from the consumer
'''
It appears that it is getting stuck on the message and won't execute anything afterwards, even though I have placed several await sleep items in the returns_values function.
Is there something fundamentally flawed with my setup, or am I misusing asyncio in some manner?
For executing the function itself, I am using the following:
'''
async def execute():
    await asyncio.gather(
        auto_trade('ETHUSD'),
        auto_trade('BTCUSD'),
        auto_trade('DOGE')
    )

if __name__=='__main__':
    trade = asyncio.run(execute())
'''
Solution
This is an interesting question, and I tried to reproduce your behaviour using the code below:
import asyncio
from random import random

async def kafkaconsumer(name):
    for i in [
        (name, 'says:', "wait me"),
        (name, 'says:', "please wait, I don't over yet"),
        (name, 'says:', "wait wait, I'm still here"),
        (name, 'says:', "okay I'm done, now you can leave"),
    ]:
        await asyncio.sleep(random()*3)
        print(i[0], i[1], i[2])

async def auto_trade(name):
    print('wait for the', name)
    await kafkaconsumer(name=name)
    print('Ok I\'m done with', name, '. May to go to next step')
    print('Next step with', name)

async def execute():
    await asyncio.gather(
        auto_trade('John'),
        auto_trade('Bill'),
        auto_trade('Frank')
    )

if __name__=='__main__':
    trade = asyncio.run(execute())
Output is:
wait for the John
wait for the Bill
wait for the Frank
Frank says: wait me
Frank says: please wait, I don't over yet
Bill says: wait me
John says: wait me
Frank says: wait wait, I'm still here
Bill says: please wait, I don't over yet
John says: please wait, I don't over yet
John says: wait wait, I'm still here
Bill says: wait wait, I'm still here
Frank says: okay I'm done, now you can leave
Ok I'm done with Frank . May to go to next step
Next step with Frank
Bill says: okay I'm done, now you can leave
Ok I'm done with Bill . May to go to next step
Next step with Bill
John says: okay I'm done, now you can leave
Ok I'm done with John . May to go to next step
Next step with John
So yes, await kafkaconsumer waits until all the messages provided by Kafka have been processed. It looks like the code hidden behind #does some for loop to return a message and sends the message to a function to storevalues# is some kind of generator that provides new messages in an infinite loop. So you will never reach print('2nd anything') unless the infinite loop is broken by some trigger (or SIGTERM, for example).
It is not possible to recommend an exact solution without knowing what is inside kafkaconsumer.
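As a rough illustration of one way around this, the sketch below turns the consumer into an async generator so that auto_trade can act on each message as it arrives instead of awaiting the whole (endless) stream. The real consumer code is not shown in the question, so fake_consumer, its (topic, offset) messages, and the max_messages stop condition are all hypothetical stand-ins and not part of any Kafka client API.

```python
import asyncio
import itertools


async def fake_consumer(topic):
    """Hypothetical stand-in for the real Kafka loop: yields messages forever."""
    for offset in itertools.count():
        await asyncio.sleep(0.01)  # simulate waiting for the next message
        yield (topic, offset)


async def auto_trade(ticker, max_messages=3):
    """Handle every message as it arrives rather than awaiting the whole stream."""
    handled = []
    async for message in fake_consumer(ticker):
        handled.append(message)  # the trading logic would go here
        if len(handled) >= max_messages:  # hypothetical trigger that ends the loop
            break
    return handled


async def execute():
    # The three tickers are consumed concurrently, as in the question.
    return await asyncio.gather(
        auto_trade('ETHUSD'),
        auto_trade('BTCUSD'),
        auto_trade('DOGE'),
    )


if __name__ == '__main__':
    print(asyncio.run(execute()))
```

With this shape, the code that should run "after" a message lives inside the async for body, and anything placed after the loop only runs once the stop condition fires.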
As for reader: maybe it is a mistake introduced while simplifying the code, but reader is meaningless because it is always None. reader = await kafkaconsumer... does exactly the same thing as await kafkaconsumer... So if you expect some value from it in the real code, that is a logic error. You must return something from kafkaconsumer (return await returns_values(first_msg, rec_msg, last_msg), for example).
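The difference can be checked with a small sketch. returns_values is copied from the question; the two consumer functions and the sample messages (FIRST, REC, LAST) are made up purely for illustration.

```python
import asyncio

# Hypothetical sample messages, shaped only to satisfy the indexing below.
FIRST = ('topic', 'first')
REC = (0, 1, 2, 3, 4, 5, 6)
LAST = ('last',)


async def returns_values(first_msg, rec_msg, last_msg):
    await asyncio.sleep(.01)
    return [first_msg[1], rec_msg[1], rec_msg[2], rec_msg[3],
            rec_msg[4], rec_msg[5], rec_msg[6], last_msg[0]]


async def consumer_without_return():
    # The awaited result is discarded, so the caller receives None.
    await returns_values(FIRST, REC, LAST)


async def consumer_with_return():
    # The awaited result is passed back to the caller.
    return await returns_values(FIRST, REC, LAST)


async def main():
    return await consumer_without_return(), await consumer_with_return()


if __name__ == '__main__':
    print(asyncio.run(main()))
```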
Answered By - rzlvmp