Can't pickle when using multiprocessing Pool.map()
Не удается настроить <тип 'instancemethod'> при использовании многопроцессорного пула.map()
Я пытаюсь использовать функцию multiprocessing's Pool.map() для одновременного разделения работы. Когда я использую следующий код, он работает нормально:
import multiprocessing
deff(x): return x*x
defgo(): pool = multiprocessing.Pool(processes=4) print pool.map(f, range(10))
if __name__== '__main__' : go()
Однако, когда я использую его в более объектно-ориентированном подходе, он не работает. Сообщение об ошибке, которое он выдает, является:
Это происходит, когда ниже приведена моя основная программа:
import someClass
if __name__== '__main__' : sc = someClass.someClass() sc.go()
и следующий мой someClass класс:
import multiprocessing
classsomeClass(object): def__init__(self): pass
deff(self, x): return x*x
defgo(self): pool = multiprocessing.Pool(processes=4) print pool.map(self.f, range(10))
Кто-нибудь знает, в чем может быть проблема, или простой способ ее обойти?
Переведено автоматически
Ответ 1
Проблема в том, что многопроцессорная обработка должна рассортировывать объекты, чтобы распределять их между процессами, а связанные методы не могут быть рассортированы. Обходной путь (независимо от того, считаете ли вы это "простым" или нет;-) заключается в добавлении инфраструктуры в вашу программу, позволяющей изменять такие методы, зарегистрировав ее с помощью метода стандартной библиотеки copy_reg.
Например, вклад Стивена Бетхарда в этот поток (ближе к концу потока) показывает один совершенно работоспособный подход, позволяющий выполнять травление / открепление метода через copy_reg.
Ответ 2
Все эти решения уродливы, потому что многопроцессорная обработка и травление нарушены и ограничены, если вы не выйдете за пределы стандартной библиотеки.
Если вы используете форк multiprocessing called pathos.multiprocesssing , вы можете напрямую использовать классы и методы классов в map функциях многопроцессорного пула. Это потому, что dill используется вместо pickle или cPickle, и dill может сериализовать практически все, что угодно в python.
pathos.multiprocessing также предоставляет функцию асинхронного отображения ... и она может map функционировать с несколькими аргументами (например, map(math.pow, [1,2,3], [4,5,6]))
Вы также могли бы определить __call__() метод внутри вашего someClass(), который вызывает someClass.go() и затем передает экземпляр someClass() в пул. Этот объект можно удалить, и он отлично работает (для меня)...
Ответ 4
Однако у решения Стивена Бетхарда есть некоторые ограничения :
Когда вы регистрируете метод своего класса как функцию, деструктор вашего класса неожиданно вызывается каждый раз, когда обработка вашего метода завершается. Итак, если у вас есть 1 экземпляр вашего класса, который вызывает свой метод n раз, члены могут исчезнуть между двумя запусками, и вы можете получить сообщение malloc: *** error for object 0x...: pointer being freed was not allocated (например, открыть файл члена) или pure virtual method called, terminate called without an active exception (что означает, что время жизни объекта-члена, который я использовал, было короче, чем я думал). Я получил это, когда имел дело с числом n, превышающим размер пула. Вот краткий пример :
from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above ------------- from copy_reg import pickle from types import MethodType
def_unpickle_method(func_name, obj, cls): for cls in cls.mro(): try: func = cls.__dict__[func_name] except KeyError: pass else: break return func.__get__(obj, cls)
classMyclass(object):
def__init__(self, nobj, workers=cpu_count()):
print"Constructor ..." # multi-processing pool = Pool(processes=workers) async_results = [ pool.apply_async(self.process_obj, (i,)) for i inrange(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results
def__del__(self): print"... Destructor"
defprocess_obj(self, index): print"object %d" % index return"results"
pickle(MethodType, _pickle_method, _unpickle_method) Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once)
Метод __call__ не настолько эквивалентен, потому что [None, ...] считываются из результатов :
from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult
classMyclass(object):
def__init__(self, nobj, workers=cpu_count()):
print"Constructor ..." # multiprocessing pool = Pool(processes=workers) async_results = [ pool.apply_async(self, (i,)) for i inrange(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results
def__call__(self, i): self.process_obj(i)
def__del__(self): print"... Destructor"
defprocess_obj(self, i): print"obj %d" % i return"result"
Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once), # **and** results are empty !
Таким образом, ни один из обоих методов не удовлетворяет...