Issue
I am building a multi-threaded application in which a main process sends messages to the appropriate thread through a queue. My doubt is about the thread side: the solution I found listens constantly up to a time limit (that is why I have a Clock class with an "isRunning" method, which returns True while the time has not yet expired), and whenever no data arrives I catch the exception and simply continue.
First, a simplified version of the main process code:
def callUpdate (self, update : Update): #Update is a class that includes the correct ID of its thread and the data to process by the thread.
    find = False
    wrapp : _Wrapper = None
    for current in self.threads:
        if (type(current) is not _Wrapper): #_Wrapper is a class that includes the thread
            continue
        if not current.theThread.is_alive() :
            #Here I save some data, and I remove the thread from
            self.threads.remove(current)
            continue
        if (current.id == update.id):
            wrapp = current
            find = True
            break
    #Here I do some things and then, I create a new thread if not found and send first message (the update itself in this first send), or if its found and working (alive), I just send the data to the thread. Wrapper creates a new queue and saves the thread to send more data later if needed.
    if (not find):
        wrapp = _Wrapper(data)
        self.threads.append(wrapp)
        wrapp.queue.put(update)
        bot.start()
    else:
        #Thread already working and I send the update
        wrapp.queue.put(update)
Now a simplified version of the thread side, which is what worries me because it seems a bit "sloppy". Notice that I read the message queue with a 1-second wait. My Clock class simply reports whether the indicated time has elapsed (in this case, 120 seconds):
def process (self): #This method belongs to the class that inherits from Thread (class ProcessThread(threading.Thread))
    clock = Clock(seconds=120)
    while (clock.isRunning()):
        update: Update = self.getUpdateFromQueue(seconds=1)
        if (update is None) : continue
        #At this point, the message update is correct and I process the data. Once the clock has finished, I finish the process
    return
The problem is that sometimes the program slows down a lot, whether there are few threads or many (the thread count does not seem to matter). I have also tried reducing the queue re-read time (because many requests seem to cause problems). I have a feeling this is hacky. Can anyone suggest another way to receive queued data in a multi-threaded program?
Thank you
---------- EDIT ---------- Sorry, I didn't include the method that gets the data from the queue:
#Get data from queue, maximum wait time in seconds.
def getUpdateFromQueue (self, seconds=10):
    max = datetime.datetime.now() + datetime.timedelta(seconds=seconds)
    current = datetime.datetime.now()
    while (current < max):
        try:
            data : Update = self.queue.get(timeout=0.01)
            return data
        except Empty:
            current = datetime.datetime.now()
            continue
    return None
Solution
Your code is spinning and waiting for no reason, which is naturally going to hurt performance; you should not be doing this in your own code at all. Instead, use the timeout functionality in queue.Queue to handle your timeouts.
For example, getUpdateFromQueue doesn't need to loop and look at the wall time in between short-timed calls to queue.get; it can just pass the seconds maximum directly to queue.get:
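from queue import Empty

def getUpdateFromQueue(self, seconds=10):
    try:
        # Block for up to `seconds`; get() raises Empty if nothing arrives in time.
        return self.queue.get(timeout=seconds)
    except Empty:
        return None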
But you don't need this to be its own function in the first place. Instead of:
def process(self):
    clock = Clock(seconds=120)
    while (clock.isRunning()):
        update: Update = self.getUpdateFromQueue(seconds=1)
        if (update is None) : continue
    return
you can just use queue.get directly with the overall maximum timeout that you're trying to enforce using your Clock class:
def process(self):
    try:
        return self.queue.get(timeout=120)
    except Empty:
        return None
For a single message, that should have the same effect (return a piece of data, waiting for a maximum of 120 seconds before returning None instead), without two nested while loops that keep waking up the CPU (and both doing the same thing, just at different resolutions).
If you need to process multiple messages, you just need a single loop where you adjust the timeout on each get() to reflect the overall deadline. (I'm using time.monotonic() here because it by definition can't get thrown off by changes in the system clock.)
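from queue import Empty
from time import monotonic

def process(self, data):
    # do whatever you need to do with one piece of data
    pass

def process_messages_with_timeout(self, timeout=120):
    deadline = monotonic() + timeout
    while True:
        # Time left until the overall deadline. get() raises ValueError on a
        # negative timeout, so stop once the deadline has passed.
        remaining = deadline - monotonic()
        if remaining <= 0:
            break
        try:
            self.process(self.queue.get(timeout=remaining))
        except Empty:
            break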
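To see the single-loop, overall-deadline pattern run end to end, here is a minimal self-contained sketch. The DeadlineWorker class, its processed list, and run_demo are hypothetical names made up for illustration (they are not from the question's code), and appending to a list stands in for whatever real processing you do.

```python
import threading
import time
from queue import Empty, Queue


class DeadlineWorker(threading.Thread):
    """Hypothetical worker that consumes its queue until an overall deadline."""

    def __init__(self, timeout):
        super().__init__()
        self.queue = Queue()
        self.timeout = timeout
        self.processed = []  # stand-in for real per-message processing

    def run(self):
        deadline = time.monotonic() + self.timeout
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break  # overall deadline reached
            try:
                # One blocking call per item: returns when an item arrives,
                # or raises Empty once the remaining time has elapsed.
                item = self.queue.get(timeout=remaining)
            except Empty:
                break
            self.processed.append(item)


def run_demo():
    """Start a worker with a short deadline, send it three items, return what it saw."""
    worker = DeadlineWorker(timeout=0.5)
    worker.start()
    for n in range(3):
        worker.queue.put(n)
    worker.join()  # returns after the 0.5 s deadline expires
    return worker.processed
```

Calling run_demo() should give back the three items in the order they were queued, with the worker thread sleeping inside get() rather than polling while it waits.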
The important thing is that you should only ever need to make one call to get() per item you actually want to get, with the actual timeout; there's no point in doing a get() with a shorter timeout than you want and then adding extra logic to retry within the real timeout. Adding extra loops within loops serves no purpose.
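If the worry behind the short timeouts is that a long timeout makes the thread slow to react, a quick measurement can help settle it. The sketch below is only an illustration: measure_wakeup and its parameters are hypothetical names, and a threading.Timer plays the role of the main process putting one item on the queue after a short delay.

```python
import threading
import time
from queue import Queue


def measure_wakeup(put_delay=0.2, get_timeout=5.0):
    """Return (item, seconds waited) for one blocking get() with a long timeout."""
    q = Queue()
    # Producer stand-in: put one item on the queue after put_delay seconds.
    timer = threading.Timer(put_delay, q.put, args=("hello",))
    timer.start()
    start = time.monotonic()
    # Blocks for up to get_timeout seconds, but returns as soon as the item arrives.
    item = q.get(timeout=get_timeout)
    waited = time.monotonic() - start
    timer.join()
    return item, waited
```

The wait reported by measure_wakeup() should be close to put_delay rather than get_timeout, which is why a single get() with the real timeout loses nothing compared with retrying on a short one.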
Answered By - Samwise