Как получить возвращаемое значение из потока?
Функция foo
ниже возвращает строку 'foo'
. Как я могу получить значение, 'foo'
которое возвращается из целевого объекта потока?
from threading import Thread
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
"Один очевидный способ сделать это", показанный выше, не работает: thread.join()
возвращается None
.
Переведено автоматически
Ответ 1
Один из способов, который я видел, - это передать изменяемый объект, такой как список или словарь, конструктору потока вместе с индексом или каким-либо другим идентификатором. Затем поток может сохранить свои результаты в выделенном слоте в этом объекте. Например:
def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
Если вы действительно хотите join()
вернуть возвращаемое значение вызываемой функции, вы можете сделать это с помощью Thread
подкласса, подобного следующему:
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
Это становится немного запутанным из-за некоторого искажения имен, и он обращается к "частным" структурам данных, специфичным для Thread
реализации... но это работает.
Для Python 3:
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
Ответ 2
FWIW, у multiprocessing
модуля есть приятный интерфейс для этого с использованием Pool
класса. И если вы хотите придерживаться потоков, а не процессов, вы можете просто использовать multiprocessing.pool.ThreadPool
класс в качестве дополнительной замены.
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
Ответ 3
В Python 3.2+, stdlib concurrent.futures
модуль предоставляет более высокоуровневый API для threading
, включая передачу возвращаемых значений или исключений из рабочего потока обратно в основной поток:
import concurrent.futures
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)
Ответ 4
Ответ Джейка хорош, но если вы не хотите использовать threadpool (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), то хорошим способом передачи информации между потоками является встроенная очередь.Класс Queue, поскольку он обеспечивает потокобезопасность.
Я создал следующий декоратор, чтобы заставить его действовать аналогично threadpool:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
Затем вы просто используете его как:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
Оформленная функция создает новый поток каждый раз, когда она вызывается, и возвращает объект Thread, содержащий очередь, которая получит результат.
Обновить
Прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он по-прежнему получает просмотры, поэтому я подумал, что обновлю его, чтобы отразить способ, которым я делаю это в более новых версиях Python:
В concurrent.futures
модуль добавлен Python 3.2, который предоставляет высокоуровневый интерфейс для параллельных задач. Он предоставляет ThreadPoolExecutor
и ProcessPoolExecutor
, поэтому вы можете использовать поток или пул процессов с тем же api.
Одним из преимуществ этого api является то, что отправка задачи в Executor
возвращает Future
объект, который завершается возвращаемым значением вызываемого объекта, который вы отправляете.
Это делает ненужным прикрепление queue
объекта, что значительно упрощает работу декоратора:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
При этом будет использоваться модуль threadpool executor по умолчанию, если он не передан.
Использование очень похоже на предыдущее:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
Если вы используете Python 3.4+, одна действительно приятная особенность использования этого метода (и объектов Future в целом) заключается в том, что возвращаемое future может быть обернуто, чтобы превратить его в asyncio.Future
с asyncio.wrap_future
. Это упрощает работу с сопрограммами:
result = await asyncio.wrap_future(long_task(10))
Если вам не нужен доступ к базовому concurrent.Future
объекту, вы можете включить перенос в декоратор:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
Затем, всякий раз, когда вам нужно удалить ресурсоемкий или блокирующий код из потока цикла событий, вы можете поместить его в оформленную функцию:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()