Как мне распараллелить простой цикл Python?
Вероятно, это тривиальный вопрос, но как мне распараллелить следующий цикл в python?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собирать" результаты.
Также подойдет несколько процессов - все, что проще всего для данного случая. В настоящее время я использую Linux, но код должен выполняться также на Windows и Mac.
Какой самый простой способ распараллелить этот код?
Переведено автоматически
Ответ 1
В настоящее время реализация CPython имеет глобальную блокировку интерпретатора (GIL), которая предотвращает одновременное выполнение кода Python потоками одного и того же интерпретатора. Это означает, что потоки CPython полезны для параллельных рабочих нагрузок, связанных с вводом-выводом, но обычно не для рабочих нагрузок, связанных с процессором. Название calc_stuff()
указывает, что ваша рабочая нагрузка привязана к процессору, поэтому вы хотите использовать здесь несколько процессов (что в любом случае часто является лучшим решением для рабочих нагрузок, привязанных к процессору, независимо от GIL).
Есть два простых способа создания пула процессов в стандартной библиотеке Python. Первый - это multiprocessing
модуль, который можно использовать следующим образом:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Обратите внимание, что это не будет работать в интерактивном интерпретаторе из-за способа реализации multiprocessing
.
Второй способ создания пула процессов - это concurrent.futures.ProcessPoolExecutor
:
with concurrent.futures.ProcessPoolExecutor() as pool:
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Здесь используется multiprocessing
модуль под капотом, поэтому он ведет себя идентично первой версии.
Ответ 2
from joblib import Parallel, delayed
def process(i):
return i * i
results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results) # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Вышесказанное прекрасно работает на моем компьютере (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib
).
Взято из https://blog.dominodatalab.com/simple-parallelization /
Редактировать 31 марта 2021 г.: На joblib
, multiprocessing
, threading
и asyncio
joblib
в приведенном выше коде используетсяimport multiprocessing
under the cood (и, следовательно, несколько процессов, что обычно является лучшим способом выполнения работы процессора между ядрами - из-за GIL)- Вы можете позволить
joblib
использовать несколько потоков вместо нескольких процессов, но это (или использованиеimport threading
напрямую) выгодно только в том случае, если потоки тратят значительное время на ввод-вывод (например, чтение / запись на диск, отправка HTTP-запроса). Для работы с вводом-выводом GIL не блокирует выполнение другого потока - Начиная с Python 3.7, в качестве альтернативы
threading
, вы можете распараллелить работу с asyncio, но применим тот же совет, что и дляimport threading
(хотя, в отличие от последнего, будет использоваться только 1 поток; с положительной стороны,asyncio
имеет множество приятных функций, полезных для асинхронного программирования) - Использование нескольких процессов сопряжено с накладными расходами. Подумайте об этом: как правило, каждый процесс должен инициализировать / загрузить все необходимое для выполнения ваших вычислений. Вам нужно проверить себя, увеличивает ли приведенный выше фрагмент кода ваше время работы. Вот еще один, для которого я подтвердил, что он
joblib
дает лучшие результаты:
import time
from joblib import Parallel, delayed
def countdown(n):
while n>0:
n -= 1
return n
t = time.time()
for _ in range(20):
print(countdown(10**7), end=" ")
print(time.time() - t)
# takes ~10.5 seconds on medium sized Macbook Pro
t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro
Ответ 3
Это самый простой способ сделать это!
Вы можете использовать asyncio. (Документацию можно найти здесь). Он используется в качестве основы для множества асинхронных фреймворков Python, которые предоставляют высокопроизводительные сетевые и веб-серверы, библиотеки подключений к базе данных, распределенные очереди задач и т.д. Кроме того, у него есть как высокоуровневые, так и низкоуровневые API для решения любых задач.
import asyncio
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def your_function(argument):
#code
Теперь эта функция будет выполняться параллельно при каждом вызове, не переводя основную программу в состояние ожидания. Вы также можете использовать ее для распараллеливания цикла for. При вызове цикла for цикл, хотя и является последовательным, но каждая итерация выполняется параллельно основной программе, как только интерпретатор попадает туда.
1. Запуск цикла параллельно основному потоку без какого-либо ожидания
@background
def your_function(argument):
time.sleep(5)
print('function finished for '+str(argument))
for i in range(10):
your_function(i)
print('loop finished')
Это приводит к следующему результату:
loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
Обновление: май 2022
Хотя это отвечает на первоначальный вопрос, есть способы, с помощью которых мы можем дождаться завершения циклов, как того требуют комментарии, за которые проголосовали. Поэтому добавим их и сюда. Ключами к реализациям являются: asyncio.gather()
& run_until_complete()
. Рассмотрим следующие функции:
import asyncio
import time
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def your_function(argument, other_argument): # Added another argument
time.sleep(5)
print(f"function finished for {argument=} and {other_argument=}")
def code_to_run_before():
print('This runs Before Loop!')
def code_to_run_after():
print('This runs After Loop!')
2. Запускайте параллельно, но дождитесь завершения
code_to_run_before() # Anything you want to run before, run here!
loop = asyncio.get_event_loop() # Have a new event loop
looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)]) # Run the loop
results = loop.run_until_complete(looper) # Wait until finish
code_to_run_after() # Anything you want to run after, run here!
Это приводит к следующему результату:
This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!
3. Запустите несколько циклов параллельно и дождитесь завершения
code_to_run_before() # Anything you want to run before, run here!
loop = asyncio.get_event_loop() # Have a new event loop
group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)]) # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)]) # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)]) # Run all the loops you want
all_groups = asyncio.gather(group1, group2, group3) # Gather them all
results = loop.run_until_complete(all_groups) # Wait until finish
code_to_run_after() # Anything you want to run after, run here!
Это приводит к следующему результату:
This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!
4. Циклы выполняются последовательно, но итерации каждого цикла выполняются параллельно друг другу
code_to_run_before() # Anything you want to run before, run here!
for loop_number in range(3):
loop = asyncio.get_event_loop() # Have a new event loop
looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop
results = loop.run_until_complete(looper) # Wait until finish
print(f"finished for {loop_number=}")
code_to_run_after() # Anything you want to run after, run here!
Это приводит к следующему результату:
This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!
Обновление: июнь 2022
Это в его текущем виде может не выполняться в некоторых версиях jupyter notebook. Причина в том, что jupyter notebook использует цикл событий. Чтобы заставить его работать с такими версиями jupyter, nest_asyncio
(который будет включать цикл событий, как видно из названия) - это правильный путь. Просто импортируйте и примените его в верхней части ячейки как:
import nest_asyncio
nest_asyncio.apply()
И вся функциональность, описанная выше, также должна быть доступна в среде notebook.
Ответ 4
Для распараллеливания простого цикла for, joblib приносит большую пользу исходному использованию многопроцессорности. Не только краткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов), или захват обратной трассировки дочернего процесса для улучшения отчетов об ошибках.
Отказ от ответственности: я являюсь оригинальным автором joblib.