Вопрос-Ответ

Python multiprocessing PicklingError: Can't pickle

Многопроцессорный PicklingError на Python: не удается выполнить pickle <тип 'функция'>

Мне жаль, что я не могу воспроизвести ошибку на более простом примере, а мой код слишком сложен для публикации. Если я запускаю программу в оболочке IPython вместо обычного Python, все работает хорошо.

Я просмотрел несколько предыдущих заметок по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в классе function . Но для меня это не тот случай.

Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Я был бы признателен за любую помощь.

Обновление: функция, которую я выполняю pickle, определена на верхнем уровне модуля. Хотя он вызывает функцию, содержащую вложенную функцию. т.е. f() Вызывает g() вызовы h(), у которых есть вложенная функция i(), и я вызываю pool.apply_async(f). f(), g(), h() все определены на верхнем уровне. Я попробовал более простой пример с этим шаблоном, и он все же работает.

Переведено автоматически
Ответ 1

Вот список того, что можно выполнить pickled. В частности, функции можно выполнять picklingerror только в том случае, если они определены на верхнем уровне модуля.

Этот фрагмент кода:

import multiprocessing as mp

class Foo():
@staticmethod
def work(self):
pass

if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()

выдает ошибку, почти идентичную той, которую вы опубликовали:

Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Проблема в том, что pool все методы используют a mp.SimpleQueue для передачи задач рабочим процессам. Все, что проходит через mp.SimpleQueue должно быть выбираемым, и foo.work не может быть выбрано, поскольку оно не определено на верхнем уровне модуля.

Это можно исправить, определив функцию на верхнем уровне, которая вызывает foo.work():

def work(foo):
foo.work()

pool.apply_async(work,args=(foo,))

Обратите внимание, что это foo выбираемо, поскольку Foo определено на верхнем уровне и foo.__dict__ может быть выбрано.

Ответ 2

Я бы использовал pathos.multiprocesssing, вместо multiprocessing. pathos.multiprocessing это форк multiprocessing, который использует dill. dill можно сериализовать практически все, что угодно в python, так что вы можете отправлять гораздо больше данных параллельно. В pathos fork также есть возможность работать напрямую с функциями с несколькими аргументами, что необходимо для методов класса.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Получите pathos (и, если хотите, dill) здесь:
https://github.com/uqfoundation

Ответ 3

Когда возникает эта проблема с multiprocessing простым решением является переключение с Pool на ThreadPool. Это можно сделать без изменения кода, кроме импорта-

from multiprocessing.pool import ThreadPool as Pool

Это работает, потому что ThreadPool разделяет память с основным потоком, а не создает новый процесс - это означает, что обработка не требуется.

Недостатком этого метода является то, что python не самый лучший язык для обработки потоков - он использует нечто, называемое глобальной блокировкой интерпретатора, для обеспечения потокобезопасности, что может замедлить некоторые варианты использования здесь. Однако, если вы в основном взаимодействуете с другими системами (запускаете HTTP-команды, общаетесь с базой данных, записываете в файловые системы), то ваш код, скорее всего, не привязан к процессору и не сильно пострадает. Фактически, при написании тестов HTTP / HTTPS я обнаружил, что используемая здесь потоковая модель имеет меньшие накладные расходы и задержки, поскольку накладные расходы на создание новых процессов намного выше, чем накладные расходы на создание новых потоков, и в противном случае программа просто ожидала HTTP-ответов.

Итак, если вы обрабатываете массу данных в пользовательском пространстве python, это может быть не лучшим методом.

Ответ 4

Как говорили другие, multiprocessing может передавать объекты Python рабочим процессам только те, которые могут быть обработаны. Если вы не можете реорганизовать свой код, как описано unutbu, вы можете использовать dill расширенные возможности pickling / распаковки для передачи данных (особенно данных кода), как я показываю ниже.

Это решение требует только установки dill и никаких других библиотек, поскольку pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)


def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

pool = Pool(processes=5)

# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)

for job in jobs:
print job.get()
print

# async execution of static method

class O(object):

@staticmethod
def calc():
return os.getpid()

jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)

for job in jobs:
print job.get()
python multiprocessing