Tkinter: как использовать потоки для предотвращения "зависания" цикла основных событий
У меня есть небольшой тест с графическим интерфейсом с кнопкой "Пуск" и индикатором выполнения. Желаемое поведение:
- Нажмите "Пуск"
- Индикатор выполнения колеблется в течение 5 секунд
- Панель прогресса останавливается
Наблюдаемое поведение заключается в том, что кнопка "Пуск" зависает на 5 секунд, затем отображается индикатор выполнения (без колебаний).
Вот мой код на данный момент:
class GUI:
def __init__(self, master):
self.master = master
self.test_button = Button(self.master, command=self.tb_click)
self.test_button.configure(
text="Start", background="Grey",
padx=50
)
self.test_button.pack(side=TOP)
def progress(self):
self.prog_bar = ttk.Progressbar(
self.master, orient="horizontal",
length=200, mode="indeterminate"
)
self.prog_bar.pack(side=TOP)
def tb_click(self):
self.progress()
self.prog_bar.start()
# Simulate long running process
t = threading.Thread(target=time.sleep, args=(5,))
t.start()
t.join()
self.prog_bar.stop()
root = Tk()
root.title("Test Button")
main_ui = GUI(root)
root.mainloop()
Основываясь на информации от Брайана Оукли здесь, я понимаю, что мне нужно использовать потоки. Я попытался создать поток, но я предполагаю, что, поскольку поток запускается из основного потока, это не помогает.
У меня была идея поместить логическую часть в другой класс и создать экземпляр GUI из этого класса, аналогично примеру кода А. Родаса здесь.
Мой вопрос:
Я не могу понять, как закодировать это так, чтобы эта команда:
self.test_button = Button(self.master, command=self.tb_click)
вызывает функцию, которая находится в другом классе. Это плохо или это вообще возможно? Как бы мне создать 2-й класс, который может обрабатывать self.tb_click? Я попытался следовать примеру A. Пример кода Родаса, который прекрасно работает. Но я не могу понять, как реализовать его решение в случае виджета Button, который запускает действие.
Если бы я вместо этого обрабатывал поток из одного класса GUI, как бы создать поток, который не мешает основному потоку?
Переведено автоматически
Ответ 1
Когда вы присоединяетесь к новому потоку в основном потоке, он будет ждать завершения потока, поэтому графический интерфейс будет заблокирован, даже если вы используете многопоточность.
Если вы хотите поместить логическую часть в другой класс, вы можете напрямую создать подкласс Thread, а затем запустить новый объект этого класса при нажатии кнопки. Конструктор этого подкласса Thread может получить объект Queue, и тогда вы сможете передать его в графическую часть. Итак, мое предложение таково:
- Создайте объект очереди в основном потоке
- Создайте новый поток с доступом к этой очереди
- Периодически проверяйте очередь в главном потоке
Тогда вам нужно решить проблему того, что произойдет, если пользователь дважды нажмет одну и ту же кнопку (при каждом нажатии будет создаваться новый поток), но вы можете это исправить, отключив кнопку "Пуск" и включив ее снова после вызова self.prog_bar.stop()
.
import queue
class GUI:
# ...
def tb_click(self):
self.progress()
self.prog_bar.start()
self.queue = queue.Queue()
ThreadedTask(self.queue).start()
self.master.after(100, self.process_queue)
def process_queue(self):
try:
msg = self.queue.get_nowait()
# Show result of the task if needed
self.prog_bar.stop()
except queue.Empty:
self.master.after(100, self.process_queue)
class ThreadedTask(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
time.sleep(5) # Simulate long running process
self.queue.put("Task finished")
Ответ 2
I will submit the basis for an alternate solution. It is not specific to a Tk progress bar per se, but it can certainly be implemented very easily for that.
Here are some classes that allow you to run other tasks in the background of Tk, update the Tk controls when desired, and not lock up the gui!
Here's class TkRepeatingTask and BackgroundTask:
import threading
class TkRepeatingTask():
def __init__( self, tkRoot, taskFuncPointer, freqencyMillis ):
self.__tk_ = tkRoot
self.__func_ = taskFuncPointer
self.__freq_ = freqencyMillis
self.__isRunning_ = False
def isRunning( self ) : return self.__isRunning_
def start( self ) :
self.__isRunning_ = True
self.__onTimer()
def stop( self ) : self.__isRunning_ = False
def __onTimer( self ):
if self.__isRunning_ :
self.__func_()
self.__tk_.after( self.__freq_, self.__onTimer )
class BackgroundTask():
def __init__( self, taskFuncPointer ):
self.__taskFuncPointer_ = taskFuncPointer
self.__workerThread_ = None
self.__isRunning_ = False
def taskFuncPointer( self ) : return self.__taskFuncPointer_
def isRunning( self ) :
return self.__isRunning_ and self.__workerThread_.isAlive()
def start( self ):
if not self.__isRunning_ :
self.__isRunning_ = True
self.__workerThread_ = self.WorkerThread( self )
self.__workerThread_.start()
def stop( self ) : self.__isRunning_ = False
class WorkerThread( threading.Thread ):
def __init__( self, bgTask ):
threading.Thread.__init__( self )
self.__bgTask_ = bgTask
def run( self ):
try :
self.__bgTask_.taskFuncPointer()( self.__bgTask_.isRunning )
except Exception as e: print repr(e)
self.__bgTask_.stop()
Here's a Tk test which demos the use of these. Just append this to the bottom of the module with those classes in it if you want to see the demo in action:
def tkThreadingTest():
from tkinter import Tk, Label, Button, StringVar
from time import sleep
class UnitTestGUI:
def __init__( self, master ):
self.master = master
master.title( "Threading Test" )
self.testButton = Button(
self.master, text="Blocking", command=self.myLongProcess )
self.testButton.pack()
self.threadedButton = Button(
self.master, text="Threaded", command=self.onThreadedClicked )
self.threadedButton.pack()
self.cancelButton = Button(
self.master, text="Stop", command=self.onStopClicked )
self.cancelButton.pack()
self.statusLabelVar = StringVar()
self.statusLabel = Label( master, textvariable=self.statusLabelVar )
self.statusLabel.pack()
self.clickMeButton = Button(
self.master, text="Click Me", command=self.onClickMeClicked )
self.clickMeButton.pack()
self.clickCountLabelVar = StringVar()
self.clickCountLabel = Label( master, textvariable=self.clickCountLabelVar )
self.clickCountLabel.pack()
self.threadedButton = Button(
self.master, text="Timer", command=self.onTimerClicked )
self.threadedButton.pack()
self.timerCountLabelVar = StringVar()
self.timerCountLabel = Label( master, textvariable=self.timerCountLabelVar )
self.timerCountLabel.pack()
self.timerCounter_=0
self.clickCounter_=0
self.bgTask = BackgroundTask( self.myLongProcess )
self.timer = TkRepeatingTask( self.master, self.onTimer, 1 )
def close( self ) :
print "close"
try: self.bgTask.stop()
except: pass
try: self.timer.stop()
except: pass
self.master.quit()
def onThreadedClicked( self ):
print "onThreadedClicked"
try: self.bgTask.start()
except: pass
def onTimerClicked( self ) :
print "onTimerClicked"
self.timer.start()
def onStopClicked( self ) :
print "onStopClicked"
try: self.bgTask.stop()
except: pass
try: self.timer.stop()
except: pass
def onClickMeClicked( self ):
print "onClickMeClicked"
self.clickCounter_+=1
self.clickCountLabelVar.set( str(self.clickCounter_) )
def onTimer( self ) :
print "onTimer"
self.timerCounter_+=1
self.timerCountLabelVar.set( str(self.timerCounter_) )
def myLongProcess( self, isRunningFunc=None ) :
print "starting myLongProcess"
for i in range( 1, 10 ):
try:
if not isRunningFunc() :
self.onMyLongProcessUpdate( "Stopped!" )
return
except : pass
self.onMyLongProcessUpdate( i )
sleep( 1.5 ) # simulate doing work
self.onMyLongProcessUpdate( "Done!" )
def onMyLongProcessUpdate( self, status ) :
print "Process Update: %s" % (status,)
self.statusLabelVar.set( str(status) )
root = Tk()
gui = UnitTestGUI( root )
root.protocol( "WM_DELETE_WINDOW", gui.close )
root.mainloop()
if __name__ == "__main__":
tkThreadingTest()
Two import points I'll stress about BackgroundTask:
1) The function you run in the background task needs to take a function pointer it will both invoke and respect, which allows the task to be cancelled mid way through - if possible.
2) You need to make sure the background task is stopped when you exit your application. That thread will still run even if your gui is closed if you don't address that!
Ответ 3
I have used RxPY which has some nice threading functions to solve this in a fairly clean manner. No queues, and I have provided a function that runs on the main thread after completion of the background thread. Here is a working example:
import rx
from rx.scheduler import ThreadPoolScheduler
import time
import tkinter as tk
class UI:
def __init__(self):
self.root = tk.Tk()
self.pool_scheduler = ThreadPoolScheduler(1) # thread pool with 1 worker thread
self.button = tk.Button(text="Do Task", command=self.do_task).pack()
def do_task(self):
rx.empty().subscribe(
on_completed=self.long_running_task,
scheduler=self.pool_scheduler
)
def long_running_task(self):
# your long running task here... eg:
time.sleep(3)
# if you want a callback on the main thread:
self.root.after(5, self.on_task_complete)
def on_task_complete(self):
pass # runs on main thread
if __name__ == "__main__":
ui = UI()
ui.root.mainloop()
Another way to use this construct which might be cleaner (depending on preference):
tk.Button(text="Do Task", command=self.button_clicked).pack()
...
def button_clicked(self):
def do_task(_):
time.sleep(3) # runs on background thread
def on_task_done():
pass # runs on main thread
rx.just(1).subscribe(
on_next=do_task,
on_completed=lambda: self.root.after(5, on_task_done),
scheduler=self.pool_scheduler
)
Ответ 4
The problem is that t.join() blocks the click event, the main thread does not get back to the event loop to process repaints.
See Why ttk Progressbar appears after process in Tkinter or TTK progress bar blocked when sending email