Многопроцессорный PicklingError на Python: не удается выполнить pickle <тип 'функция'>
Мне жаль, что я не могу воспроизвести ошибку на более простом примере, а мой код слишком сложен для публикации. Если я запускаю программу в оболочке IPython вместо обычного Python, все работает хорошо.
Я просмотрел несколько предыдущих заметок по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в классе function . Но для меня это не тот случай.
Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Я был бы признателен за любую помощь.
Обновление: функция, которую я выполняю pickle, определена на верхнем уровне модуля. Хотя он вызывает функцию, содержащую вложенную функцию. т.е. f() Вызывает g() вызовы h(), у которых есть вложенная функция i(), и я вызываю pool.apply_async(f). f(), g(), h() все определены на верхнем уровне. Я попробовал более простой пример с этим шаблоном, и он все же работает.
Переведено автоматически
Ответ 1
Вот список того, что можно выполнить pickled. В частности, функции можно выполнять picklingerror только в том случае, если они определены на верхнем уровне модуля.
Этот фрагмент кода:
import multiprocessing as mp
classFoo(): @staticmethod defwork(self): pass
if __name__ == '__main__': pool = mp.Pool() foo = Foo() pool.apply_async(foo.work) pool.close() pool.join()
выдает ошибку, почти идентичную той, которую вы опубликовали:
Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks put(task) PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Проблема в том, что pool все методы используют a mp.SimpleQueue для передачи задач рабочим процессам. Все, что проходит через mp.SimpleQueue должно быть выбираемым, и foo.work не может быть выбрано, поскольку оно не определено на верхнем уровне модуля.
Это можно исправить, определив функцию на верхнем уровне, которая вызывает foo.work():
defwork(foo): foo.work()
pool.apply_async(work,args=(foo,))
Обратите внимание, что это foo выбираемо, поскольку Foo определено на верхнем уровне и foo.__dict__ может быть выбрано.
Ответ 2
Я бы использовал pathos.multiprocesssing, вместо multiprocessing. pathos.multiprocessing это форк multiprocessing, который использует dill. dill можно сериализовать практически все, что угодно в python, так что вы можете отправлять гораздо больше данных параллельно. В pathos fork также есть возможность работать напрямую с функциями с несколькими аргументами, что необходимо для методов класса.
>>> from pathos.multiprocessing import ProcessingPool as Pool >>> p = Pool(4) >>> classTest(object): ... defplus(self, x, y): ... return x+y ... >>> t = Test() >>> p.map(t.plus, x, y) [4, 6, 8, 10] >>> >>> classFoo(object): ... @staticmethod ... defwork(self, x): ... return x+1 ... >>> f = Foo() >>> p.apipe(f.work, f, 100) <processing.pool.ApplyResult object at 0x10504f8d0> >>> res = _ >>> res.get() 101
Когда возникает эта проблема с multiprocessing простым решением является переключение с Pool на ThreadPool. Это можно сделать без изменения кода, кроме импорта-
from multiprocessing.pool import ThreadPool as Pool
Это работает, потому что ThreadPool разделяет память с основным потоком, а не создает новый процесс - это означает, что обработка не требуется.
Недостатком этого метода является то, что python не самый лучший язык для обработки потоков - он использует нечто, называемое глобальной блокировкой интерпретатора, для обеспечения потокобезопасности, что может замедлить некоторые варианты использования здесь. Однако, если вы в основном взаимодействуете с другими системами (запускаете HTTP-команды, общаетесь с базой данных, записываете в файловые системы), то ваш код, скорее всего, не привязан к процессору и не сильно пострадает. Фактически, при написании тестов HTTP / HTTPS я обнаружил, что используемая здесь потоковая модель имеет меньшие накладные расходы и задержки, поскольку накладные расходы на создание новых процессов намного выше, чем накладные расходы на создание новых потоков, и в противном случае программа просто ожидала HTTP-ответов.
Итак, если вы обрабатываете массу данных в пользовательском пространстве python, это может быть не лучшим методом.
Ответ 4
Как говорили другие, multiprocessing может передавать объекты Python рабочим процессам только те, которые могут быть обработаны. Если вы не можете реорганизовать свой код, как описано unutbu, вы можете использовать dill расширенные возможности pickling / распаковки для передачи данных (особенно данных кода), как я показываю ниже.
Это решение требует только установки dill и никаких других библиотек, поскольку pathos: