Вопрос-Ответ

What is the proper way to make downstream Https requests inside of Uvicorn/FastAPI?

Как правильно отправлять нисходящие Https-запросы внутри Uvicorn / FastAPI?

У меня есть конечная точка API (FastAPI / Uvicorn). Помимо прочего, он запрашивает информацию у еще одного API. Когда я загружаю свой API несколькими одновременными запросами, я начинаю получать следующую ошибку:

h31._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE

В обычной среде я бы воспользовался request.session, но я понимаю, что это не полностью потокобезопасно.

Таким образом, каков правильный подход к использованию запросов в рамках такой платформы, как FastAPI, где несколько потоков будут использовать requests библиотеку одновременно?

Переведено автоматически
Ответ 1

Вместо использования requests вы могли бы использовать httpx, который также предлагает async API (httpx также предлагается в документации FastAPI при выполнении async тестов, а также FastAPI / Starlette недавно заменили HTTP-клиент на TestClient с requests на httpx).

Приведенный ниже пример основан на примере, приведенном в httpx документации, демонстрирующем, как использовать библиотеку для выполнения асинхронного HTTP (s) запроса и последующей потоковой передачи ответа обратно клиенту. httpx.AsyncClient() — это то, что вы можете использовать вместо requests.Session(), что полезно, когда к одному хосту выполняется несколько запросов, поскольку базовое TCP-соединение будет использоваться повторно, вместо воссоздания одного для каждого отдельного запроса - следовательно, что приведет к значительному повышению производительности. Кроме того, это позволяет повторно использовать headers и другие настройки (такие как proxies и timeout), а также сохранять cookies между запросами. Вы создаете Client и повторно используете его каждый раз, когда это вам нужно. Вы можете использовать await client.aclose() для явного закрытия клиента, как только закончите с ним (вы могли бы сделать это внутри обработчика shutdown события). Примеры и более подробную информацию также можно найти в этом ответе.

Пример

from fastapi import FastAPI
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
import httpx


app = FastAPI()


@app.on_event("startup")
async def startup_event():
app.state.client = httpx.AsyncClient()


@app.on_event('shutdown')
async def shutdown_event():
await app.state.client.aclose()


@app.get('/')
async def home():
client = app.state.client
req = client.build_request('GET', 'https://www.example.com/')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))

Пример (обновлен)

Поскольку startup и shutdown теперь устарели (и могут быть полностью удалены в будущем), вы могли бы вместо этого использовать lifespan обработчик для инициализации httpx клиента, а также закрыть экземпляр клиента при завершении работы, аналогично тому, что было продемонстрировано в этом ответе. Starlette специально предоставляет пример использования lifespan обработчика и httpx клиента на своей странице документации. Как описано в документации Starlette:


В lifespan есть концепция state, которая представляет собой словарь, который можно использовать для совместного использования объектов между временем жизни и запросами.


state, получаемое при запросах, представляет собой поверхностную копию состояния, полученного обработчиком продолжительности жизни.


Следовательно, к объектам, добавленным в состояние в обработчике продолжительности жизни, можно получить доступ внутри конечных точек с помощью request.state. В приведенном ниже примере используется потоковый ответ как для связи с внешним сервером, так и для отправки ответа обратно клиенту. Смотрите Здесь для получения более подробной информации о async методах потоковой передачи ответов в httpx (т.е., aiter_bytes(), aiter_text(), aiter_lines() и т.д.).

Если вы хотите использовать a media_type для StreamingResponse, вы могли бы использовать a из исходного ответа следующим образом: media_type=r.headers['content-type']. Однако, как описано в этом ответе, вам нужно убедиться, что для media_type не установлено значение text/plain; в противном случае содержимое не будет транслироваться в браузере ожидаемым образом, если вы не отключите MIME-анализ (посмотрите связанный ответ для получения более подробной информации и решений).

from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx


@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient() as client:
yield {'client': client}
# The Client closes on shutdown


app = FastAPI(lifespan=lifespan)


@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))

Если по какой-либо причине вам нужно читать содержимое по частям на стороне сервера, прежде чем отвечать клиенту, вы можете сделать это следующим образом:

@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)

async def gen():
async for chunk in r.aiter_raw():
yield chunk
await r.aclose()

return StreamingResponse(gen())

Если вы не хотите использовать потоковый ответ, а скорее должны httpx прочитать ответ для вас в первую очередь (что сохранит данные ответа в оперативной памяти сервера; следовательно, вы должны убедиться, что доступно достаточно места для размещения данных), вы могли бы использовать следующее. Обратите внимание, что использование r.json() должно применяться только к случаям, когда данные ответа представлены в формате JSON; в противном случае вы могли бы вернуть PlainTextResponse или пользовательский Response запрос напрямую, как показано ниже.

from fastapi import Response
from fastapi.responses import PlainTextResponse

@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req)
content_type = r.headers.get('content-type')

if content_type == 'application/json':
return r.json()
elif content_type == 'text/plain':
return PlainTextResponse(content=r.text)
else:
return Response(content=r.content)

Использование async API httpx означало бы, что вы должны определять свои конечные точки с помощью async def; в противном случае вам пришлось бы использовать стандартный синхронный API (для def vs async def см. Этот ответ), и как описано в этом обсуждении на github:


Да. HTTPX предполагается, что он потокобезопасен, и да, один клиентский экземпляр во всех потоках будет работать лучше с точки зрения пула подключений, чем использование экземпляра для каждого потока.


Вы также можете управлять размером пула соединений, используя limits аргумент ключевого слова в Client (см. Конфигурацию ограничения пула). Например:

limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)
python python-requests