Вопрос-Ответ

Can't pickle when using multiprocessing Pool.map()

Не удается настроить <тип 'instancemethod'> при использовании многопроцессорного пула.map()

Я пытаюсь использовать функцию multiprocessing's Pool.map() для одновременного разделения работы. Когда я использую следующий код, он работает нормально:

import multiprocessing

def f(x):
return x*x

def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))


if __name__== '__main__' :
go()

Однако, когда я использую его в более объектно-ориентированном подходе, он не работает. Сообщение об ошибке, которое он выдает, является:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

Это происходит, когда ниже приведена моя основная программа:

import someClass

if __name__== '__main__' :
sc = someClass.someClass()
sc.go()

и следующий мой someClass класс:

import multiprocessing

class someClass(object):
def __init__(self):
pass

def f(self, x):
return x*x

def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))

Кто-нибудь знает, в чем может быть проблема, или простой способ ее обойти?

Переведено автоматически
Ответ 1

Проблема в том, что многопроцессорная обработка должна рассортировывать объекты, чтобы распределять их между процессами, а связанные методы не могут быть рассортированы. Обходной путь (независимо от того, считаете ли вы это "простым" или нет;-) заключается в добавлении инфраструктуры в вашу программу, позволяющей изменять такие методы, зарегистрировав ее с помощью метода стандартной библиотеки copy_reg.

Например, вклад Стивена Бетхарда в этот поток (ближе к концу потока) показывает один совершенно работоспособный подход, позволяющий выполнять травление / открепление метода через copy_reg.

Ответ 2

Все эти решения уродливы, потому что многопроцессорная обработка и травление нарушены и ограничены, если вы не выйдете за пределы стандартной библиотеки.

Если вы используете форк multiprocessing called pathos.multiprocesssing , вы можете напрямую использовать классы и методы классов в map функциях многопроцессорного пула. Это потому, что dill используется вместо pickle или cPickle, и dill может сериализовать практически все, что угодно в python.

pathos.multiprocessing также предоставляет функцию асинхронного отображения ... и она может map функционировать с несколькими аргументами (например, map(math.pow, [1,2,3], [4,5,6]))

Смотрите: Что могут многопроцессорная обработка и dill делать вместе?

и: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization /

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

И просто для наглядности, вы можете сделать именно то, что хотели сделать в первую очередь, и вы можете сделать это из интерпретатора, если хотите.

>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>

Получить код здесь:
https://github.com/uqfoundation/pathos

Ответ 3

Вы также могли бы определить __call__() метод внутри вашего someClass(), который вызывает someClass.go() и затем передает экземпляр someClass() в пул. Этот объект можно удалить, и он отлично работает (для меня)...

Ответ 4

Однако у решения Стивена Бетхарда есть некоторые ограничения :

Когда вы регистрируете метод своего класса как функцию, деструктор вашего класса неожиданно вызывается каждый раз, когда обработка вашего метода завершается. Итак, если у вас есть 1 экземпляр вашего класса, который вызывает свой метод n раз, члены могут исчезнуть между двумя запусками, и вы можете получить сообщение malloc: *** error for object 0x...: pointer being freed was not allocated (например, открыть файл члена) или pure virtual method called,
terminate called without an active exception
(что означает, что время жизни объекта-члена, который я использовал, было короче, чем я думал). Я получил это, когда имел дело с числом n, превышающим размер пула. Вот краткий пример :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)


class Myclass(object):

def __init__(self, nobj, workers=cpu_count()):

print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results

def __del__(self):
print "... Destructor"

def process_obj(self, index):
print "object %d" % index
return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Вывод:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

Метод __call__ не настолько эквивалентен, потому что [None, ...] считываются из результатов :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

def __init__(self, nobj, workers=cpu_count()):

print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results

def __call__(self, i):
self.process_obj(i)

def __del__(self):
print "... Destructor"

def process_obj(self, i):
print "obj %d" % i
return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !

Таким образом, ни один из обоих методов не удовлетворяет...

python multithreading multiprocessing