FastAPI выполняет api-вызовы последовательно, а не параллельно
У меня есть следующий код:
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}
Если я запускаю свой код на localhost - например, http://localhost:8501/ping
- на разных вкладках одного и того же окна браузера, я получаю:
Hello
bye
Hello
bye
вместо:
Hello
Hello
bye
bye
Я читал об использовании httpx
, но все равно у меня не получается полноценного распараллеливания. В чем проблема?
Переведено автоматически
Ответ 1
Согласно документации FastAPI:
Когда вы объявляете функцию path operation с помощью normal
def
вместоasync def
, она запускается во внешнем пуле потоков, который затемawait
редактируется, вместо того, чтобы вызываться напрямую (поскольку это заблокировало бы сервер).
также, как описано здесь:
Если вы используете стороннюю библиотеку, которая взаимодействует с чем-либо (базой данных, API, файловой системой и т.д.) И не поддерживает использование
await
, (в настоящее время это имеет место для большинства библиотек баз данных), то объявите, что ваша операция path функционирует как обычно, просто с помощьюdef
.Если вашему приложению (каким-то образом) не нужно ни с чем взаимодействовать и ждать ответа, используйте
async def
.Если вы просто не знаете, используйте обычный
def
.Примечание: Вы можете смешивать
def
иasync def
в своих функциях path столько функций, сколько вам нужно, и определять каждую из них, используя наилучший для вас вариант. FastAPI сделает с ними все правильно.В любом случае, в любом из приведенных выше случаев FastAPI все равно будет работать асинхронно и будет чрезвычайно быстрым.
Но, выполнив описанные выше шаги, он сможет выполнить некоторую оптимизацию производительности.
Таким образом, def
конечные точки (в контексте асинхронного программирования функция, определенная с помощью just def
, называется синхронной функцией) в FastAPI выполняются в отдельном потоке из внешнего пула потоков, который затем await
редактируется, и, следовательно, FastAPI по-прежнему будет работать асинхронно. Другими словами, сервер будет обрабатывать запросы к таким конечным точкам одновременно. Принимая во внимание, что async def
конечные точки выполняются непосредственно в event loop
— который выполняется в основном (одиночном) потоке — то есть сервер также будет обрабатывать запросы к таким конечным точкам одновременно/асинхронно, пока существует await
вызов неблокирующих операций ввода-вывода внутри таких async def
конечных точек / маршрутов, таких как ожидание (1) отправки данных от клиента по сети, (2) считывания содержимого файла на диске, (3) завершение операции с базой данных и т.д. (посмотрите здесь). Однако, если конечная точка, определенная с помощью async def
не await
для некоторой внутренней сопрограммы, чтобы освободить время для выполнения других задач в event loop
(например, запросов к той же или другим конечным точкам, фоновых задач и т.д.), Каждый запрос к такой конечной точке должен быть полностью завершен (т. Е. Выйти из конечной точки), прежде чем возвращать управление обратно в event loop
и разрешать выполнение других задач. Другими словами, в таких случаях сервер будет обрабатывать запросы последовательно.
Обратите внимание, что та же концепция также применима к функциям, определенным с помощью normal, def
которые используются в качестве StreamingResponse
генераторов (см. StreamingResponse
Реализацию класса) или Background Tasks
(см. BackgroundTask
Реализацию класса и этот ответ), что означает, что FastAPI, за кулисами, также будет запускать такие функции в отдельном потоке из того же внешнего пула потоков. Количество рабочих потоков этого внешнего пула потоков по умолчанию равно 40
и может быть скорректировано по мере необходимости — пожалуйста, взгляните на этот ответ о том, как это сделать. Следовательно, прочитав этот ответ до конца, вы должны быть в состоянии решить, следует ли вам определять конечную точку FastAPI или StreamingResponse
генератор или функцию фоновой задачи с помощью def
или async def
.
Ключевое слово await
(которое работает только внутри async def
функции) передает управление функцией обратно event loop
. Другими словами, он приостанавливает выполнение окружающей сопрограммы (т. Е. Объект сопрограммы является результатом вызова async def
функции) и сообщает event loop
разрешить выполнение какой-либо другой задачи, пока эта await
отредактированная задача не будет завершена. Обратите внимание, что только потому, что вы можете определить пользовательскую функцию с помощью async def
, а затем await
внутри вашей async def
конечной точки, это не означает, что ваш код будет работать асинхронно, если эта пользовательская функция содержит, например, вызовы time.sleep()
, задачи с привязкой к процессору, неасинхронные библиотеки ввода-вывода или любой другой блокирующий вызов, несовместимый с асинхронным кодом Python. В FastAPI, например, при использовании async
методов UploadFile
, таких как await file.read()
и await file.write()
, FastAPI/Starlette, за кулисами, фактически вызывает соответствующие синхронные файловые методы в отдельном потоке из внешнего пула потоков, описанного ранее (с использованием async
run_in_threadpool()
функции) и await
поддерживает его; в противном случае такие методы / операции блокировали бы event loop
— вы могли бы узнать больше, посмотрев на реализацию UploadFile
класса.
Обратите внимание, async
что означает не параллельный, а одновременный". Как упоминалось ранее, асинхронный код с async
и await
часто обобщается как использующий сопрограммы. Сопрограммы являются совместными (или совместными многозадачными), что означает, что "в любой момент времени программа с сопрограммами выполняет только одну из своих сопрограмм, и эта запущенная сопрограмма приостанавливает свое выполнение только тогда, когда она явно запрашивает приостановку" (смотрите Здесь и здесь для получения дополнительной информации о сопрограммах). Как описано в этой статье:
В частности, всякий раз, когда выполнение текущей сопрограммы достигает
await
выражения, сопрограмма может быть приостановлена, а другая ранее приостановленная сопрограмма может возобновить выполнение, если то, на чем она была приостановлена, с тех пор вернуло значение. Приостановка также может произойти, когдаasync for
блок запрашивает следующее значение у асинхронного итератора или когдаasync with
блок вводится или завершается, поскольку эти операции используютсяawait
под капотом.
Однако, если бы блокирующая операция ввода-вывода или операция, связанная с ЦП, была непосредственно выполнена / вызвана внутри async def
функции / конечной точки, это заблокировало бы цикл событий и, следовательно, основной поток (event loop
выполняется в главном потоке процесса / воркера). Следовательно, операция блокировки, такая как time.sleep()
в async def
конечной точке, заблокирует весь сервер (как в примере кода, приведенном в вашем вопросе). Таким образом, если ваша конечная точка не собирается выполнять никаких async
вызовов, вы могли бы объявить ее с помощью normal def
вместо этого, и в этом случае FastAPI запустил бы ее в отдельном потоке от внешнего пула потоков и await
it, как объяснялось ранее (дополнительные решения приведены в следующих разделах). Пример:
@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"
В противном случае, если функции, которые вы должны были выполнять внутри конечной точки, являются async
функциями, которые вы должны были await
выполнять, вы должны определить свою конечную точку с помощью async def
. Чтобы продемонстрировать это, в приведенном ниже примере используется asyncio.sleep()
функция (из asyncio
библиотеки), которая обеспечивает неблокирующую операцию ожидания. await asyncio.sleep()
Метод приостанавливает выполнение окружающей сопрограммы (до завершения операции ожидания), тем самым позволяя выполнять другие задачи в event loop
. Аналогичные примеры приведены здесь и здесь также.
import asyncio
@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"
Обе указанные выше конечные точки будут выводить указанные сообщения на экран в том же порядке, что и указано в вашем вопросе — если два запроса поступили примерно в одно и то же время — то есть:
Hello
Hello
bye
bye
Важное примечание
При использовании браузера для вызова одной и той же конечной точки во второй (третий и т.д.) раз, пожалуйста, не забудьте сделать это из вкладки, изолированной от основного сеанса браузера; в противном случае последующие запросы (т. Е. идущие после первого) могут быть заблокированы браузером (на стороне клиента), поскольку браузер может ожидать ответа на предыдущий запрос от сервера перед отправкой следующего запроса. Это обычное поведение, по крайней мере, для браузера Chrome, из-за ожидания результата запроса и проверки, может ли результат быть кэширован, прежде чем снова запрашивать тот же ресурс.
Вы могли бы подтвердить это, используя print(request.client)
внутри конечной точки, где вы увидите, что hostname
и port
номер одинаковы для всех входящих запросов — в случае, если запросы были инициированы из вкладок, открытых в одном окне браузера / сеансе; в противном случае, port
номер обычно был бы разным для каждого запроса — и, следовательно, эти запросы обрабатывались бы сервером последовательно, поскольку браузер / клиент отправляет их последовательно в первую очередь. Чтобы преодолеть это, вы могли бы либо:
Перезагрузите ту же вкладку (которая запущена), или
Откройте новую вкладку в окне инкогнито или
Используйте другой браузер / клиент для отправки запроса, или
Используйте
httpx
библиотеку для выполнения асинхронных HTTP-запросов вместе с awaitableasyncio.gather()
, которая позволяет выполнять несколько асинхронных операций одновременно, а затем возвращает список результатов в том же порядке, в каком ожидаемые файлы (задачи) были переданы этой функции (посмотрите на этот ответ для получения более подробной информации).Пример:
import httpx
import asyncio
URLS = ['http://127.0.0.1:8000/ping'] * 2
async def send(url, client):
return await client.get(url, timeout=10)
async def main():
async with httpx.AsyncClient() as client:
tasks = [send(url, client) for url in URLS]
responses = await asyncio.gather(*tasks)
print(*[r.json() for r in responses], sep='\n')
asyncio.run(main())В случае, если вам пришлось вызывать разные конечные точки, для обработки запроса которым может потребоваться разное время, и вы хотели бы распечатать ответ на стороне клиента, как только он будет возвращен с сервера — вместо ожидания, пока
asyncio.gather()
соберет результаты всех задач и распечатает их в том же порядке, в каком задачи были переданы вsend()
функцию — вы могли бы заменитьsend()
функцию из приведенного выше примера на показанную ниже:async def send(url, client):
res = await client.get(url, timeout=10)
print(res.json())
return res
Async
/await
и блокирует операции ввода-вывода, связанные с ЦП
Если вам требуется определить конечную точку FastAPI (или StreamingResponse
генератор s, или функцию фоновой задачи) с помощью async def
(как вам может понадобиться await
для некоторых сопрограмм внутри него), но также есть некоторые синхронные блокирующие операции ввода-вывода или ЦП (трудоемкая задача), которые блокируют event loop
(по сути, весь сервер) и не пропускают другие запросы, например:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this would block the event loop
finally:
await file.close()
print("bye")
return "pong"
затем:
Вам следует проверить, могли бы вы изменить определение вашей конечной точки на normal
def
вместоasync def
. Например, если единственный метод в вашей конечной точке, который должен ожидаться, - это метод чтения содержимого файла (как вы упомянули в разделе комментариев ниже), вы могли бы вместо этого объявить тип параметра конечной точки какbytes
(т.Е.file: bytes = File()
) и, таким образом, FastAPI прочитает файл за вас, и вы получите содержимое какbytes
. Следовательно, не было бы необходимости использоватьawait file.read()
. Пожалуйста, обратите внимание, что описанный выше подход должен работать для небольших файлов, поскольку содержимое файла enitre будет храниться в памяти (см. Документацию поFile
параметрам); и, следовательно, если в вашей системе недостаточно оперативной памяти для размещения накопленных данных (например, если у вас 8 ГБ оперативной памяти, вы не сможете загрузить файл объемом 50 ГБ), ваше приложение может в конечном итоге завершиться сбоем. В качестве альтернативы, вы могли бы вызвать.read()
методSpooledTemporaryFile
напрямую (к которому можно получить доступ через.file
атрибутUploadFile
объекта), так что вам снова не нужно использоватьawait
.read()
метод — и поскольку теперь вы можете объявить свою конечную точку обычным способомdef
, каждый запрос будет выполняться в отдельном потоке (пример приведен ниже). Для получения более подробной информации о том, как загрузитьFile
а также то, как Starlette / FastAPI используетSpooledTemporaryFile
за кулисами, пожалуйста, взгляните на этот ответ и этот ответ.@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = file.file.read()
res = cpu_bound_task(contents)
finally:
file.file.close()
print("bye")
return "pong"Используйте
run_in_threadpool()
функцию Starlette от FastAPI изconcurrency
модуля — как предложил @tiangolo здесь — который "запустит функцию в отдельном потоке, чтобы гарантировать, что основной поток (где выполняются сопрограммы) не будет заблокирован" (см. Здесь). Как описано @tiangolo здесь, "run_in_threadpool
являетсяawait
работоспособной функцией; первый параметр является обычной функцией, следующие параметры передаются этой функции напрямую. Он поддерживает как аргументы последовательности, так и аргументы ключевого слова".from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)В качестве альтернативы, используйте
asyncio
'sloop.run_in_executor()
— после получения runningevent loop
используяasyncio.get_running_loop()
— для запуска задачи, которую, в данном случае, вы можетеawait
завершить и вернуть результат (ы), прежде чем переходить к следующей строке кода. При передачеNone
аргументу executor будет использоваться executor по умолчанию; который являетсяThreadPoolExecutor
:import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)или, если вы хотите передавать аргументы ключевого слова вместо этого, вы могли бы использовать
lambda
выражение (например,lambda: cpu_bound_task(some_arg=contents)
), или, предпочтительно,functools.partial()
, что специально рекомендуется в документации дляloop.run_in_executor()
:import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))В Python 3.9+ вы также могли бы использовать
asyncio.to_thread()
для асинхронного запуска синхронной функции в отдельном потоке, который, по сути, используетawait loop.run_in_executor(None, func_call)
under the cood , как можно видеть в реализацииasyncio.to_thread()
.to_thread()
Функция принимает имя блокирующей функции для выполнения, а также любые аргументы (*args
и / или**kwargs
) функции, а затем возвращает сопрограмму, которая может бытьawait
отредактирована. Пример:import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)Обратите внимание, что, как объяснено в этом ответе, передача
None
вexecutor
аргумент не создает новыйThreadPoolExecutor
при каждом вызовеawait loop.run_in_executor(None, ...)
, а вместо этого повторно использует исполнитель по умолчанию с количеством рабочих потоков по умолчанию (т.Е.min(32, os.cpu_count() + 4)
). Таким образом, в зависимости от требований вашего приложения, это число может быть довольно низким. В этом случае вам лучше использовать пользовательскийThreadPoolExecutor
. Например:import asyncio
import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)Я бы настоятельно рекомендовал взглянуть на связанный ответ выше, чтобы узнать о разнице между использованием
run_in_threadpool()
иrun_in_executor()
, а также о том, как создать пользовательский интерфейс, который можно использовать повторноThreadPoolExecutor
при запуске приложения, и отрегулировать максимальное количество рабочих потоков по мере необходимости.ThreadPoolExecutor
успешно предотвратит блокировкуevent loop
, но не даст вам того повышения производительности, которого вы ожидали бы от параллельного выполнения кода; особенно, когда нужно выполнятьCPU-bound
задачи, подобные тем, которые описаны здесь (например, обработка аудио или изображений, машинное обучение и так далее). Таким образом, предпочтительнее запускать задачи, связанные с ЦП, в отдельном процессе- используяProcessPoolExecutor
, как показано ниже, — с которым, опять же, вы можете интегрироватьсяasyncio
, чтобыawait
он завершил свою работу и вернул результат (ы). Как описано здесь, важно защитить точку входа в программу, чтобы избежать рекурсивного порождения подпроцессов и т.д. В принципе, ваш код должен находиться подif __name__ == '__main__'
.import concurrent.futures
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
res = await loop.run_in_executor(pool, cpu_bound_task, contents)Опять же, я бы посоветовал взглянуть на связанный ранее ответ о том, как создать многоразовый
ProcessPoolExecutor
при запуске приложения. Вы также можете найти этот ответ полезным.Используйте больше воркеров, чтобы воспользоваться преимуществами многоядерных процессоров, чтобы запускать несколько процессов параллельно и иметь возможность обслуживать больше запросов. Например,
uvicorn main:app --workers 4
(если вы используете Gunicorn в качестве менеджера процессов с Uvicorn workers, пожалуйста, взгляните на этот ответ). При использовании 1 воркера запускается только один процесс. При использовании нескольких воркеров это приведет к появлению нескольких процессов (все однопоточные). Каждый процесс имеет отдельную глобальную блокировку интерпретатора (GIL), а также свою собственнуюevent loop
, которая запускается в главном потоке каждого процесса и выполняет все задачи в своем потоке. Это означает, что есть только один поток, который может блокировать интерпретатор каждого процесса; если, конечно, вы не используете дополнительные потоки, либо снаружи, либо внутриevent loop
, например, при использованииThreadPoolExecutor
сloop.run_in_executor
, или определении конечных точек / фоновых задач /StreamingResponse
генераторов с помощью обычныхdef
вместоasync def
, а также при вызовеUploadFile
методов (подробнее см. Первые два абзаца этого ответа).Примечание: У каждого воркера "есть свои вещи, переменные и память". Это означает, что
global
переменные / объекты и т.д. Не будут совместно использоваться процессами / рабочими элементами. В этом случае вам следует рассмотреть возможность использования хранилища базы данных или хранилищ ключ-значение (кэшей), как описано здесь и здесь. Кроме того, обратите внимание, что "если вы потребляете большой объем памяти в своем коде, каждый процесс будет потреблять эквивалентный объем памяти".Если вам нужно выполнить тяжелые фоновые вычисления и вам не обязательно, чтобы они выполнялись одним и тем же процессом (например, вам не нужно совместно использовать память, переменные и т.д.), вам может быть полезно использовать другие более мощные инструменты, такие как Celery, как описано в документации FastAPI.
Ответ 2
Q :
" ... В чем проблема? "
A :
В документации FastAPI явно указано, что фреймворк использует задачи в процессе (унаследованные от Starlette ).
Это само по себе означает, что все такие задачи конкурируют за получение (время от времени ) GIL-блокировки интерпретатора Python, которая фактически является глобальной блокировкой интерпретатора, терроризирующей МЬЮТЕКСЫ, которая фактически повторно [SERIAL]
заставляет любое количество потоков интерпретатора Python в процессе
работать как одно-и--работает-только-один-, в то время как-все-остальные-остаются-в-ожидании...
В мелкозернистом масштабе вы видите результат - если создание другого обработчика для второго (инициируемого вручную со второй вкладки FireFox ) поступающего http-запроса на самом деле занимает больше времени, чем переход в режим ожидания, результат чередования GIL-lock ~ 100 [ms]
циклический перебор временных квантов ( все-ожидание-можно-работать ~ 100 [ms]
перед каждым следующим раундом GIL-lock release-acquire-roulette выполняется ), внутренняя работа интерпретатора Python не показывает больше деталей, вы можете использовать больше деталей ( в зависимости от O / Тип или версия ) из здесь, чтобы увидеть больше встроенных LoD, подобных этому, внутри выполняемого асинхронно оформленного кода :
import time
import threading
from fastapi import FastAPI, Request
TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Python Interpreter __main__ was started ..."
)
...
@app.get("/ping")
async def ping( request: Request ):
""" __doc__
[DOC-ME]
ping( Request ): a mock-up AS-IS function to yield
a CLI/GUI self-evidence of the order-of-execution
RETURNS: a JSON-alike decorated dict
[TEST-ME] ...
"""
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Hello..."
)
#------------------------------------------------- actual blocking work
time.sleep( 5 )
#------------------------------------------------- actual blocking work
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"...bye"
)
return { "ping": "pong!" }
И последнее, но не менее важное: не стесняйтесь читать больше обо всех других акулах, от которых может пострадать код на основе потоков ... или даже стать причиной ... за кулисами ...
Рекламный меморандум
Сочетание GIL-блокировки, пулов на основе потоков, асинхронных декораторов, блокировки и обработки событий - верное сочетание для устранения неопределенностей & HWY2HELL ; o)