Issue
I have a script that subscribes to websockets for over 1000 instruments on Deribit's API, feeds the results to a queue, and runs in a separate thread. However, once a certain number of websockets are opened within 1 second, I get rate limited. My solution is to open a websocket and then sleep for 100ms before awaiting the next websocket task, but I cannot figure out how to do this with asyncio. Here is the websocket code:
class Deribit:
    """Subscribe to Deribit's public websocket API and forward messages to a queue.

    Each subscribe coroutine opens its own websocket connection, subscribes to
    one channel, and puts every decoded message on ``q``.  Both coroutines
    read the module-level ``flag`` and return once it equals ``'stop'``.
    """

    def __init__(self, contract, currency, q):
        """Store the subscription targets and the output queue.

        Args:
            contract: Instrument name used in the ``ticker.<contract>.raw``
                channel.
            currency: Currency used in the ``trades.option.<currency>.raw``
                channel.
            q: Object whose ``put`` method receives each decoded message.
        """
        self.baseUrl = "wss://www.deribit.com/ws/api/v2"
        self.contract = contract
        self.currency = currency
        self.q = q

    async def tickerSubscribe(self):
        """Stream raw ticker messages for ``self.contract`` into ``self.q``.

        Every message is JSON-decoded and tagged with
        ``type = 'tickerSubscribe'`` before being queued.  When the connection
        is found closed, a new one is opened by looping rather than by calling
        this coroutine recursively, so repeated reconnects neither nest call
        frames nor keep the old connection's context alive.

        Returns:
            None, once the module-level ``flag`` equals ``'stop'``.
        """
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"ticker.{self.contract}.raw"]},
        }
        while True:
            async with websockets.connect(self.baseUrl) as websocket:
                await websocket.send(json.dumps(msg))
                while websocket.open:
                    # The stop flag is only examined between messages,
                    # because recv() waits for the next message.
                    if flag == 'stop':
                        await websocket.close()
                        return
                    response = json.loads(await websocket.recv())
                    response['type'] = 'tickerSubscribe'
                    self.q.put(response)
            # The inner loop only ends here when the connection is closed.
            print('tickerSubscribe Closed')

    async def flowSubscribe(self):
        """Stream raw option trades for ``self.currency`` into ``self.q``.

        Every message is JSON-decoded and tagged with
        ``type = 'flowSubscribe'`` before being queued.  When the connection
        is found closed, a new one is opened by looping rather than by calling
        this coroutine recursively, so repeated reconnects neither nest call
        frames nor keep the old connection's context alive.

        Returns:
            None, once the module-level ``flag`` equals ``'stop'``.
        """
        msg = {
            "jsonrpc": "2.0",
            "method": "public/subscribe",
            "params": {"channels": [f"trades.option.{self.currency}.raw"]},
        }
        while True:
            async with websockets.connect(self.baseUrl) as websocket:
                await websocket.send(json.dumps(msg))
                while websocket.open:
                    # The stop flag is only examined between messages,
                    # because recv() waits for the next message.
                    if flag == 'stop':
                        await websocket.close()
                        return
                    response = json.loads(await websocket.recv())
                    response['type'] = 'flowSubscribe'
                    self.q.put(response)
            # The inner loop only ends here when the connection is closed.
            print('flowSubscribe closed')
Then here is the code to await the tasks:
async def main():
    """Fetch all BTC and ETH instruments and start one subscription task each.

    Registers an empty list in the module-level ``orderbooks`` for every
    instrument name, creates a ``tickerSubscribe`` task per instrument plus
    one ``flowSubscribe`` task per currency, and then awaits the tasks in
    creation order.
    """
    tasks = []
    q = queue.Queue()
    coins = []
    print('Getting instruments..')
    # Retry until the BTC instrument list is available.
    while True:
        btc_instruments = Deribit('BTC', 'BTC', '').getInstruments('false')
        if btc_instruments is not None:
            eth_instruments = Deribit('ETH', 'ETH', '').getInstruments('false')
            break
    print('Adding instruments to list..')
    for instruments in (btc_instruments, eth_instruments):
        for instrument in instruments['result']:
            name = instrument['instrument_name']
            orderbooks[name] = []
            coins.append(name)
    # NOTE: asyncio.create_task() schedules each coroutine to run as soon as
    # the event loop gets control, so all connections are started back to
    # back here; the sleep in the await loop below does not space them out.
    tasks.extend(
        asyncio.create_task(Deribit(coin, '', q).tickerSubscribe())
        for coin in coins
    )
    tasks.append(asyncio.create_task(Deribit('', 'BTC', q).flowSubscribe()))
    tasks.append(asyncio.create_task(Deribit('', 'ETH', q).flowSubscribe()))
    print(f'Length of Tasks list : {len(tasks)}')
    print('Subscribing to instruments..')
    for task in tasks:
        # Awaiting a task waits for that subscription coroutine to finish.
        await task
        await asyncio.sleep(0.1)
Here is the traceback:
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "ws_correct_trades.py", line 199, in main
await tasks[t]
File "ws_correct_trades.py", line 133, in tickerSubscribe
async with websockets.connect(self.baseUrl) as websocket:
File "/usr/local/lib/python3.8/dist-packages/websockets/client.py", line 517, in __aenter__
return await self
File "/usr/local/lib/python3.8/dist-packages/websockets/client.py", line 535, in __await_impl__
transport, protocol = await self._create_connection()
File "/usr/lib/python3.8/asyncio/base_events.py", line 1050, in create_connection
transport, protocol = await self._create_connection_transport(
File "/usr/lib/python3.8/asyncio/base_events.py", line 1080, in _create_connection_transport
await waiter
ConnectionResetError
Solution
...open a websocket and then sleep for 100ms before awaiting the next websocket task...
From the documentation, there are three ways to start a coroutine. One is with asyncio.create_task. In your process, the loops that create the tasks are starting them immediately with no delay. Here is a (hopefully analogous) minimal reproducible example, adapted directly from the example in the docs.
import asyncio, time
async def say_after(delay, what):
    """Print the current ``time.perf_counter()`` reading followed by *what*.

    *delay* is accepted but unused in this variant: the sleep is left
    commented out so the printed timestamp shows when the task starts running.
    """
    # await asyncio.sleep(delay)
    print(f"{time.perf_counter():1.5f} {what:>7}")
async def main():
    """Create five ``say_after`` tasks, pause three seconds, then await each.

    Timestamps are printed before creating the tasks, before the pause, and
    before each await, so they can be compared with those printed by the
    tasks themselves.
    """
    print(f"start creating tasks {time.perf_counter():1.5f}")
    tasks = [
        asyncio.create_task(say_after(.1, f'task-{i}'), name=f'task-{i}')
        for i in range(5)
    ]
    print(f"sleeping 3 before awaiting tasks {time.perf_counter():1.5f}")
    await asyncio.sleep(3)
    for task in tasks:
        print(f"awaiting {task.get_name()} | {time.perf_counter():1.5f}")
        await task


asyncio.run(main())
start creating tasks 0.79750
sleeping 3 before awaiting tasks 0.79803
0.79858 task-0
0.79891 task-1
0.79926 task-2
0.80382 task-3
0.80421 task-4
awaiting task-0 | 3.80519
awaiting task-1 | 3.80611
awaiting task-2 | 3.80744
awaiting task-3 | 3.80816
awaiting task-4 | 3.81093
You can see that the tasks are starting immediately when they are created - not when they are awaited.
Even if you add a delay in the coroutine, it doesn't accomplish what you want.
async def say_after(delay, what):
    """Sleep for *delay* seconds, then print a timestamp followed by *what*.

    The timestamp is the ``time.perf_counter()`` reading taken after the
    sleep; *what* is right-aligned in a seven-character field.
    """
    await asyncio.sleep(delay)
    print(f"{time.perf_counter():1.5f} {what:>7}")
>>>
start creating tasks 0.89468
sleeping 3 before awaiting tasks 0.89484
1.00323 task-0
1.00376 task-1
1.00431 task-4
1.00467 task-3
1.00508 task-2
awaiting task-0 | 3.90330
awaiting task-1 | 3.90421
awaiting task-2 | 3.90542
awaiting task-3 | 3.90617
awaiting task-4 | 3.91162
Each one prints 100 ms after it is started, but there is still no delay between them.
Adding the delay to the loop should do what you want - create each task 100 ms apart.
async def say_after(delay, what):
    """Print the current ``time.perf_counter()`` reading followed by *what*.

    *delay* is accepted but unused in this variant: the sleep is left
    commented out, so any spacing between tasks has to come from the caller.
    """
    # await asyncio.sleep(delay)
    print(f"{time.perf_counter():1.5f} {what:>7}")
async def main():
    """Create five ``say_after`` tasks 0.1 s apart, pause, then await each.

    The sleep comes before each ``create_task`` call inside the creation
    loop, so each task is created about 0.1 s after the previous one.
    """
    tasks = []
    print(f"start creating tasks {time.perf_counter():1.5f}")
    for i in range(5):
        await asyncio.sleep(.1)
        label = f'task-{i}'
        tasks.append(asyncio.create_task(say_after(.1, label), name=label))
    print(f"sleeping 3 before awaiting tasks {time.perf_counter():1.5f}")
    await asyncio.sleep(3)
    for task in tasks:
        print(f"awaiting {task.get_name()} | {time.perf_counter():1.5f}")
        await task
>>>
start creating tasks 0.81102
0.91610 task-0
1.02837 task-1
1.13217 task-2
1.24762 task-3
sleeping 3 before awaiting tasks 1.35085
1.35127 task-4
awaiting task-0 | 4.36476
awaiting task-1 | 4.36564
awaiting task-2 | 4.36670
awaiting task-3 | 4.36752
awaiting task-4 | 4.37601
Or maybe you could submit the tasks in batches, waiting between each batch.
async def main():
    """Create ``say_after`` tasks in three batches, pausing after each batch.

    Each batch creates ``n_per_second`` tasks back to back and then sleeps
    for ``1 / n_per_second`` seconds.  After the last batch the coroutine
    pauses three seconds and awaits every task in creation order.
    """
    tasks = []
    print(f"start creating tasks {time.perf_counter():1.5f}")
    flag = 0  # batch counter
    n_per_second = 3
    while flag < 3:  # limit the number of batches for the example.
        print(f'batch:{flag}')
        for i in range(n_per_second):
            name = f'task-{(flag*10)+i}'
            tasks.append(asyncio.create_task(say_after(.1, name), name=name))
        # NOTE(review): a batch of n_per_second tasks followed by a pause of
        # 1 / n_per_second seconds starts roughly n_per_second ** 2 tasks per
        # second; sleep(1) here would cap it at n_per_second - confirm intent.
        await asyncio.sleep(1 / n_per_second)
        flag += 1
    print(f"sleeping 3 before awaiting tasks {time.perf_counter():1.5f}")
    await asyncio.sleep(3)
    for task in tasks:
        print(f"awaiting {task.get_name()} | {time.perf_counter():1.5f}")
        await task


asyncio.run(main())
>>>
start creating tasks 0.84680
batch:0
0.84742 task-0
0.84749 task-1
0.84755 task-2
batch:1
1.19304 task-10
1.19334 task-11
1.19372 task-12
batch:2
1.54109 task-20
1.54149 task-21
1.54171 task-22
sleeping 3 before awaiting tasks 1.88965
awaiting task-0 | 4.90472
awaiting task-1 | 4.90519
awaiting task-2 | 4.90568
awaiting task-10 | 4.90766
awaiting task-11 | 4.90802
awaiting task-12 | 4.90836
awaiting task-20 | 4.91029
awaiting task-21 | 4.91057
awaiting task-22 | 4.91096
Answered By - wwii
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.