Issue
I'm reading weather sensor data from a Prometheus database and updating the widget displaying the weather data every 30 seconds. The "every 30 seconds" is done with a time.sleep in the Worker thread and I'm guessing that there is probably a better way to do that? For example, I'm trying to implement a closeEvent method on the MainWindow but it never exits. What's the proper way to implement a periodic task with a QRunnable?
from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor
import time
class WorkerSignals(QtCore.QObject):
    """Holder for the signals a Worker uses to report results.

    The signal is declared on a QObject subclass; Worker itself derives
    from QtCore.QRunnable and keeps an instance of this class instead.
    """

    # Carries one float; Worker.run() emits the latest sensor reading on it.
    result = QtCore.pyqtSignal(float)
class Worker(QtCore.QRunnable):
    """Runnable that polls the sensor and emits every reading.

    This is the version from the question: run() loops forever, so the
    pool thread executing it never finishes.
    """

    def __init__(self, prom_sensor):
        """Store the sensor client and create this worker's signal holder.

        prom_sensor: object providing get_sensor_latest().
        """
        super(Worker, self).__init__()
        self.prom_sensor = prom_sensor
        self.signals = WorkerSignals()

    # NOTE(review): the answer further down states that pyqtSlot only works
    # on QObjects, which a QRunnable is not.
    @QtCore.pyqtSlot()
    def run(self):
        """Emit the latest reading, sleep 30 seconds, and repeat forever."""
        while True:
            print("Thread start")
            # The reading is converted to float to match the signal's type.
            self.signals.result.emit(float(self.prom_sensor.get_sensor_latest()))
            # Blocks this pool thread for 30 seconds between reads.
            time.sleep(30)
        # NOTE(review): indentation was lost in extraction. Placed after the
        # loop, this line is unreachable -- confirm against the original post.
        print("Thread complete")
class MainWindow(QtWidgets.QMainWindow):
    """Main window that shows the latest sensor reading in a single label."""

    def __init__(self):
        """Build the UI, show the window, and start the polling worker."""
        super(MainWindow, self).__init__()
        self.threadpool = QtCore.QThreadPool()
        self.prom_sensor = GetPromSensor()
        # Last value written to the label; None until the first reading.
        self.previous = None

        layout = QtWidgets.QGridLayout()
        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 scopes enum members under their enum type, so the member is
        # Qt.AlignmentFlag.AlignCenter (Qt.Alignment is not available in
        # current PyQt6 releases).
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))
        layout.addWidget(self.label)

        widget = QtWidgets.QWidget()
        widget.setLayout(layout)
        self.setCentralWidget(widget)

        self.show()
        self.start_worker()

    def start_worker(self):
        """Create a Worker, connect its result signal, and queue it."""
        worker = Worker(prom_sensor=self.prom_sensor)
        worker.signals.result.connect(self.process_result)
        self.threadpool.start(worker)

    def process_result(self, temperature):
        """Update the label with ``temperature`` unless it is unchanged."""
        if temperature == self.previous:
            return
        self.previous = temperature
        print("receiving:", temperature)
        self.label.setText(str(temperature))
if __name__ == "__main__":
    # MainWindow shows itself from its constructor, so creating it is enough
    # before entering the Qt event loop.
    qt_app = QtWidgets.QApplication([])
    main_window = MainWindow()
    qt_app.exec()
Solution
Instead of blocking the worker thread with time.sleep, you can use a QTimer to start a short-lived worker every T seconds (5 seconds in the example below), so that only the slow sensor read runs on the other thread.
from PyQt6 import QtGui
from PyQt6 import QtWidgets
from PyQt6 import QtCore
from sensor import GetPromSensor
class WorkerSignals(QtCore.QObject):
    """Holder for the signal through which workers deliver readings.

    MainWindow creates a single instance, connects it once, and hands the
    same instance to every Worker it starts.
    """

    # Carries one float; Worker.run() emits the sensor reading on it.
    result = QtCore.pyqtSignal(float)
class Worker(QtCore.QRunnable):
    """One-shot runnable: read the sensor once and emit the value.

    prom_sensor: object providing get_sensor_latest().
    signals: object exposing a ``result`` signal that accepts a float.
    """

    def __init__(self, prom_sensor, signals):
        super().__init__()
        self.signals = signals
        self.prom_sensor = prom_sensor

    def run(self):
        """Fetch the latest reading and publish it as a float."""
        reading = float(self.prom_sensor.get_sensor_latest())
        self.signals.result.emit(reading)
class MainWindow(QtWidgets.QMainWindow):
    """Main window that refreshes a label with periodic sensor readings.

    A new Worker is queued on the thread pool for every refresh, and the
    next refresh is scheduled with a single-shot timer, so no thread ever
    sleeps between reads.
    """

    # Delay between two consecutive sensor reads, in milliseconds.
    REFRESH_INTERVAL_MS = 5 * 1000

    def __init__(self):
        """Build the UI, wire the result signal, and request the first read."""
        super(MainWindow, self).__init__()

        self.prom_sensor = GetPromSensor()
        self.threadpool = QtCore.QThreadPool()
        # One shared signal holder: connected once, passed to every worker.
        self.signals = WorkerSignals()
        self.signals.result.connect(self.process_result)

        # Last value written to the label; None until the first reading.
        self.previous = None

        widget = QtWidgets.QWidget()
        self.setCentralWidget(widget)
        layout = QtWidgets.QGridLayout(widget)

        self.label = QtWidgets.QLabel("Start")
        self.label.setStyleSheet("background-color:rosybrown; border-radius:5px")
        # PyQt6 scopes enum members under their enum type, so the member is
        # Qt.AlignmentFlag.AlignCenter (Qt.Alignment is not available in
        # current PyQt6 releases).
        self.label.setAlignment(QtCore.Qt.AlignmentFlag.AlignCenter)
        self.label.setFont(QtGui.QFont("Arial", 32))
        layout.addWidget(self.label)

        self.request_data()

    def request_data(self):
        """Queue one sensor read and schedule the next call to this method."""
        worker = Worker(prom_sensor=self.prom_sensor, signals=self.signals)
        self.threadpool.start(worker)
        QtCore.QTimer.singleShot(self.REFRESH_INTERVAL_MS, self.request_data)

    @QtCore.pyqtSlot(float)
    def process_result(self, temperature):
        """Show ``temperature`` in the label unless it is unchanged."""
        if temperature == self.previous:
            return
        self.previous = temperature
        print("receiving:", temperature)
        self.label.setNum(temperature)
if __name__ == "__main__":
    # Create the application and window, show the window, then enter the
    # Qt event loop.
    qt_app = QtWidgets.QApplication([])
    main_window = MainWindow()
    main_window.show()
    qt_app.exec()
Note: pyqtSlot only works on QObjects, and QRunnable is not a QObject, so the decorator should not be applied to Worker.run().
Answered By - eyllanesc
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.