Issue
In a single process I have a task running on a thread that produces values and broadcasts them, and several consumer async tasks that run concurrently in an asyncio loop.
I found this issue on PyZMQ's GitHub asking about async <-> sync communication with inproc sockets, which is what I also wanted, and the answer was to use .shadow(ctx.underlying) when creating the async ZMQ Context.
I prepared this example and it seems to be working fine:
import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json


def producer(ctrl):
    # delay first push to give asyncio loop time
    # to start
    time.sleep(1)
    ctx = ctrl["ctx"]
    s = ctx.socket(zmq.PUB)
    s.bind(ctrl["endpoint"])
    v = 0
    while ctrl["run"]:
        payload = {"value": v, "timestamp": time.time()}
        msg = json.dumps(payload).encode("utf-8")
        s.send(msg)
        v += 1
        time.sleep(5)
    print("Bye")


def main():
    endpoint = "inproc://testendpoint"
    ctx = zmq.Context()
    actx = zmq.asyncio.Context.shadow(ctx.underlying)
    ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint, }
    th = threading.Thread(target=producer, args=(ctrl,))
    th.start()
    try:
        asyncio.run(amain(actx, endpoint))
    except KeyboardInterrupt:
        pass
    print("Stopping thread")
    ctrl["run"] = False
    th.join()


async def amain(ctx, endpoint):
    s = ctx.socket(zmq.SUB)
    s.subscribe("")
    s.connect(endpoint)
    loop = asyncio.get_running_loop()

    def stop():
        try:
            print("Closing zmq async socket")
            s.close()
        except:
            pass
        raise KeyboardInterrupt

    loop.add_signal_handler(signal.SIGINT, stop)
    while True:
        event = await s.poll(1000)
        if event & zmq.POLLIN:
            msg = await s.recv()
            payload = json.loads(msg.decode("utf-8"))
            print("%f: %d" % (payload["timestamp"], payload["value"]))


if __name__ == "__main__":
    sys.exit(main())
Is it safe to use inproc://* between a thread and an asyncio task in this way? The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio task, so I would say in general that this is thread safe, right? Or am I missing something that I should consider?
Solution
Q :
" Is it safe to use inproc://* between a thread and asyncio task in this way? "
A :
First and foremost, I might be awfully wrong (not only here), yet having worked with ZeroMQ since native API 2.1.1+, I dare claim that, unless newer "improvements" have lost the core principles (the ZMTP/RFC-documented properties that any legal implementation has to keep), the answer here shall be YES, as long as the newer releases of the pyzmq binding have kept all mandatory properties of the inproc: transport class without compromise.
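As a rough, self-terminating sketch of the pattern under discussion (not the asker's exact program): the thread and the asyncio task each own one socket, and only the Context is shared, through the same .shadow(ctx.underlying) call the question uses. The endpoint name, the function names, the choice of PAIR sockets (picked here so that no message is dropped while the peers join) and the `done` event are assumptions made for illustration only.

```python
import asyncio
import json
import threading

import zmq
import zmq.asyncio

ENDPOINT = "inproc://sketch"  # hypothetical endpoint name


def producer(ctx, count, done):
    # This socket is created, used and closed in the producer thread only.
    sock = ctx.socket(zmq.PAIR)
    sock.connect(ENDPOINT)
    for value in range(count):
        sock.send(json.dumps({"value": value}).encode("utf-8"))
    done.wait(timeout=5)  # keep the socket open until the consumer has finished
    sock.close()


async def consume(ctx, actx, count):
    # This socket lives in the asyncio task only.
    sock = actx.socket(zmq.PAIR)
    sock.bind(ENDPOINT)  # bound before the thread starts, so connect() finds it
    done = threading.Event()
    th = threading.Thread(target=producer, args=(ctx, count, done))
    th.start()
    try:
        values = []
        for _ in range(count):
            msg = await asyncio.wait_for(sock.recv(), timeout=5)
            values.append(json.loads(msg.decode("utf-8"))["value"])
        return values
    finally:
        done.set()
        sock.close()
        # Join the thread without blocking the event loop.
        await asyncio.get_running_loop().run_in_executor(None, th.join)


def run_sketch(count):
    ctx = zmq.Context()
    # Async wrapper around the same underlying libzmq context.
    actx = zmq.asyncio.Context.shadow(ctx.underlying)
    try:
        return asyncio.run(consume(ctx, actx, count))
    finally:
        ctx.term()  # returns once both sockets are closed


if __name__ == "__main__":
    print(run_sketch(3))
```

The only ZeroMQ object that crosses the thread boundary here is the Context; each socket is opened and closed on the side that uses it.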
Q :
" The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio task, so I would say in general that this is thread safe, right? "
A :
Here my troubles start. ZeroMQ implementations have always been developed on Martin SUSTRIK's & Pieter HINTJENS' Zen-of-Zero, which also means Zero-sharing, so never sharing was the principle (though "shared" zmq.Context instances were never a problem to use from different threads, contrary to zmq.Socket instances, which must not be shared between threads).
Python (since ever, and still valid in 2022-Q1) uses the GIL-lock, which re-[SERIAL]-ises the flow of code-execution: only the thread that holds the GIL runs Python code, and any other thread does nothing but "hang in NOP-s cracking" (nuts-cracking in an idle loop) until it gains the lock. That avoids truly [CONCURRENT] execution of Python code, yet it does not by itself make shared objects safe, as threads can still interleave between bytecodes and pyzmq releases the GIL inside blocking libzmq calls. In your code this ought not hurt, because the only ZeroMQ object that the thread and the asyncio part have in common is the Context, which is thread-safe, while each socket stays on its own side.
Being inside the same process, there seems to be no advantage in spawning another Context instance at all (this has been a rock-solid certainty since ever: never increase any kind of overhead, Zen-of-Zero, (almost) Zero-overhead ...). Note that Context.shadow(ctx.underlying) does not spawn one either; it wraps the same underlying context, which is also why the inproc: endpoint is visible on both sides. If performance or latency needs required it, the Sig/Msg core engine was powered with more I/O threads upon instantiation, zmq.Context( IOthreads ), yet these were zmq.Context-owned threads, not Python-GIL-governed/(b)locked ones, so the performance scaled pretty well, without wasting any RAM/HWM/buffers/... resources and without growing any overheads. The I/O threads are there for indeed I/O work only, so they are not needed for the inproc: (protocol-less) transport class at all.
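A minimal sketch of the "one Context per process, one socket per thread" arrangement described above, assuming a PUSH/PULL fan-in over inproc. The endpoint name, the helper names and the `done` event are hypothetical; io_threads=1 is simply pyzmq's default written out, since it is the knob the paragraph refers to.

```python
import threading

import zmq

ENDPOINT = "inproc://fan-in"  # hypothetical endpoint name


def worker(ctx, worker_id, done):
    # Each thread creates its own socket from the shared Context.
    sock = ctx.socket(zmq.PUSH)
    sock.connect(ENDPOINT)
    sock.send(str(worker_id).encode("ascii"))
    done.wait(timeout=5)  # keep the socket open until the sink has read everything
    sock.close()


def run_fan_in(n_workers):
    ctx = zmq.Context(io_threads=1)  # a single Context for the whole process
    sink = ctx.socket(zmq.PULL)
    sink.bind(ENDPOINT)  # bind before any worker connects
    done = threading.Event()
    threads = [
        threading.Thread(target=worker, args=(ctx, i, done))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    try:
        received = []
        for _ in range(n_workers):
            if not sink.poll(5000):  # give up after 5 s instead of blocking forever
                break
            received.append(int(sink.recv()))
        return sorted(received)  # arrival order across workers is not guaranteed
    finally:
        done.set()
        for t in threads:
            t.join()
        sink.close()
        ctx.term()


if __name__ == "__main__":
    print(run_fan_in(3))
```

The Context is handed to every thread, but no socket ever is, which is the distinction the answer draws between zmq.Context and zmq.Socket instances.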
Q :
" Or am I missing something that I should consider? "
A :
Mixing asyncio, O/S-signals (whose interaction with the native ZeroMQ API is well documented) and other layers of complexity is for sure possible, yet it comes at a cost: it makes the use-case less and less readable and more and more prone to conceptual gaps and similar hard-to-decode "errors".
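One possible way to keep the signal path easier to read, offered as an assumption rather than as something the answer prescribes, is to let the SIGINT handler only set a flag and to do all socket work in the task that owns the socket. In the sketch below, consume_until_stopped, amain_sketch and poll_once are hypothetical names; poll_once stands in for the "await s.poll(1000)" plus "await s.recv()" step of the question's loop.

```python
import asyncio
import signal


async def consume_until_stopped(poll_once, stop):
    """Await poll_once() repeatedly until `stop` is set; return the poll count."""
    polls = 0
    while not stop.is_set():
        await poll_once()
        polls += 1
    return polls


async def amain_sketch(poll_once):
    stop = asyncio.Event()
    loop = asyncio.get_running_loop()
    # The handler only sets a flag; it neither raises nor touches any socket.
    # Note: add_signal_handler is available on Unix event loops only.
    loop.add_signal_handler(signal.SIGINT, stop.set)
    try:
        return await consume_until_stopped(poll_once, stop)
    finally:
        loop.remove_signal_handler(signal.SIGINT)
        # Cleanup such as s.close() would go here, in the task that owns the socket.
```

With a bounded poll timeout, such as the 1000 ms used in the question, the loop notices the flag within one poll interval after Ctrl-C, and amain_sketch returns normally instead of unwinding through a KeyboardInterrupt raised inside the event loop.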
I remember using Tkinter's mainloop() as a cost-wise very cheap and super-stable framework for rapid-prototyping the MVC-{ M-odel, V-isual, C-ontroller } parts of many-actor, indeed distributed-system, applications in Python. There were Zero problems using ZeroMQ with a single Context instance and passing references to the respective AccessNodes into any number of event-handlers, provided we kept to the ZeroMQ Zen-of-Zero, i.e. not to "share" (meaning no two parts "use" (compete to use) one and the same AccessPoint "one-over-another").
All this was designed in, at "Zero-cost", by ZeroMQ by definition, so unless it got spoilt in some later phase of re-wrapping a re-wrapped native API, all this ought to still work in 2022-Q1, ought it not?
Answered By - user3666197