What is the proper way to make downstream Https requests inside of Uvicorn/FastAPI?
Как правильно отправлять нисходящие Https-запросы внутри Uvicorn / FastAPI?
У меня есть конечная точка API (FastAPI / Uvicorn). Помимо прочего, он запрашивает информацию у еще одного API. Когда я загружаю свой API несколькими одновременными запросами, я начинаю получать следующую ошибку:
h31._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
В обычной среде я бы воспользовался request.session, но я понимаю, что это не полностью потокобезопасно.
Таким образом, каков правильный подход к использованию запросов в рамках такой платформы, как FastAPI, где несколько потоков будут использовать requests библиотеку одновременно?
Приведенный ниже пример основан на примере, приведенном в httpx документации, демонстрирующем, как использовать библиотеку для выполнения асинхронного HTTP (s) запроса и последующей потоковой передачи ответа обратно клиенту. httpx.AsyncClient() — это то, что вы можете использовать вместо requests.Session(), что полезно, когда к одному хосту выполняется несколько запросов, поскольку базовое TCP-соединение будет использоваться повторно, вместо воссоздания одного для каждого отдельного запроса - следовательно, что приведет к значительному повышению производительности. Кроме того, это позволяет повторно использовать headers и другие настройки (такие как proxies и timeout), а также сохранять cookies между запросами. Вы создаете Client и повторно используете его каждый раз, когда это вам нужно. Вы можете использовать await client.aclose() для явного закрытия клиента, как только закончите с ним (вы могли бы сделать это внутри обработчика shutdown события). Примеры и более подробную информацию также можно найти в этом ответе.
Пример
from fastapi import FastAPI from starlette.background import BackgroundTask from fastapi.responses import StreamingResponse import httpx
Поскольку startup и shutdown теперь устарели (и могут быть полностью удалены в будущем), вы могли бы вместо этого использовать lifespan обработчик для инициализации httpx клиента, а также закрыть экземпляр клиента при завершении работы, аналогично тому, что было продемонстрировано в этом ответе. Starlette специально предоставляет пример использования lifespan обработчика и httpx клиента на своей странице документации. Как описано в документации Starlette:
В lifespan есть концепция state, которая представляет собой словарь, который можно использовать для совместного использования объектов между временем жизни и запросами.
state, получаемое при запросах, представляет собой поверхностную копию состояния, полученного обработчиком продолжительности жизни.
Следовательно, к объектам, добавленным в состояние в обработчике продолжительности жизни, можно получить доступ внутри конечных точек с помощью request.state. В приведенном ниже примере используется потоковый ответ как для связи с внешним сервером, так и для отправки ответа обратно клиенту. Смотрите Здесь для получения более подробной информации о async методах потоковой передачи ответов в httpx (т.е., aiter_bytes(), aiter_text(), aiter_lines() и т.д.).
Если вы хотите использовать a media_type для StreamingResponse, вы могли бы использовать a из исходного ответа следующим образом: media_type=r.headers['content-type']. Однако, как описано в этом ответе, вам нужно убедиться, что для media_type не установлено значение text/plain; в противном случае содержимое не будет транслироваться в браузере ожидаемым образом, если вы не отключите MIME-анализ (посмотрите связанный ответ для получения более подробной информации и решений).
from fastapi import FastAPI, Request from contextlib import asynccontextmanager from fastapi.responses import StreamingResponse from starlette.background import BackgroundTask import httpx
@asynccontextmanager asyncdeflifespan(app: FastAPI): # Initialise the Client on startup and add it to the state asyncwith httpx.AsyncClient() as client: yield {'client': client} # The Client closes on shutdown
Если по какой-либо причине вам нужно читать содержимое по частям на стороне сервера, прежде чем отвечать клиенту, вы можете сделать это следующим образом:
asyncdefgen(): asyncfor chunk in r.aiter_raw(): yield chunk await r.aclose()
return StreamingResponse(gen())
Если вы не хотите использовать потоковый ответ, а скорее должны httpx прочитать ответ для вас в первую очередь (что сохранит данные ответа в оперативной памяти сервера; следовательно, вы должны убедиться, что доступно достаточно места для размещения данных), вы могли бы использовать следующее. Обратите внимание, что использование r.json() должно применяться только к случаям, когда данные ответа представлены в формате JSON; в противном случае вы могли бы вернуть PlainTextResponse или пользовательский Response запрос напрямую, как показано ниже.
from fastapi import Response from fastapi.responses import PlainTextResponse
Использование async API httpx означало бы, что вы должны определять свои конечные точки с помощью async def; в противном случае вам пришлось бы использовать стандартный синхронный API (для def vs async def см. Этот ответ), и как описано в этом обсуждении на github:
Да. HTTPXпредполагается, что он потокобезопасен, и да, один клиентский экземпляр во всех потоках будет работать лучше с точки зрения пула подключений, чем использование экземпляра для каждого потока.
Вы также можете управлять размером пула соединений, используя limits аргумент ключевого слова в Client (см. Конфигурацию ограничения пула). Например: