Вопрос-Ответ

FastAPI runs api-calls in serial instead of parallel fashion

FastAPI выполняет api-вызовы последовательно, а не параллельно

У меня есть следующий код:

import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.get("/ping")
async def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return {"ping": "pong!"}

Если я запускаю свой код на localhost - например, http://localhost:8501/ping - на разных вкладках одного и того же окна браузера, я получаю:

Hello
bye
Hello
bye

вместо:

Hello
Hello
bye
bye

Я читал об использовании httpx, но все равно у меня не получается полноценного распараллеливания. В чем проблема?

Переведено автоматически
Ответ 1

Согласно документации FastAPI:


Когда вы объявляете функцию path operation с помощью normal def вместо async def, она запускается во внешнем пуле потоков, который затем awaitредактируется, вместо того, чтобы вызываться напрямую (поскольку это заблокировало бы сервер).


также, как описано здесь:


Если вы используете стороннюю библиотеку, которая взаимодействует с чем-либо (базой данных, API, файловой системой и т.д.) И не поддерживает использование await, (в настоящее время это имеет место для большинства библиотек баз данных), то объявите, что ваша операция path функционирует как обычно, просто с помощью def.


Если вашему приложению (каким-то образом) не нужно ни с чем взаимодействовать и ждать ответа, используйте async def.


Если вы просто не знаете, используйте обычный def.


Примечание: Вы можете смешивать def и async def в своих функциях path столько функций, сколько вам нужно, и определять каждую из них, используя наилучший для вас вариант. FastAPI сделает с ними все правильно.


В любом случае, в любом из приведенных выше случаев FastAPI все равно будет работать асинхронно и будет чрезвычайно быстрым.


Но, выполнив описанные выше шаги, он сможет выполнить некоторую оптимизацию производительности.


Таким образом, def конечные точки (в контексте асинхронного программирования функция, определенная с помощью just def, называется синхронной функцией) в FastAPI выполняются в отдельном потоке из внешнего пула потоков, который затем awaitредактируется, и, следовательно, FastAPI по-прежнему будет работать асинхронно. Другими словами, сервер будет обрабатывать запросы к таким конечным точкам одновременно. Принимая во внимание, что async def конечные точки выполняются непосредственно в event loop— который выполняется в основном (одиночном) потоке — то есть сервер также будет обрабатывать запросы к таким конечным точкам одновременно/асинхронно, пока существует await вызов неблокирующих операций ввода-вывода внутри таких async def конечных точек / маршрутов, таких как ожидание (1) отправки данных от клиента по сети, (2) считывания содержимого файла на диске, (3) завершение операции с базой данных и т.д. (посмотрите здесь). Однако, если конечная точка, определенная с помощью async def не await для некоторой внутренней сопрограммы, чтобы освободить время для выполнения других задач в event loop (например, запросов к той же или другим конечным точкам, фоновых задач и т.д.), Каждый запрос к такой конечной точке должен быть полностью завершен (т. Е. Выйти из конечной точки), прежде чем возвращать управление обратно в event loop и разрешать выполнение других задач. Другими словами, в таких случаях сервер будет обрабатывать запросы последовательно.

Обратите внимание, что та же концепция также применима к функциям, определенным с помощью normal, def которые используются в качестве StreamingResponseгенераторов (см. StreamingResponse Реализацию класса) или Background Tasks (см. BackgroundTask Реализацию класса и этот ответ), что означает, что FastAPI, за кулисами, также будет запускать такие функции в отдельном потоке из того же внешнего пула потоков. Количество рабочих потоков этого внешнего пула потоков по умолчанию равно 40 и может быть скорректировано по мере необходимости — пожалуйста, взгляните на этот ответ о том, как это сделать. Следовательно, прочитав этот ответ до конца, вы должны быть в состоянии решить, следует ли вам определять конечную точку FastAPI или StreamingResponseгенератор или функцию фоновой задачи с помощью def или async def.

Ключевое слово await (которое работает только внутри async def функции) передает управление функцией обратно event loop. Другими словами, он приостанавливает выполнение окружающей сопрограммы (т. Е. Объект сопрограммы является результатом вызова async def функции) и сообщает event loop разрешить выполнение какой-либо другой задачи, пока эта awaitотредактированная задача не будет завершена. Обратите внимание, что только потому, что вы можете определить пользовательскую функцию с помощью async def, а затем await внутри вашей async def конечной точки, это не означает, что ваш код будет работать асинхронно, если эта пользовательская функция содержит, например, вызовы time.sleep(), задачи с привязкой к процессору, неасинхронные библиотеки ввода-вывода или любой другой блокирующий вызов, несовместимый с асинхронным кодом Python. В FastAPI, например, при использовании async методов UploadFile, таких как await file.read() и await file.write(), FastAPI/Starlette, за кулисами, фактически вызывает соответствующие синхронные файловые методы в отдельном потоке из внешнего пула потоков, описанного ранее (с использованием async run_in_threadpool() функции) и awaitподдерживает его; в противном случае такие методы / операции блокировали бы event loop — вы могли бы узнать больше, посмотрев на реализацию UploadFile класса.

Обратите внимание, async что означает не параллельный, а одновременный". Как упоминалось ранее, асинхронный код с async и await часто обобщается как использующий сопрограммы. Сопрограммы являются совместными (или совместными многозадачными), что означает, что "в любой момент времени программа с сопрограммами выполняет только одну из своих сопрограмм, и эта запущенная сопрограмма приостанавливает свое выполнение только тогда, когда она явно запрашивает приостановку" (смотрите Здесь и здесь для получения дополнительной информации о сопрограммах). Как описано в этой статье:


В частности, всякий раз, когда выполнение текущей сопрограммы достигает await выражения, сопрограмма может быть приостановлена, а другая ранее приостановленная сопрограмма может возобновить выполнение, если то, на чем она была приостановлена, с тех пор вернуло значение. Приостановка также может произойти, когда async for блок запрашивает следующее значение у асинхронного итератора или когда async with блок вводится или завершается, поскольку эти операции используются await под капотом.


Однако, если бы блокирующая операция ввода-вывода или операция, связанная с ЦП, была непосредственно выполнена / вызвана внутри async def функции / конечной точки, это заблокировало бы цикл событий и, следовательно, основной поток (event loop выполняется в главном потоке процесса / воркера). Следовательно, операция блокировки, такая как time.sleep() в async def конечной точке, заблокирует весь сервер (как в примере кода, приведенном в вашем вопросе). Таким образом, если ваша конечная точка не собирается выполнять никаких async вызовов, вы могли бы объявить ее с помощью normal def вместо этого, и в этом случае FastAPI запустил бы ее в отдельном потоке от внешнего пула потоков и await it, как объяснялось ранее (дополнительные решения приведены в следующих разделах). Пример:

@app.get("/ping")
def ping(request: Request):
#print(request.client)
print("Hello")
time.sleep(5)
print("bye")
return "pong"

В противном случае, если функции, которые вы должны были выполнять внутри конечной точки, являются async функциями, которые вы должны были await выполнять, вы должны определить свою конечную точку с помощью async def. Чтобы продемонстрировать это, в приведенном ниже примере используется asyncio.sleep() функция (из asyncio библиотеки), которая обеспечивает неблокирующую операцию ожидания. await asyncio.sleep() Метод приостанавливает выполнение окружающей сопрограммы (до завершения операции ожидания), тем самым позволяя выполнять другие задачи в event loop. Аналогичные примеры приведены здесь и здесь также.

import asyncio

@app.get("/ping")
async def ping(request: Request):
#print(request.client)
print("Hello")
await asyncio.sleep(5)
print("bye")
return "pong"

Обе указанные выше конечные точки будут выводить указанные сообщения на экран в том же порядке, что и указано в вашем вопросе — если два запроса поступили примерно в одно и то же время — то есть:

Hello
Hello
bye
bye

Важное примечание

При использовании браузера для вызова одной и той же конечной точки во второй (третий и т.д.) раз, пожалуйста, не забудьте сделать это из вкладки, изолированной от основного сеанса браузера; в противном случае последующие запросы (т. Е. идущие после первого) могут быть заблокированы браузером (на стороне клиента), поскольку браузер может ожидать ответа на предыдущий запрос от сервера перед отправкой следующего запроса. Это обычное поведение, по крайней мере, для браузера Chrome, из-за ожидания результата запроса и проверки, может ли результат быть кэширован, прежде чем снова запрашивать тот же ресурс.

Вы могли бы подтвердить это, используя print(request.client) внутри конечной точки, где вы увидите, что hostname и port номер одинаковы для всех входящих запросов — в случае, если запросы были инициированы из вкладок, открытых в одном окне браузера / сеансе; в противном случае, port номер обычно был бы разным для каждого запроса — и, следовательно, эти запросы обрабатывались бы сервером последовательно, поскольку браузер / клиент отправляет их последовательно в первую очередь. Чтобы преодолеть это, вы могли бы либо:


  1. Перезагрузите ту же вкладку (которая запущена), или



  2. Откройте новую вкладку в окне инкогнито или



  3. Используйте другой браузер / клиент для отправки запроса, или



  4. Используйте httpx библиотеку для выполнения асинхронных HTTP-запросов вместе с awaitable asyncio.gather(), которая позволяет выполнять несколько асинхронных операций одновременно, а затем возвращает список результатов в том же порядке, в каком ожидаемые файлы (задачи) были переданы этой функции (посмотрите на этот ответ для получения более подробной информации).


    Пример:


    import httpx
    import asyncio

    URLS = ['http://127.0.0.1:8000/ping'] * 2

    async def send(url, client):
    return await client.get(url, timeout=10)

    async def main():
    async with httpx.AsyncClient() as client:
    tasks = [send(url, client) for url in URLS]
    responses = await asyncio.gather(*tasks)
    print(*[r.json() for r in responses], sep='\n')

    asyncio.run(main())

    В случае, если вам пришлось вызывать разные конечные точки, для обработки запроса которым может потребоваться разное время, и вы хотели бы распечатать ответ на стороне клиента, как только он будет возвращен с сервера — вместо ожидания, пока asyncio.gather() соберет результаты всех задач и распечатает их в том же порядке, в каком задачи были переданы в send() функцию — вы могли бы заменить send() функцию из приведенного выше примера на показанную ниже:


    async def send(url, client):
    res = await client.get(url, timeout=10)
    print(res.json())
    return res


Async/await и блокирует операции ввода-вывода, связанные с ЦП

Если вам требуется определить конечную точку FastAPI (или StreamingResponseгенератор s, или функцию фоновой задачи) с помощью async def (как вам может понадобиться await для некоторых сопрограмм внутри него), но также есть некоторые синхронные блокирующие операции ввода-вывода или ЦП (трудоемкая задача), которые блокируют event loop (по сути, весь сервер) и не пропускают другие запросы, например:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents) # this would block the event loop
finally:
await file.close()
print("bye")
return "pong"

затем:


  1. Вам следует проверить, могли бы вы изменить определение вашей конечной точки на normal def вместо async def. Например, если единственный метод в вашей конечной точке, который должен ожидаться, - это метод чтения содержимого файла (как вы упомянули в разделе комментариев ниже), вы могли бы вместо этого объявить тип параметра конечной точки как bytes (т.Е. file: bytes = File()) и, таким образом, FastAPI прочитает файл за вас, и вы получите содержимое как bytes. Следовательно, не было бы необходимости использовать await file.read(). Пожалуйста, обратите внимание, что описанный выше подход должен работать для небольших файлов, поскольку содержимое файла enitre будет храниться в памяти (см. Документацию по File параметрам); и, следовательно, если в вашей системе недостаточно оперативной памяти для размещения накопленных данных (например, если у вас 8 ГБ оперативной памяти, вы не сможете загрузить файл объемом 50 ГБ), ваше приложение может в конечном итоге завершиться сбоем. В качестве альтернативы, вы могли бы вызвать .read() метод SpooledTemporaryFile напрямую (к которому можно получить доступ через .file атрибут UploadFile объекта), так что вам снова не нужно использовать await .read() метод — и поскольку теперь вы можете объявить свою конечную точку обычным способом def, каждый запрос будет выполняться в отдельном потоке (пример приведен ниже). Для получения более подробной информации о том, как загрузить File а также то, как Starlette / FastAPI использует SpooledTemporaryFile за кулисами, пожалуйста, взгляните на этот ответ и этот ответ.


    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
    contents = file.file.read()
    res = cpu_bound_task(contents)
    finally:
    file.file.close()
    print("bye")
    return "pong"


  2. Используйте run_in_threadpool() функцию Starlette от FastAPI из concurrency модуля — как предложил @tiangolo здесь — который "запустит функцию в отдельном потоке, чтобы гарантировать, что основной поток (где выполняются сопрограммы) не будет заблокирован" (см. Здесь). Как описано @tiangolo здесь, "run_in_threadpool является awaitработоспособной функцией; первый параметр является обычной функцией, следующие параметры передаются этой функции напрямую. Он поддерживает как аргументы последовательности, так и аргументы ключевого слова".


    from fastapi.concurrency import run_in_threadpool

    res = await run_in_threadpool(cpu_bound_task, contents)


  3. В качестве альтернативы, используйте asyncio's loop.run_in_executor() — после получения running event loop используя asyncio.get_running_loop() — для запуска задачи, которую, в данном случае, вы можете await завершить и вернуть результат (ы), прежде чем переходить к следующей строке кода. При передаче None аргументу executor будет использоваться executor по умолчанию; который является ThreadPoolExecutor:


    import asyncio

    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, cpu_bound_task, contents)

    или, если вы хотите передавать аргументы ключевого слова вместо этого, вы могли бы использовать lambda выражение (например, lambda: cpu_bound_task(some_arg=contents)), или, предпочтительно, functools.partial(), что специально рекомендуется в документации для loop.run_in_executor():


    import asyncio
    from functools import partial

    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))

    В Python 3.9+ вы также могли бы использовать asyncio.to_thread() для асинхронного запуска синхронной функции в отдельном потоке, который, по сути, использует await loop.run_in_executor(None, func_call) under the cood , как можно видеть в реализации asyncio.to_thread(). to_thread() Функция принимает имя блокирующей функции для выполнения, а также любые аргументы (*args и / или **kwargs) функции, а затем возвращает сопрограмму, которая может быть awaitотредактирована. Пример:


    import asyncio

    res = await asyncio.to_thread(cpu_bound_task, contents)

    Обратите внимание, что, как объяснено в этом ответе, передача None в executor аргумент не создает новый ThreadPoolExecutor при каждом вызове await loop.run_in_executor(None, ...), а вместо этого повторно использует исполнитель по умолчанию с количеством рабочих потоков по умолчанию (т.Е. min(32, os.cpu_count() + 4)). Таким образом, в зависимости от требований вашего приложения, это число может быть довольно низким. В этом случае вам лучше использовать пользовательский ThreadPoolExecutor. Например:


    import asyncio
    import concurrent.futures

    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)

    Я бы настоятельно рекомендовал взглянуть на связанный ответ выше, чтобы узнать о разнице между использованием run_in_threadpool() и run_in_executor(), а также о том, как создать пользовательский интерфейс, который можно использовать повторно ThreadPoolExecutor при запуске приложения, и отрегулировать максимальное количество рабочих потоков по мере необходимости.



  4. ThreadPoolExecutor успешно предотвратит блокировку event loop, но не даст вам того повышения производительности, которого вы ожидали бы от параллельного выполнения кода; особенно, когда нужно выполнять CPU-bound задачи, подобные тем, которые описаны здесь (например, обработка аудио или изображений, машинное обучение и так далее). Таким образом, предпочтительнее запускать задачи, связанные с ЦП, в отдельном процессе- используя ProcessPoolExecutor, как показано ниже, — с которым, опять же, вы можете интегрироваться asyncio, чтобы await он завершил свою работу и вернул результат (ы). Как описано здесь, важно защитить точку входа в программу, чтобы избежать рекурсивного порождения подпроцессов и т.д. В принципе, ваш код должен находиться под if __name__ == '__main__'.


    import concurrent.futures

    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)

    Опять же, я бы посоветовал взглянуть на связанный ранее ответ о том, как создать многоразовый ProcessPoolExecutor при запуске приложения. Вы также можете найти этот ответ полезным.



  5. Используйте больше воркеров, чтобы воспользоваться преимуществами многоядерных процессоров, чтобы запускать несколько процессов параллельно и иметь возможность обслуживать больше запросов. Например, uvicorn main:app --workers 4 (если вы используете Gunicorn в качестве менеджера процессов с Uvicorn workers, пожалуйста, взгляните на этот ответ). При использовании 1 воркера запускается только один процесс. При использовании нескольких воркеров это приведет к появлению нескольких процессов (все однопоточные). Каждый процесс имеет отдельную глобальную блокировку интерпретатора (GIL), а также свою собственную event loop, которая запускается в главном потоке каждого процесса и выполняет все задачи в своем потоке. Это означает, что есть только один поток, который может блокировать интерпретатор каждого процесса; если, конечно, вы не используете дополнительные потоки, либо снаружи, либо внутри event loop, например, при использовании ThreadPoolExecutor с loop.run_in_executor, или определении конечных точек / фоновых задач / StreamingResponse генераторов с помощью обычных def вместо async def, а также при вызове UploadFile методов (подробнее см. Первые два абзаца этого ответа).


    Примечание: У каждого воркера "есть свои вещи, переменные и память". Это означает, что global переменные / объекты и т.д. Не будут совместно использоваться процессами / рабочими элементами. В этом случае вам следует рассмотреть возможность использования хранилища базы данных или хранилищ ключ-значение (кэшей), как описано здесь и здесь. Кроме того, обратите внимание, что "если вы потребляете большой объем памяти в своем коде, каждый процесс будет потреблять эквивалентный объем памяти".



  6. Если вам нужно выполнить тяжелые фоновые вычисления и вам не обязательно, чтобы они выполнялись одним и тем же процессом (например, вам не нужно совместно использовать память, переменные и т.д.), вам может быть полезно использовать другие более мощные инструменты, такие как Celery, как описано в документации FastAPI.



Ответ 2

Q :
" ... В чем проблема? "


A :
В документации FastAPI явно указано, что фреймворк использует задачи в процессе (унаследованные от Starlette ).

Это само по себе означает, что все такие задачи конкурируют за получение (время от времени ) GIL-блокировки интерпретатора Python, которая фактически является глобальной блокировкой интерпретатора, терроризирующей МЬЮТЕКСЫ, которая фактически повторно [SERIAL]заставляет любое количество потоков интерпретатора Python в процессе
работать как одно-и--работает-только-один-, в то время как-все-остальные-остаются-в-ожидании...

В мелкозернистом масштабе вы видите результат - если создание другого обработчика для второго (инициируемого вручную со второй вкладки FireFox ) поступающего http-запроса на самом деле занимает больше времени, чем переход в режим ожидания, результат чередования GIL-lock ~ 100 [ms] циклический перебор временных квантов ( все-ожидание-можно-работать ~ 100 [ms] перед каждым следующим раундом GIL-lock release-acquire-roulette выполняется ), внутренняя работа интерпретатора Python не показывает больше деталей, вы можете использовать больше деталей ( в зависимости от O / Тип или версия ) из здесь, чтобы увидеть больше встроенных LoD, подобных этому, внутри выполняемого асинхронно оформленного кода :

import time
import threading
from fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Python Interpreter __main__ was started ..."
)
...
@app.get("/ping")
async def ping( request: Request ):
""" __doc__
[DOC-ME]
ping( Request ): a mock-up AS-IS function to yield
a CLI/GUI self-evidence of the order-of-execution
RETURNS: a JSON-alike decorated dict

[TEST-ME] ...
"""

print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"Hello..."
)
#------------------------------------------------- actual blocking work
time.sleep( 5 )
#------------------------------------------------- actual blocking work
print( TEMPLATE.format( time.perf_counter_ns(),
threading.get_ident(),
"...bye"
)
return { "ping": "pong!" }

И последнее, но не менее важное: не стесняйтесь читать больше обо всех других акулах, от которых может пострадать код на основе потоков ... или даже стать причиной ... за кулисами ...

Рекламный меморандум

Сочетание GIL-блокировки, пулов на основе потоков, асинхронных декораторов, блокировки и обработки событий - верное сочетание для устранения неопределенностей & HWY2HELL ; o)

python