Threading pool similar to the multiprocessing Pool?
Пул потоков похож на пул многопроцессорных процессов?
Существует ли класс пула для рабочих потоков, аналогичный классу пула многопроцессорного модуля?
Мне нравится, например, простой способ распараллеливания функции map
deflong_running_func(p): c_func_no_gil(p)
p = multiprocessing.Pool(4) xs = p.map(long_running_func, range(100))
однако я хотел бы сделать это без накладных расходов на создание новых процессов.
Я знаю о GIL. Однако в моем случае использования функция будет функцией C с привязкой к вводу-выводу, для которой оболочка python выпустит GIL перед фактическим вызовом функции.
Должен ли я писать свой собственный пул потоков?
Переведено автоматически
Ответ 1
Я только что узнал, что на самом деле в модуле естьmultiprocessing интерфейс пула потоков, однако он несколько скрыт и не задокументирован должным образом.
Его можно импортировать через
from multiprocessing.pool import ThreadPool
Он реализован с использованием фиктивного класса процесса, обертывающего поток python. Этот класс процессов на основе потоков можно найти в multiprocessing.dummy который кратко упоминается в документах. Этот фиктивный модуль предположительно предоставляет весь интерфейс многопроцессорной обработки на основе потоков.
if(PROCESS): pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process) else: pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, initializer=start_process)
pool.map(worker, inputs) ....
Ответ 4
Для чего-то очень простого и легковесного (немного измененного по сравнению с здесь):
from Queue import Queue from threading import Thread
classWorker(Thread): """Thread executing tasks from a given tasks queue""" def__init__(self, tasks): Thread.__init__(self) self.tasks = tasks self.daemon = True self.start()
classThreadPool: """Pool of threads consuming tasks from a queue""" def__init__(self, num_threads): self.tasks = Queue(num_threads) for _ inrange(num_threads): Worker(self.tasks)
defadd_task(self, func, *args, **kargs): """Add a task to the queue""" self.tasks.put((func, args, kargs))
defwait_completion(self): """Wait for completion of all the tasks in the queue""" self.tasks.join()
if __name__ == '__main__': from random import randrange from time import sleep
delays = [randrange(1, 10) for i inrange(100)]
defwait_delay(d): print'sleeping for (%d)sec' % d sleep(d)
pool = ThreadPool(20)
for i, d inenumerate(delays): pool.add_task(wait_delay, d)
pool.wait_completion()
Для поддержки обратных вызовов при завершении задачи вы можете просто добавить обратный вызов к кортежу задач.