Вопрос-Ответ

How can I get the return value of a function passed to multiprocessing.Process?

Как я могу получить возвращаемое значение функции, переданной в multiprocessing.Process?

В приведенном ниже примере кода я хотел бы получить возвращаемое значение функции worker. Как я могу это сделать? Где хранится это значение?

Пример кода:

import multiprocessing

def worker(procnum):
'''worker function'''
print str(procnum) + ' represent!'
return procnum


if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()

for proc in jobs:
proc.join()
print jobs

Вывод:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

Кажется, я не могу найти соответствующий атрибут в объектах, хранящихся в jobs.

Переведено автоматически
Ответ 1

Используйте общую переменную для связи. Например, вот так,

Пример кода:

import multiprocessing


def worker(procnum, return_dict):
"""worker function"""
print(str(procnum) + " represent!")
return_dict[procnum] = procnum


if __name__ == "__main__":
manager = multiprocessing.Manager()
return_dict = manager.dict()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, return_dict))
jobs.append(p)
p.start()

for proc in jobs:
proc.join()
print(return_dict.values())

Вывод:

0 represent!
1 represent!
3 represent!
2 represent!
4 represent!
[0, 1, 3, 2, 4]
Ответ 2

Я думаю, что подход, предложенный sega_sai, является лучшим. Но для этого действительно нужен пример кода, поэтому здесь:

import multiprocessing
from os import getpid

def worker(procnum):
print('I am number %d in process %d' % (procnum, getpid()))
return getpid()

if __name__ == '__main__':
pool = multiprocessing.Pool(processes = 3)
print(pool.map(worker, range(5)))

Который выведет возвращаемые значения:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

Если вы знакомы с map (встроенный Python 2), это не должно быть слишком сложным. В противном случае взгляните на ссылку sega_Sai .

Обратите внимание, как мало кода требуется. (Также обратите внимание, как процессы используются повторно.)

Ответ 3

Для всех, кто ищет, как получить значение из Process используя Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
ret = queue.get()
ret['foo'] = True
queue.put(ret)

if __name__ == '__main__':
queue = multiprocessing.Queue()
queue.put(ret)
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
p.join()
print(queue.get()) # Prints {"foo": True}

Обратите внимание, что в Windows или Jupyter Notebook с помощью multithreading вы должны сохранить это как файл и выполнить файл. Если вы сделаете это в командной строке, вы увидите ошибку, подобную этой:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
Ответ 4

По какой-то причине я нигде не смог найти общий пример того, как это сделать с Queue (даже примеры doc в Python не запускают несколько процессов), так что вот что у меня получилось примерно после 10 попыток:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
ret = arg1 + arg2
queue.put(ret)

def multi_add(): # spawns child processes
q = Queue()
processes = []
rets = []
for _ in range(0, 100):
p = Process(target=add_helper, args=(q, 1, 2))
processes.append(p)
p.start()
for p in processes:
ret = q.get() # will block
rets.append(ret)
for p in processes:
p.join()
return rets

Queue это блокирующая потокобезопасная очередь, которую вы можете использовать для хранения возвращаемых значений из дочерних процессов. Таким образом, вы должны передавать очередь каждому процессу. Что-то менее очевидное здесь заключается в том, что вам нужно get() из очереди, прежде чем вы join выполните Processes, иначе очередь заполнится и заблокирует все.

Обновление для тех, кто объектно-ориентирован (протестировано в Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

def __init__(self):
self.processes = []
self.queue = Queue()

@staticmethod
def _wrapper(func, queue, args, kwargs):
ret = func(*args, **kwargs)
queue.put(ret)

def run(self, func, *args, **kwargs):
args2 = [func, self.queue, args, kwargs]
p = Process(target=self._wrapper, args=args2)
self.processes.append(p)
p.start()

def wait(self):
rets = []
for p in self.processes:
ret = self.queue.get()
rets.append(ret)
for p in self.processes:
p.join()
return rets

# tester
if __name__ == "__main__":
mp = Multiprocessor()
num_proc = 64
for _ in range(num_proc): # queue up multiple tasks running `sum`
mp.run(sum, [1, 2, 3, 4, 5])
ret = mp.wait() # get all results
print(ret)
assert len(ret) == num_proc and all(r == 15 for r in ret)
python function multiprocessing