Вопрос-Ответ

How do I parallelize a simple Python loop?

Как мне распараллелить простой цикл Python?

Вероятно, это тривиальный вопрос, но как мне распараллелить следующий цикл в python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)

# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)

Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собирать" результаты.

Также подойдет несколько процессов - все, что проще всего для данного случая. В настоящее время я использую Linux, но код должен выполняться также на Windows и Mac.

Какой самый простой способ распараллелить этот код?

Переведено автоматически
Ответ 1

В настоящее время реализация CPython имеет глобальную блокировку интерпретатора (GIL), которая предотвращает одновременное выполнение кода Python потоками одного и того же интерпретатора. Это означает, что потоки CPython полезны для параллельных рабочих нагрузок, связанных с вводом-выводом, но обычно не для рабочих нагрузок, связанных с процессором. Название calc_stuff() указывает, что ваша рабочая нагрузка привязана к процессору, поэтому вы хотите использовать здесь несколько процессов (что в любом случае часто является лучшим решением для рабочих нагрузок, привязанных к процессору, независимо от GIL).

Есть два простых способа создания пула процессов в стандартной библиотеке Python. Первый - это multiprocessing модуль, который можно использовать следующим образом:

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Обратите внимание, что это не будет работать в интерактивном интерпретаторе из-за способа реализации multiprocessing.

Второй способ создания пула процессов - это concurrent.futures.ProcessPoolExecutor:

with concurrent.futures.ProcessPoolExecutor() as pool:
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Здесь используется multiprocessing модуль под капотом, поэтому он ведет себя идентично первой версии.

Ответ 2
from joblib import Parallel, delayed
def process(i):
return i * i

results = Parallel(n_jobs=2)(delayed(process)(i) for i in range(10))
print(results) # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Вышесказанное прекрасно работает на моем компьютере (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib).

Взято из https://blog.dominodatalab.com/simple-parallelization /


Редактировать 31 марта 2021 г.: На joblib, multiprocessing, threading и asyncio


  • joblib в приведенном выше коде используется import multiprocessing under the cood (и, следовательно, несколько процессов, что обычно является лучшим способом выполнения работы процессора между ядрами - из-за GIL)

  • Вы можете позволить joblib использовать несколько потоков вместо нескольких процессов, но это (или использование import threading напрямую) выгодно только в том случае, если потоки тратят значительное время на ввод-вывод (например, чтение / запись на диск, отправка HTTP-запроса). Для работы с вводом-выводом GIL не блокирует выполнение другого потока

  • Начиная с Python 3.7, в качестве альтернативы threading, вы можете распараллелить работу с asyncio, но применим тот же совет, что и для import threading (хотя, в отличие от последнего, будет использоваться только 1 поток; с положительной стороны, asyncio имеет множество приятных функций, полезных для асинхронного программирования)

  • Использование нескольких процессов сопряжено с накладными расходами. Подумайте об этом: как правило, каждый процесс должен инициализировать / загрузить все необходимое для выполнения ваших вычислений. Вам нужно проверить себя, увеличивает ли приведенный выше фрагмент кода ваше время работы. Вот еще один, для которого я подтвердил, что он joblib дает лучшие результаты:

import time
from joblib import Parallel, delayed

def countdown(n):
while n>0:
n -= 1
return n


t = time.time()
for _ in range(20):
print(countdown(10**7), end=" ")
print(time.time() - t)
# takes ~10.5 seconds on medium sized Macbook Pro


t = time.time()
results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ in range(20))
print(results)
print(time.time() - t)
# takes ~6.3 seconds on medium sized Macbook Pro
Ответ 3

Это самый простой способ сделать это!

Вы можете использовать asyncio. (Документацию можно найти здесь). Он используется в качестве основы для множества асинхронных фреймворков Python, которые предоставляют высокопроизводительные сетевые и веб-серверы, библиотеки подключений к базе данных, распределенные очереди задач и т.д. Кроме того, у него есть как высокоуровневые, так и низкоуровневые API для решения любых задач.

import asyncio

def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

return wrapped

@background
def your_function(argument):
#code

Теперь эта функция будет выполняться параллельно при каждом вызове, не переводя основную программу в состояние ожидания. Вы также можете использовать ее для распараллеливания цикла for. При вызове цикла for цикл, хотя и является последовательным, но каждая итерация выполняется параллельно основной программе, как только интерпретатор попадает туда.

1. Запуск цикла параллельно основному потоку без какого-либо ожидания

введите описание изображения здесь

@background
def your_function(argument):
time.sleep(5)
print('function finished for '+str(argument))


for i in range(10):
your_function(i)


print('loop finished')

Это приводит к следующему результату:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

Обновление: май 2022

Хотя это отвечает на первоначальный вопрос, есть способы, с помощью которых мы можем дождаться завершения циклов, как того требуют комментарии, за которые проголосовали. Поэтому добавим их и сюда. Ключами к реализациям являются: asyncio.gather() & run_until_complete(). Рассмотрим следующие функции:

import asyncio
import time

def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

return wrapped

@background
def your_function(argument, other_argument): # Added another argument
time.sleep(5)
print(f"function finished for {argument=} and {other_argument=}")

def code_to_run_before():
print('This runs Before Loop!')

def code_to_run_after():
print('This runs After Loop!')

2. Запускайте параллельно, но дождитесь завершения

введите описание изображения здесь

code_to_run_before()                                                         # Anything you want to run before, run here!

loop = asyncio.get_event_loop() # Have a new event loop

looper = asyncio.gather(*[your_function(i, 1) for i in range(1, 5)]) # Run the loop

results = loop.run_until_complete(looper) # Wait until finish

code_to_run_after() # Anything you want to run after, run here!

Это приводит к следующему результату:

This runs Before Loop!
function finished for argument=2 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=1 and other_argument=1
function finished for argument=4 and other_argument=1
This runs After Loop!

3. Запустите несколько циклов параллельно и дождитесь завершения

введите описание изображения здесь

code_to_run_before()                                                         # Anything you want to run before, run here!   

loop = asyncio.get_event_loop() # Have a new event loop

group1 = asyncio.gather(*[your_function(i, 1) for i in range(1, 2)]) # Run all the loops you want
group2 = asyncio.gather(*[your_function(i, 2) for i in range(3, 5)]) # Run all the loops you want
group3 = asyncio.gather(*[your_function(i, 3) for i in range(6, 9)]) # Run all the loops you want

all_groups = asyncio.gather(group1, group2, group3) # Gather them all
results = loop.run_until_complete(all_groups) # Wait until finish

code_to_run_after() # Anything you want to run after, run here!

Это приводит к следующему результату:

This runs Before Loop!
function finished for argument=3 and other_argument=2
function finished for argument=1 and other_argument=1
function finished for argument=6 and other_argument=3
function finished for argument=4 and other_argument=2
function finished for argument=7 and other_argument=3
function finished for argument=8 and other_argument=3
This runs After Loop!

4. Циклы выполняются последовательно, но итерации каждого цикла выполняются параллельно друг другу

введите описание изображения здесь

code_to_run_before()                                                               # Anything you want to run before, run here!

for loop_number in range(3):

loop = asyncio.get_event_loop() # Have a new event loop

looper = asyncio.gather(*[your_function(i, loop_number) for i in range(1, 5)]) # Run the loop

results = loop.run_until_complete(looper) # Wait until finish

print(f"finished for {loop_number=}")

code_to_run_after() # Anything you want to run after, run here!

Это приводит к следующему результату:

This runs Before Loop!
function finished for argument=3 and other_argument=0
function finished for argument=4 and other_argument=0
function finished for argument=1 and other_argument=0
function finished for argument=2 and other_argument=0
finished for loop_number=0
function finished for argument=4 and other_argument=1
function finished for argument=3 and other_argument=1
function finished for argument=2 and other_argument=1
function finished for argument=1 and other_argument=1
finished for loop_number=1
function finished for argument=1 and other_argument=2
function finished for argument=4 and other_argument=2
function finished for argument=3 and other_argument=2
function finished for argument=2 and other_argument=2
finished for loop_number=2
This runs After Loop!

Обновление: июнь 2022

Это в его текущем виде может не выполняться в некоторых версиях jupyter notebook. Причина в том, что jupyter notebook использует цикл событий. Чтобы заставить его работать с такими версиями jupyter, nest_asyncio (который будет включать цикл событий, как видно из названия) - это правильный путь. Просто импортируйте и примените его в верхней части ячейки как:

import nest_asyncio
nest_asyncio.apply()

И вся функциональность, описанная выше, также должна быть доступна в среде notebook.

Ответ 4

Для распараллеливания простого цикла for, joblib приносит большую пользу исходному использованию многопроцессорности. Не только краткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов), или захват обратной трассировки дочернего процесса для улучшения отчетов об ошибках.

Отказ от ответственности: я являюсь оригинальным автором joblib.

2024-03-10 17:13 python