Issue
I'm working on a ZeroMQ pub/sub receiver in an asyncio environment: one publisher, with multiple subscribers receiving its messages. The asyncio-based receiver doesn't receive any messages.
Snippet from the receiver program:
# Imports the snippet relies on
import asyncio
import uvloop
import zmq
import zmq.asyncio

async def zmq_listener(sub, stop_event):
    global mode
    while not stop_event.is_set():
        msg = await sub.recv_multipart()
        print(msg)
        if len(msg) == 1:
            print(msg[0].decode())

######################################
### MAIN PROGRAM STARTS HERE
async def main():
    tasks = []
    tasks.append(asyncio.create_task(other_routine1(abc, stop_event)))
    if doZMQ:
        print("ZMQ Mode is enabled")
        ctx = zmq.asyncio.Context()
        sub = ctx.socket(zmq.SUB)
        ip = 'tcp://127.0.0.1:5559'
        sub.connect(ip)
        sub.setsockopt(zmq.SUBSCRIBE, b"")  # Subscribe to all topics
        tasks.append(asyncio.create_task(zmq_listener(sub, stop_event)))
    await asyncio.gather(*tasks)

stop_event = asyncio.Event()
try:
    uvloop.install()
    asyncio.run(main(), debug=False)
except KeyboardInterrupt:
    print("Program interrupted by user")
    asyncio.run(stop())
The sender application used to test the receiver is the following:
import zmq
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)
pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])
Solution
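The publisher sends its messages before the subscriber has had a chance to connect and subscribe. A PUB socket silently drops messages that no connected subscriber has subscribed to, so the receiver never sees them (the ZeroMQ guide calls this the "slow joiner" problem). Try adding a short delay, such as 1 second, between binding and sending: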
import zmq
import time
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
ip = 'tcp://127.0.0.1:5559'
pub.bind(ip)
time.sleep(1)
pub.send_multipart([b"SHOW"])
pub.send_multipart([b"P1"])
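A fixed delay works for a quick test, but the right value depends on timing. As a hedged alternative sketch, the sender could use an XPUB socket instead of PUB and wait until a subscription frame actually arrives before sending. The helper name `wait_for_subscriber`, the `demo` function, the timeouts, and the in-process SUB socket on a random port are all assumptions made so the example is self-contained; they are not part of the original programs. The asyncio receiver would need no changes for this, since a SUB socket connects to XPUB the same way as to PUB.

```python
import zmq


def wait_for_subscriber(xpub, timeout_ms=5000):
    """Wait for one subscription frame on an XPUB socket (hypothetical helper).

    Returns True if a subscription arrived within timeout_ms, else False.
    """
    if xpub.poll(timeout_ms, zmq.POLLIN) == 0:
        return False  # nobody subscribed in time
    frame = xpub.recv()
    # XPUB delivers subscriptions as frames starting with byte 0x01
    # (unsubscriptions start with 0x00), followed by the topic prefix.
    return frame[:1] == b"\x01"


def demo():
    """Run sender and a stand-in subscriber in one process for illustration."""
    ctx = zmq.Context()
    xpub = ctx.socket(zmq.XPUB)
    # Random port so the demo does not clash with anything on 5559.
    port = xpub.bind_to_random_port("tcp://127.0.0.1")

    # Stand-in for the real asyncio receiver.
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # subscribe to all topics
    sub.connect(f"tcp://127.0.0.1:{port}")

    received = []
    try:
        if wait_for_subscriber(xpub):
            # The subscription is registered, so these are not dropped.
            xpub.send_multipart([b"SHOW"])
            xpub.send_multipart([b"P1"])
            for _ in range(2):
                if sub.poll(2000, zmq.POLLIN):
                    received.append(sub.recv_multipart())
    finally:
        sub.close(linger=0)
        xpub.close(linger=0)
        ctx.term()
    return received


if __name__ == "__main__":
    print(demo())
```

Note that this sketch only waits for the first subscription; with several subscribers, later joiners can still miss messages sent before they connect.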
Answered By - Saxtheowl