Issue
I am trying to take the output of the client of 'ZeroMQ' and display it through Qt designer python script. The code already receives data correctly but I can't display it on the GUI window and receiving it. It only displays the first line. I am trying threading and 'subprocess' but it doesn't work. i receive data well here in the function (consumer) in the class(Ui_Form)
class Ui_Form(object):
def setupUi(self, Form):
def consumer(self):
consumer_id = random.randrange(1, 10005)
# print("I am consumer #%s" % (consumer_id))
context = zmq.Context()
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")
while True:
buff = consumer_receiver.recv()
# print(time.time())
data = np.frombuffer(buff, dtype="float32")
l = np.where(data == 0)
data = data[0:l[0][0]:1]
print(data)
bandwidth = []
signals = []
for i in range(0, len(data), 2):
signals.append(data[i])
bandwidth.append(data[i + 1])
for i in range(0, len(signals)):
self.textEdit.append(str(signals[i]) + "\n")
self.textEdit_2.append(str(bandwidth[i]) + "\n")
and when i run the main:
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
Form = QtWidgets.QWidget()
ui = Ui_Form()
ui.setupUi(Form)
Form.show()
ui.consumer()
sys.exit(app.exec_())
the date receive it correct but can't display it .
Solution
Explanation:
The cause of the problem is that the infinite loop is blocking the Qt eventloop preventing it from handling tasks like updating the gui, listening to OS events, etc.
Solution:
You must implement some logic that allows to communicate with ZMQ and does not block the eventloop, and for this there are several options (in the following code I show the example of the server of my test):
server.py
import random
import time
import zmq
import numpy as np
import logging
logging.basicConfig(level=logging.DEBUG)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
while True:
d = []
start = random.randint(0, 10)
for i in range(10):
x = start + i
d.append(x)
d.append(x ** 2)
arr = np.array(d, dtype="float32")
buf = arr.tobytes()
logging.debug(buf)
socket.send(buf)
time.sleep(1)
1. threading.Thread
import sys
import threading
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class ZMQReceiver(QtCore.QObject):
dataChanged = QtCore.pyqtSignal(bytes)
def start(self):
threading.Thread(target=self._execute, daemon=True).start()
def _execute(self):
context = zmq.Context()
consumer_receiver = context.socket(zmq.PULL)
consumer_receiver.connect("tcp://127.0.0.1:5557")
while True:
buff = consumer_receiver.recv()
self.dataChanged.emit(buff)
class Widget(QtWidgets.QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)
lay = QtWidgets.QHBoxLayout(self)
lay.addWidget(self.logedit_1)
lay.addWidget(self.logedit_2)
zmq_receiver = ZMQReceiver(self)
zmq_receiver.dataChanged.connect(self.on_data_changed)
zmq_receiver.start()
@QtCore.pyqtSlot(bytes)
def on_data_changed(self, buff):
data = np.frombuffer(buff, dtype="float32")
bandwidth = []
signals = []
for i in range(0, len(data), 2):
signals.append(data[i])
bandwidth.append(data[i + 1])
for sg, bw in zip(signals, bandwidth):
self.logedit_1.append(str(sg))
self.logedit_2.append(str(bw))
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
w = Widget()
w.show()
sys.exit(app.exec_())
2. QSocketNotifier
import sys
from PyQt5 import QtCore, QtWidgets
import zmq
import numpy as np
class Widget(QtWidgets.QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)
lay = QtWidgets.QHBoxLayout(self)
lay.addWidget(self.logedit_1)
lay.addWidget(self.logedit_2)
context = zmq.Context()
self.consumer_receiver = context.socket(zmq.PULL)
self.consumer_receiver.connect("tcp://127.0.0.1:5556")
self.read_notifier = QtCore.QSocketNotifier(
self.consumer_receiver.getsockopt(zmq.FD), QtCore.QSocketNotifier.Read, self
)
self.read_notifier.activated.connect(self.on_read_msg)
@QtCore.pyqtSlot()
def on_read_msg(self):
self.read_notifier.setEnabled(False)
if self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
while self.consumer_receiver.getsockopt(zmq.EVENTS) & zmq.POLLIN:
buff = self.consumer_receiver.recv()
data = np.frombuffer(buff, dtype="float32")
bandwidth = []
signals = []
for i in range(0, len(data), 2):
signals.append(data[i])
bandwidth.append(data[i + 1])
for sg, bw in zip(signals, bandwidth):
self.logedit_1.append(str(sg))
self.logedit_2.append(str(bw))
self.read_notifier.setEnabled(True)
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
w = Widget()
w.show()
sys.exit(app.exec_())
3. asyncio
import sys
import asyncio
from PyQt5 import QtCore, QtWidgets
from asyncqt import QEventLoop, asyncClose
# from qasync import QEventLoop, asyncClose
import zmq
from zmq.asyncio import Context, Poller
import numpy as np
class Widget(QtWidgets.QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.logedit_1 = QtWidgets.QTextEdit(readOnly=True)
self.logedit_2 = QtWidgets.QTextEdit(readOnly=True)
lay = QtWidgets.QHBoxLayout(self)
lay.addWidget(self.logedit_1)
lay.addWidget(self.logedit_2)
context = Context()
self.consumer_receiver = context.socket(zmq.PULL)
self.consumer_receiver.connect("tcp://127.0.0.1:5557")
self.poller = Poller()
self.poller.register(self.consumer_receiver, zmq.POLLIN)
async def start_consumer(self):
while True:
events = await self.poller.poll()
if self.consumer_receiver in dict(events):
buff = await self.consumer_receiver.recv()
data = np.frombuffer(buff, dtype="float32")
bandwidth = []
signals = []
for i in range(0, len(data), 2):
signals.append(data[i])
bandwidth.append(data[i + 1])
for sg, bw in zip(signals, bandwidth):
self.logedit_1.append(str(sg))
self.logedit_2.append(str(bw))
@asyncClose
async def closeEvent(self, event):
self.consumer_receiver.close()
if __name__ == "__main__":
app = QtWidgets.QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
w = Widget()
w.show()
try:
loop.run_until_complete(w.start_consumer())
except asyncio.CancelledError:
print("start_consumer is cancelled now")
finally:
loop.close()
Answered By - eyllanesc
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.