Issue
I try to start Binance websocket to collect candles data. It works well if there is no delay in the data processing function. But when some pauses occurs in the function processing one ticker data, it also delays the response for other ticker. Do anybody know how to run them independantly?
from binance.client import Client
from binance.websockets import BinanceSocketManager
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)
def process(msg):
print(msg['s'])
if msg['s'] == 'ETHUSDT':
time.sleep(5)
def socket_1():
conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')
def socket_2():
conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')
socket_1()
socket_2()
bm.start()
I tried to make the socket run two separate tasks with asyncio
as @Mike Malyi suggested, but it did not eliminate the delay:
import asyncio
def process(msg):
asyncio.run(main(msg))
async def main(msg):
if msg['s'] == 'ETHUSDT':
task1 = asyncio.create_task(process_message(msg))
await task1
else:
task2 = asyncio.create_task(process_message(msg))
await task2
async def process_message(msg):
print(msg['s'])
if msg['s'] == 'ETHUSDT':
await asyncio.sleep(5)
eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')
bm.start()
I also tried to make the function run independanly using Queue
in threads
, but it did not help, one function still delays the other:
from queue import Queue
def consumer(in_q):
while True:
msg = in_q.get()
process_message(msg)
def producer(out_q):
eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')
def process_message(msg):
if msg['s'] == 'ETHUSDT':
time.sleep(5)
print(f"{msg['s']} with delay, {time.strftime('%X')}")
else:
print(f"{msg['s']} {time.strftime('%X')}")
q = Queue()
t1 = Thread(target = consumer, args =(q, ))
t2 = Thread(target = producer, args =(q, ))
t1.start()
t2.start()
bm.start()
Solution
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue
api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
def process_message(msg):
if msg['s'] == 'ETHUSDT':
print(f"{msg['s']} with delay, {time.strftime('%X')}")
time.sleep(5)
print('delay end')
else:
print(f"{msg['s']} {time.strftime('%X')}")
def build_thread (symbol):
print('start thread', symbol)
q = queue.Queue()
bm = BinanceSocketManager(client, user_timeout=60)
conn_key = bm.start_kline_socket(symbol, q.put, '1h')
bm.start()
while(True):
msg = q.get()
process_message(msg)
thread.start_new_thread(build_thread, ('ETHUSDT', ))
thread.start_new_thread(build_thread, ('BNBUSDT', ))
Answered By - Mike Malyi
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.