Вопрос-Ответ

Threading pool similar to the multiprocessing Pool?

Пул потоков похож на пул многопроцессорных процессов?

Существует ли класс пула для рабочих потоков, аналогичный классу пула многопроцессорного модуля?

Мне нравится, например, простой способ распараллеливания функции map

def long_running_func(p):
c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

однако я хотел бы сделать это без накладных расходов на создание новых процессов.

Я знаю о GIL. Однако в моем случае использования функция будет функцией C с привязкой к вводу-выводу, для которой оболочка python выпустит GIL перед фактическим вызовом функции.

Должен ли я писать свой собственный пул потоков?

Переведено автоматически
Ответ 1

Я только что узнал, что на самом деле в модуле естьmultiprocessing интерфейс пула потоков, однако он несколько скрыт и не задокументирован должным образом.

Его можно импортировать через

from multiprocessing.pool import ThreadPool

Он реализован с использованием фиктивного класса процесса, обертывающего поток python. Этот класс процессов на основе потоков можно найти в multiprocessing.dummy который кратко упоминается в документах. Этот фиктивный модуль предположительно предоставляет весь интерфейс многопроцессорной обработки на основе потоков.

Ответ 2

В Python 3 вы можете использовать concurrent.futures.ThreadPoolExecutor, т.е.:

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Смотрите Документы для получения дополнительной информации и примеров.

Ответ 3

Да, и, похоже, у него (более или менее) тот же API.

import multiprocessing

def worker(lnk):
....
def start_process():
.....
....

if(PROCESS):
pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
initializer=start_process)

pool.map(worker, inputs)
....
Ответ 4

Для чего-то очень простого и легковесного (немного измененного по сравнению с здесь):

from Queue import Queue
from threading import Thread


class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()

def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception, e:
print e
finally:
self.tasks.task_done()


class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)

def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))

def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()

if __name__ == '__main__':
from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(100)]

def wait_delay(d):
print 'sleeping for (%d)sec' % d
sleep(d)

pool = ThreadPool(20)

for i, d in enumerate(delays):
pool.add_task(wait_delay, d)

pool.wait_completion()

Для поддержки обратных вызовов при завершении задачи вы можете просто добавить обратный вызов к кортежу задач.

python multithreading