Вопрос-Ответ

How to run functions in parallel?

Как запускать функции параллельно?

Я пытаюсь запустить несколько функций параллельно в Python.

У меня есть что-то вроде этого:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)

def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)

Я хочу вызвать func1 и func2 и запустить их одновременно. Функции не взаимодействуют друг с другом или на одном объекте. Прямо сейчас мне нужно дождаться завершения func1, прежде чем запустить func2. Как мне сделать что-то вроде приведенного ниже:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

Я хочу иметь возможность создавать оба каталога практически одновременно, потому что каждую минуту я подсчитываю, сколько файлов создается. Если каталога там нет, это сбивает мое время.

Переведено автоматически
Ответ 1

Вы могли бы использовать threading или multiprocessing.

Из-за особенностей CPython, threading вряд ли удастся достичь истинного параллелизма. По этой причине, как правило, лучше использовать multiprocessing.

Вот полный пример:

from multiprocessing import Process


def func1():
print("func1: starting")
for i in range(10000000):
pass

print("func1: finishing")


def func2():
print("func2: starting")
for i in range(10000000):
pass

print("func2: finishing")


if __name__ == "__main__":
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()

Механика запуска / присоединения дочерних процессов может быть легко инкапсулирована в функцию в соответствии с вашими runBothFunc:

def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()

runInParallel(func1, func2)
Ответ 2

Если ваши функции в основном выполняют работу ввода-вывода (и меньше работают с процессором) и у вас Python 3.2+, вы можете использовать ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()

run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])

Если ваши функции в основном выполняют работу процессора (и меньше работы по вводу-выводу) и у вас Python 3.2+, вы можете использовать ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor

def run_cpu_tasks_in_parallel(tasks):
with ProcessPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()

def task_1():
print('CPU task 1 running!')

def task_2():
print('CPU task 2 running!')

if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])

В качестве альтернативы, если у вас есть только Python 2.6+, вы можете использовать многопроцессорный модуль напрямую:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()

def task_1():
print('CPU task 1 running!')

def task_2():
print('CPU task 2 running!')

if __name__ == '__main__':
run_cpu_tasks_in_parallel([
task_1,
task_2,
])
Ответ 3

Это можно элегантно сделать с помощью Ray, системы, которая позволяет вам легко распараллеливать и распространять ваш код на Python.

Чтобы распараллелить ваш пример, вам нужно будет определить свои функции с помощью @ray.remote декоратора, а затем вызвать их с помощью .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])

Если вы передаете один и тот же аргумент обеим функциям, и аргумент большой, более эффективный способ сделать это - использовать ray.put(). Это позволяет избежать двойной сериализации большого аргумента и создания двух его копий в памяти:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Важно - If func1() и func2() возвращают результаты, вам нужно переписать код следующим образом:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

Использование Ray имеет ряд преимуществ перед многопроцессорным модулем. В частности, один и тот же код будет выполняться как на одной машине, так и на кластере машин. Подробнее о преимуществах Ray читайте в этом соответствующем посте.

Ответ 4

Похоже, у вас есть единственная функция, которую вам нужно вызывать с двумя разными параметрами. Это можно элегантно сделать, используя комбинацию concurrent.futures и map с Python 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
time.sleep(seconds)
print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Теперь, если ваша операция связана с вводом-выводом, вы можете использовать ThreadPoolExecutor как таковой:

with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)

Обратите внимание, как map используется здесь, чтобы map привести вашу функцию к списку аргументов.

Теперь, если ваша функция привязана к процессору, вы можете использовать ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)

Если вы не уверены, вы можете просто попробовать оба варианта и посмотреть, какой из них дает вам лучшие результаты.

Наконец, если вы хотите распечатать свои результаты, вы можете просто сделать это:

with ThreadPoolExecutor() as executor:
results = executor.map(sleep_secs, secs_list)
for result in results:
print(result)
2024-01-01 15:58 python multithreading multiprocessing