Issue
I'm starting a new topic here that is linked to this question.
I invite you to read the background first in order to get the overall idea.
So I have a download function which relies on a Python 3.2 API (developed by a private company). The process can take up to 400 seconds per file.
Obviously, I don't have only one file to download, so I've been trying for days to put every download process in a thread pool. Each thread in the pool should be completely autonomous from the GUI Main Thread. When one of them has finished, it should just send a signal to the GUI.
I ran several tests, but whatever the technique used:
- the GUI is freezing;
- the result is only given at the end of the processing of all threads and not – as wanted – one by one.
I think that the download method given by the API is a blocking function that can't be threaded.
So my question is simple: how can I know whether an I/O method can be handled through a thread?
November 24, 2017 Update
You will find below a first draft (using the multiprocessing.pool / map_async tandem) that partially meets my expectations. As you will see, I unfortunately had to insert a "busy waiting loop" in order to display some information in the QPlainTextEdit about what was going on.
The results of the tasks are only delivered at the end of the whole run (that is how map_async behaves). That's not exactly what I'm looking for. I would like something closer to real time, where the message of each completed task appears on the console immediately.
import time
import multiprocessing
import private.library as bathy
from PyQt4 import QtCore, QtGui
import os
import sys
# Connection settings handed to bathy.NodeManager below (placeholder values).
user = 'user'
password = 'password'
server = 'server'
basename = 'basename'
# Size of the process pool: one worker per CPU reported by the OS.
workers = multiprocessing.cpu_count()
# Module-level handles; download() reads ``database`` as a global.
# NOTE(review): these two calls run whenever the module is executed or
# imported; on platforms where multiprocessing re-imports the main module in
# each worker they presumably run once per worker as well -- confirm.
node = bathy.NodeManager(user, password, server)
database = node.get_database(basename)
# Surface identifiers to download; each is submitted to the pool together
# with its position in this tuple.
ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)
# ---------------------------------------------------------------------------------
def download(surface_id, index):
    """Download one surface to disk and return a textual report.

    Parameters:
        surface_id: identifier passed to ``database.get_surface``.
        index: job number; used only in the report text (formatted with
            ``%d``, so it must be a number).

    Returns:
        A multi-line string made of a "Process started" line, an optional
        "Error" line and a "Process ended" line with the elapsed seconds.

    Raises:
        Whatever ``get_surface`` / ``get_metadata`` raise, and any exception
        other than ``RuntimeError`` raised by ``download_bathymetry``.
    """
    # ``database`` is only read here, so no ``global`` declaration is needed
    # (and ``node`` is not used at all).
    started = time.time()
    message = 'Surface #%d - Process started\n' % index
    surface = database.get_surface(surface_id)
    metadata = surface.get_metadata()
    file_path = os.path.join("C:\\Users\\philippe\\Test_Download",
                             metadata["OBJNAM"] + ".surf")
    try:
        surface.download_bathymetry(file_path)
    except RuntimeError as error:
        # Report only the first line of the error text to keep the log short.
        message += "Error : " + str(error).split('\n')[0] + '\n'
    # No ``finally`` needed: on any other exception the message is discarded
    # anyway, and returning from a ``finally`` would silently swallow it.
    message += 'Process ended : %.2f s\n' % (time.time() - started)
    return message
# ---------------------------------------------------------------------------------
def pass_args(args):
    """Adapter for pool map-style calls, which hand over a single argument.

    *args* is one job tuple; it is unpacked into the positional arguments
    of ``download`` and the result of that call is returned unchanged.
    """
    return download(*args)
# ---------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    """Modal dialog that runs the downloads in a process pool and logs them.

    The dialog holds a read-only text area and an Ok / Close button box.
    Ok submits every entry of ``ids`` to ``self.pool`` and polls until the
    batch is finished; Close closes the dialog.
    """

    def __init__(self):
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as Console is subclassed.
        super(Console, self).__init__()
        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.setModal(True)
        self.verticalLayout = QtGui.QVBoxLayout(self)
        # Text edit
        # ---------------------------------------------------------------------
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)
        # Ok / Close
        # ---------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close |
                                           QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)
        # Connect definition
        # ---------------------------------------------------------------------
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close),
                     QtCore.SIGNAL('clicked()'),
                     self.button_cancel_clicked)
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Ok),
                     QtCore.SIGNAL('clicked()'),
                     self.button_ok_clicked)
        # Post initialization
        # ---------------------------------------------------------------------
        self.pool = multiprocessing.Pool(processes=workers)

    # Connect functions
    # -------------------------------------------------------------------------
    def button_cancel_clicked(self):
        """Close the dialog."""
        self.close()

    def button_ok_clicked(self):
        """Submit all downloads with ``map_async`` and poll until they finish.

        Progress lines are written every 0.5 s while waiting; the per-job
        reports are written only once the whole batch is ready.
        """
        # The pool is closed below and cannot take a second batch, and
        # write_stream() processes pending events while we poll, so make
        # sure Ok cannot be triggered again.
        self.button_box.button(QtGui.QDialogButtonBox.Ok).setEnabled(False)
        jobs_args = [(surface_id, index)
                     for index, surface_id in enumerate(ids)]
        # The pool lives on the instance (self.pool); a bare ``pool`` is an
        # undefined name. ``async`` is avoided as a variable name because it
        # is a reserved word from Python 3.7 on.
        result = self.pool.map_async(pass_args, jobs_args)
        self.pool.close()
        # Busy waiting loop
        while not result.ready():
            # NOTE(review): ``_number_left`` is a private attribute of the
            # map_async result; it appears to count outstanding chunks
            # rather than individual jobs -- confirm before relying on it.
            remaining = result._number_left
            self.write_stream(
                "Waiting for %d task(s) to complete...\n" % remaining)
            time.sleep(0.5)
        self.write_stream("All tasks completed\n")
        self.pool.join()
        for line in result.get():
            self.write_stream(line)

    # Other functions
    # -------------------------------------------------------------------------
    def write_stream(self, text):
        """Insert *text* into the text area and let Qt process pending events."""
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)
        # Call through the class rather than the module global ``app``, which
        # only exists when this file is executed as a script.
        QtGui.QApplication.processEvents()
# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    # Script entry point: build the application object, show the console
    # dialog and start the Qt event loop.
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()
Questions
- At first glance, does the above code contain any conceptual errors?
- Do I have to use the apply_async method in this specific case to get something more interactive?
- Could you guide me on how to use a callback function to post a custom event to update the console (methodology suggested by @ekhumoro)?
November 25, 2017 Update
I had a try with apply_async:
def button_ok_clicked(self):
    """Queue one download per surface id and return without waiting.

    Each job is submitted with ``apply_async``; ``self.write_stream`` is
    registered as the callback that receives the job's report string.
    """
    # Pool.apply_async - the call returns immediately instead of
    # waiting for the result
    for index, surface_id in enumerate(ids):
        # The pool is an instance attribute (a bare ``pool`` is undefined),
        # and the returned handle is not needed, so it is not bound to a
        # name (the former name ``async`` is reserved from Python 3.7 on).
        # NOTE(review): multiprocessing runs the callback in the pool's
        # result-handling thread, not in the GUI thread; touching Qt widgets
        # from there is the likely cause of the crash. Posting an event or
        # emitting a signal to the GUI thread would avoid it.
        self.pool.apply_async(download,
                              args=(surface_id, index),
                              callback=self.write_stream)
    self.pool.close()
with a callback:
def write_stream(self, text):
    """Append *text* to the text edit and process pending GUI events.

    Registered as the ``apply_async`` callback, so it is called with the
    result of each finished job.
    """
    editor = self.text_edit
    editor.insertPlainText(text)
    # Re-apply the widget's own cursor (presumably to keep the view on the
    # newly inserted text -- confirm).
    editor.setTextCursor(editor.textCursor())
    # Update the text edit
    app.processEvents()
Unfortunately, done this way the application crashes. I think I'll have to add a locking mechanism to prevent all the tasks from writing to the text edit at the same time.
Solution
Below is a simplified version of your example script that shows how to post a custom event using a callback. Each job is processed separately via apply_async, so a simple counter is updated to indicate when all the jobs have been completed.
import sys, time, random, multiprocessing
from PyQt4 import QtCore, QtGui
# Surface identifiers used as the job list; each one is submitted to the pool
# together with its position in this tuple.
ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)
def download(surface_id, index):
    """Simulate one download and return its two-line report.

    Sleeps for a random fraction of a second in place of real work, then
    returns a "Process started" line naming *index* and *surface_id*
    followed by a "Process ended" line with the elapsed seconds.
    """
    started = time.time()
    report = ['Surface #%s (%s) - Process started\n' % (index, surface_id)]
    time.sleep(random.random())
    elapsed = time.time() - started
    report.append('Process ended : %.2f s\n' % elapsed)
    return ''.join(report)
def pass_args(args):
    """Unpack the job tuple *args* and return ``download(*args)``."""
    return download(*args)
class CustomEvent(QtCore.QEvent):
    """QEvent subclass that carries arbitrary payload values.

    ``DownloadComplete`` is an event type id obtained once, at class
    creation, from ``QEvent.registerEventType()``.
    """

    DownloadComplete = QtCore.QEvent.registerEventType()

    def __init__(self, typeid, *args):
        """Create an event of type *typeid*.

        Any extra positional arguments are kept, in order, as the tuple
        ``self.data``.
        """
        super(CustomEvent, self).__init__(typeid)
        self.data = args
class Console(QtGui.QDialog):
    """Dialog that runs the jobs in a process pool and logs each result.

    Ok submits one ``apply_async`` job per entry of ``ids``. Each finished
    job is reported to the dialog as a ``CustomEvent`` posted from the pool
    callback, and the text area is updated in ``event()``.
    """

    def __init__(self):
        super().__init__()
        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.verticalLayout = QtGui.QVBoxLayout(self)
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(
            QtGui.QDialogButtonBox.Close | QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)
        self.button_box.button(QtGui.QDialogButtonBox.Close
            ).clicked.connect(self.button_cancel_clicked)
        self.button_box.button(QtGui.QDialogButtonBox.Ok
            ).clicked.connect(self.button_ok_clicked)
        self.pool = multiprocessing.Pool(None)

    def event(self, event):
        """Write the payload of DownloadComplete events to the text area.

        The event data is a ``(message, complete)`` pair; when *complete*
        is true a final "Downloads complete!" line is added. Every event is
        then passed on to the base-class handler.
        """
        if event.type() == CustomEvent.DownloadComplete:
            message, complete = event.data
            self.write_stream(message)
            if complete:
                self.write_stream('Downloads complete!')
        return super().event(event)

    def button_cancel_clicked(self):
        """Close the dialog."""
        self.close()

    def button_ok_clicked(self):
        """Submit one job per id; each outcome is posted back as an event."""
        total = len(ids)

        def job_finished(message):
            # Count the job as done and hand the text to the dialog through
            # a posted event instead of touching widgets from the callback.
            nonlocal total
            total -= 1
            QtGui.qApp.postEvent(self, CustomEvent(
                CustomEvent.DownloadComplete, message, not total))

        def job_failed(error):
            # Without an error callback a job that raises would never
            # decrement the counter, so completion would never be reported
            # and the error itself would be lost.
            job_finished('Error : %s\n' % (error,))

        for index, surface_id in enumerate(ids):
            self.pool.apply_async(
                pass_args, [(surface_id, index)],
                callback=job_finished, error_callback=job_failed)

    def write_stream(self, text):
        """Insert *text* into the text area."""
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)
if __name__ == '__main__':
    # Script entry point: create the application, show the dialog and run
    # the Qt event loop.
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()
Answered By - ekhumoro
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.