for j inrange(0, 10): # calc individual parameter value parameter = j * offset # call the calculation out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list output1.append(out1) output2.append(out2) output3.append(out3)
Я знаю, как запускать отдельные потоки в Python, но я не знаю, как "собирать" результаты.
Также подойдет несколько процессов - все, что проще всего для данного случая. В настоящее время я использую Linux, но код должен выполняться также на Windows и Mac.
Какой самый простой способ распараллелить этот код?
Переведено автоматически
Ответ 1
В настоящее время реализация CPython имеет глобальную блокировку интерпретатора (GIL), которая предотвращает одновременное выполнение кода Python потоками одного и того же интерпретатора. Это означает, что потоки CPython полезны для параллельных рабочих нагрузок, связанных с вводом-выводом, но обычно не для рабочих нагрузок, связанных с процессором. Название calc_stuff() указывает, что ваша рабочая нагрузка привязана к процессору, поэтому вы хотите использовать здесь несколько процессов (что в любом случае часто является лучшим решением для рабочих нагрузок, привязанных к процессору, независимо от GIL).
Есть два простых способа создания пула процессов в стандартной библиотеке Python. Первый - это multiprocessing модуль, который можно использовать следующим образом:
with concurrent.futures.ProcessPoolExecutor() as pool: out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
Здесь используется multiprocessing модуль под капотом, поэтому он ведет себя идентично первой версии.
Ответ 2
from joblib import Parallel, delayed defprocess(i): return i * i
results = Parallel(n_jobs=2)(delayed(process)(i) for i inrange(10)) print(results) # prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Вышесказанное прекрасно работает на моем компьютере (Ubuntu, пакет joblib был предварительно установлен, но может быть установлен через pip install joblib).
Редактировать 31 марта 2021 г.: На joblib, multiprocessing, threading и asyncio
joblib в приведенном выше коде используется import multiprocessing under the cood (и, следовательно, несколько процессов, что обычно является лучшим способом выполнения работы процессора между ядрами - из-за GIL)
Вы можете позволить joblib использовать несколько потоков вместо нескольких процессов, но это (или использование import threading напрямую) выгодно только в том случае, если потоки тратят значительное время на ввод-вывод (например, чтение / запись на диск, отправка HTTP-запроса). Для работы с вводом-выводом GIL не блокирует выполнение другого потока
Начиная с Python 3.7, в качестве альтернативы threading, вы можете распараллелить работу с asyncio, но применим тот же совет, что и для import threading (хотя, в отличие от последнего, будет использоваться только 1 поток; с положительной стороны, asyncio имеет множество приятных функций, полезных для асинхронного программирования)
Использование нескольких процессов сопряжено с накладными расходами. Подумайте об этом: как правило, каждый процесс должен инициализировать / загрузить все необходимое для выполнения ваших вычислений. Вам нужно проверить себя, увеличивает ли приведенный выше фрагмент кода ваше время работы. Вот еще один, для которого я подтвердил, что он joblib дает лучшие результаты:
import time from joblib import Parallel, delayed
defcountdown(n): while n>0: n -= 1 return n
t = time.time() for _ inrange(20): print(countdown(10**7), end=" ") print(time.time() - t) # takes ~10.5 seconds on medium sized Macbook Pro
t = time.time() results = Parallel(n_jobs=2)(delayed(countdown)(10**7) for _ inrange(20)) print(results) print(time.time() - t) # takes ~6.3 seconds on medium sized Macbook Pro
Ответ 3
Это самый простой способ сделать это!
Вы можете использовать asyncio. (Документацию можно найти здесь). Он используется в качестве основы для множества асинхронных фреймворков Python, которые предоставляют высокопроизводительные сетевые и веб-серверы, библиотеки подключений к базе данных, распределенные очереди задач и т.д. Кроме того, у него есть как высокоуровневые, так и низкоуровневые API для решения любых задач.
Теперь эта функция будет выполняться параллельно при каждом вызове, не переводя основную программу в состояние ожидания. Вы также можете использовать ее для распараллеливания цикла for. При вызове цикла for цикл, хотя и является последовательным, но каждая итерация выполняется параллельно основной программе, как только интерпретатор попадает туда.
1. Запуск цикла параллельно основному потоку без какого-либо ожидания
@background defyour_function(argument): time.sleep(5) print('function finished for '+str(argument))
for i inrange(10): your_function(i)
print('loop finished')
Это приводит к следующему результату:
loop finished function finished for4 function finished for8 function finished for0 function finished for3 function finished for6 function finished for2 function finished for5 function finished for7 function finished for9 function finished for1
Обновление: май 2022
Хотя это отвечает на первоначальный вопрос, есть способы, с помощью которых мы можем дождаться завершения циклов, как того требуют комментарии, за которые проголосовали. Поэтому добавим их и сюда. Ключами к реализациям являются: asyncio.gather() & run_until_complete(). Рассмотрим следующие функции:
@background defyour_function(argument, other_argument): # Added another argument time.sleep(5) print(f"function finished for {argument=} and {other_argument=}")
defcode_to_run_before(): print('This runs Before Loop!')
defcode_to_run_after(): print('This runs After Loop!')
2. Запускайте параллельно, но дождитесь завершения
code_to_run_before() # Anything you want to run before, run here!
loop = asyncio.get_event_loop() # Have a new event loop
looper = asyncio.gather(*[your_function(i, 1) for i inrange(1, 5)]) # Run the loop
results = loop.run_until_complete(looper) # Wait until finish
code_to_run_after() # Anything you want to run after, run here!
Это приводит к следующему результату:
This runs Before Loop! function finished for argument=2and other_argument=1 function finished for argument=3and other_argument=1 function finished for argument=1and other_argument=1 function finished for argument=4and other_argument=1 This runs After Loop!
3. Запустите несколько циклов параллельно и дождитесь завершения
code_to_run_before() # Anything you want to run before, run here!
loop = asyncio.get_event_loop() # Have a new event loop
group1 = asyncio.gather(*[your_function(i, 1) for i inrange(1, 2)]) # Run all the loops you want group2 = asyncio.gather(*[your_function(i, 2) for i inrange(3, 5)]) # Run all the loops you want group3 = asyncio.gather(*[your_function(i, 3) for i inrange(6, 9)]) # Run all the loops you want
all_groups = asyncio.gather(group1, group2, group3) # Gather them all results = loop.run_until_complete(all_groups) # Wait until finish
code_to_run_after() # Anything you want to run after, run here!
Это приводит к следующему результату:
This runs Before Loop! function finished for argument=3and other_argument=2 function finished for argument=1and other_argument=1 function finished for argument=6and other_argument=3 function finished for argument=4and other_argument=2 function finished for argument=7and other_argument=3 function finished for argument=8and other_argument=3 This runs After Loop!
4. Циклы выполняются последовательно, но итерации каждого цикла выполняются параллельно друг другу
code_to_run_before() # Anything you want to run before, run here!
for loop_number inrange(3):
loop = asyncio.get_event_loop() # Have a new event loop
looper = asyncio.gather(*[your_function(i, loop_number) for i inrange(1, 5)]) # Run the loop
results = loop.run_until_complete(looper) # Wait until finish
print(f"finished for {loop_number=}")
code_to_run_after() # Anything you want to run after, run here!
Это приводит к следующему результату:
This runs Before Loop! function finished for argument=3and other_argument=0 function finished for argument=4and other_argument=0 function finished for argument=1and other_argument=0 function finished for argument=2and other_argument=0 finished for loop_number=0 function finished for argument=4and other_argument=1 function finished for argument=3and other_argument=1 function finished for argument=2and other_argument=1 function finished for argument=1and other_argument=1 finished for loop_number=1 function finished for argument=1and other_argument=2 function finished for argument=4and other_argument=2 function finished for argument=3and other_argument=2 function finished for argument=2and other_argument=2 finished for loop_number=2 This runs After Loop!
Обновление: июнь 2022
Это в его текущем виде может не выполняться в некоторых версиях jupyter notebook. Причина в том, что jupyter notebook использует цикл событий. Чтобы заставить его работать с такими версиями jupyter, nest_asyncio (который будет включать цикл событий, как видно из названия) - это правильный путь. Просто импортируйте и примените его в верхней части ячейки как:
import nest_asyncio nest_asyncio.apply()
И вся функциональность, описанная выше, также должна быть доступна в среде notebook.
Ответ 4
Для распараллеливания простого цикла for, joblib приносит большую пользу исходному использованию многопроцессорности. Не только краткий синтаксис, но и такие вещи, как прозрачное группирование итераций, когда они выполняются очень быстро (для устранения накладных расходов), или захват обратной трассировки дочернего процесса для улучшения отчетов об ошибках.
Отказ от ответственности: я являюсь оригинальным автором joblib.