Вопрос-Ответ

How to get the return value from a thread?

Как получить возвращаемое значение из потока?

Функция foo ниже возвращает строку 'foo'. Как я могу получить значение, 'foo' которое возвращается из целевого объекта потока?

from threading import Thread

def foo(bar):
print('hello {}'.format(bar))
return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

"Один очевидный способ сделать это", показанный выше, не работает: thread.join() возвращается None.

Переведено автоматически
Ответ 1

Один из способов, который я видел, - это передать изменяемый объект, такой как список или словарь, конструктору потока вместе с индексом или каким-либо другим идентификатором. Затем поток может сохранить свои результаты в выделенном слоте в этом объекте. Например:

def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()

# do some other stuff

for i in range(len(threads)):
threads[i].join()

print " ".join(results) # what sound does a metasyntactic locomotive make?

Если вы действительно хотите join() вернуть возвращаемое значение вызываемой функции, вы можете сделать это с помощью Thread подкласса, подобного следующему:

from threading import Thread

def foo(bar):
print 'hello {0}'.format(bar)
return "foo"

class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None
):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join() # prints foo

Это становится немного запутанным из-за некоторого искажения имен, и он обращается к "частным" структурам данных, специфичным для Thread реализации... но это работает.

Для Python 3:

class ThreadWithReturnValue(Thread):

def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None
):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None

def run(self):
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
Ответ 2

FWIW, у multiprocessing модуля есть приятный интерфейс для этого с использованием Pool класса. И если вы хотите придерживаться потоков, а не процессов, вы можете просто использовать multiprocessing.pool.ThreadPool класс в качестве дополнительной замены.

def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get() # get the return value from your function.
Ответ 3

В Python 3.2+, stdlib concurrent.futures модуль предоставляет более высокоуровневый API для threading, включая передачу возвращаемых значений или исключений из рабочего потока обратно в основной поток:

import concurrent.futures

def foo(bar):
print('hello {}'.format(bar))
return 'foo'

with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)
Ответ 4

Ответ Джейка хорош, но если вы не хотите использовать threadpool (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), то хорошим способом передачи информации между потоками является встроенная очередь.Класс Queue, поскольку он обеспечивает потокобезопасность.

Я создал следующий декоратор, чтобы заставить его действовать аналогично threadpool:

def threaded(f, daemon=False):
import Queue

def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''

ret = f(*args, **kwargs)
q.put(ret)

def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''


q = Queue.Queue()

t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t

return wrap

Затем вы просто используете его как:

@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

Оформленная функция создает новый поток каждый раз, когда она вызывается, и возвращает объект Thread, содержащий очередь, которая получит результат.

Обновить

Прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он по-прежнему получает просмотры, поэтому я подумал, что обновлю его, чтобы отразить способ, которым я делаю это в более новых версиях Python:

В concurrent.futures модуль добавлен Python 3.2, который предоставляет высокоуровневый интерфейс для параллельных задач. Он предоставляет ThreadPoolExecutor и ProcessPoolExecutor, поэтому вы можете использовать поток или пул процессов с тем же api.

Одним из преимуществ этого api является то, что отправка задачи в Executor возвращает Future объект, который завершается возвращаемым значением вызываемого объекта, который вы отправляете.

Это делает ненужным прикрепление queue объекта, что значительно упрощает работу декоратора:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

return wrap

При этом будет использоваться модуль threadpool executor по умолчанию, если он не передан.

Использование очень похоже на предыдущее:

@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Если вы используете Python 3.4+, одна действительно приятная особенность использования этого метода (и объектов Future в целом) заключается в том, что возвращаемое future может быть обернуто, чтобы превратить его в asyncio.Future с asyncio.wrap_future. Это упрощает работу с сопрограммами:

result = await asyncio.wrap_future(long_task(10))

Если вам не нужен доступ к базовому concurrent.Future объекту, вы можете включить перенос в декоратор:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

return wrap

Затем, всякий раз, когда вам нужно удалить ресурсоемкий или блокирующий код из потока цикла событий, вы можете поместить его в оформленную функцию:

@threadpool
def some_long_calculation():
...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
2023-06-20 17:07 python multithreading function