Как правильно отправлять нисходящие Https-запросы внутри Uvicorn / FastAPI?
У меня есть конечная точка API (FastAPI / Uvicorn). Помимо прочего, он запрашивает информацию у еще одного API. Когда я загружаю свой API несколькими одновременными запросами, я начинаю получать следующую ошибку:
h31._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
В обычной среде я бы воспользовался request.session
, но я понимаю, что это не полностью потокобезопасно.
Таким образом, каков правильный подход к использованию запросов в рамках такой платформы, как FastAPI, где несколько потоков будут использовать requests
библиотеку одновременно?
Переведено автоматически
Ответ 1
Вместо использования requests
вы могли бы использовать httpx
, который также предлагает async
API (httpx
также предлагается в документации FastAPI при выполнении async
тестов, а также FastAPI / Starlette недавно заменили HTTP-клиент на TestClient
с requests
на httpx
).
Приведенный ниже пример основан на примере, приведенном в httpx
документации, демонстрирующем, как использовать библиотеку для выполнения асинхронного HTTP (s) запроса и последующей потоковой передачи ответа обратно клиенту. httpx.AsyncClient()
— это то, что вы можете использовать вместо requests.Session()
, что полезно, когда к одному хосту выполняется несколько запросов, поскольку базовое TCP-соединение будет использоваться повторно, вместо воссоздания одного для каждого отдельного запроса - следовательно, что приведет к значительному повышению производительности. Кроме того, это позволяет повторно использовать headers
и другие настройки (такие как proxies
и timeout
), а также сохранять cookies
между запросами. Вы создаете Client
и повторно используете его каждый раз, когда это вам нужно. Вы можете использовать await client.aclose()
для явного закрытия клиента, как только закончите с ним (вы могли бы сделать это внутри обработчика shutdown
события). Примеры и более подробную информацию также можно найти в этом ответе.
Пример
from fastapi import FastAPI
from starlette.background import BackgroundTask
from fastapi.responses import StreamingResponse
import httpx
app = FastAPI()
@app.on_event("startup")
async def startup_event():
app.state.client = httpx.AsyncClient()
@app.on_event('shutdown')
async def shutdown_event():
await app.state.client.aclose()
@app.get('/')
async def home():
client = app.state.client
req = client.build_request('GET', 'https://www.example.com/')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
Пример (обновлен)
Поскольку startup
и shutdown
теперь устарели (и могут быть полностью удалены в будущем), вы могли бы вместо этого использовать lifespan
обработчик для инициализации httpx
клиента, а также закрыть экземпляр клиента при завершении работы, аналогично тому, что было продемонстрировано в этом ответе. Starlette специально предоставляет пример использования lifespan
обработчика и httpx
клиента на своей странице документации. Как описано в документации Starlette:
В
lifespan
есть концепцияstate
, которая представляет собой словарь, который можно использовать для совместного использования объектов между временем жизни и запросами.
state
, получаемое при запросах, представляет собой поверхностную копию состояния, полученного обработчиком продолжительности жизни.
Следовательно, к объектам, добавленным в состояние в обработчике продолжительности жизни, можно получить доступ внутри конечных точек с помощью request.state
. В приведенном ниже примере используется потоковый ответ как для связи с внешним сервером, так и для отправки ответа обратно клиенту. Смотрите Здесь для получения более подробной информации о async
методах потоковой передачи ответов в httpx
(т.е., aiter_bytes()
, aiter_text()
, aiter_lines()
и т.д.).
Если вы хотите использовать a media_type
для StreamingResponse
, вы могли бы использовать a из исходного ответа следующим образом: media_type=r.headers['content-type']
. Однако, как описано в этом ответе, вам нужно убедиться, что для media_type
не установлено значение text/plain
; в противном случае содержимое не будет транслироваться в браузере ожидаемым образом, если вы не отключите MIME-анализ (посмотрите связанный ответ для получения более подробной информации и решений).
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask
import httpx
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialise the Client on startup and add it to the state
async with httpx.AsyncClient() as client:
yield {'client': client}
# The Client closes on shutdown
app = FastAPI(lifespan=lifespan)
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))
Если по какой-либо причине вам нужно читать содержимое по частям на стороне сервера, прежде чем отвечать клиенту, вы можете сделать это следующим образом:
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req, stream=True)
async def gen():
async for chunk in r.aiter_raw():
yield chunk
await r.aclose()
return StreamingResponse(gen())
Если вы не хотите использовать потоковый ответ, а скорее должны httpx
прочитать ответ для вас в первую очередь (что сохранит данные ответа в оперативной памяти сервера; следовательно, вы должны убедиться, что доступно достаточно места для размещения данных), вы могли бы использовать следующее. Обратите внимание, что использование r.json()
должно применяться только к случаям, когда данные ответа представлены в формате JSON; в противном случае вы могли бы вернуть PlainTextResponse
или пользовательский Response
запрос напрямую, как показано ниже.
from fastapi import Response
from fastapi.responses import PlainTextResponse
@app.get('/')
async def home(request: Request):
client = request.state.client
req = client.build_request('GET', 'https://www.example.com')
r = await client.send(req)
content_type = r.headers.get('content-type')
if content_type == 'application/json':
return r.json()
elif content_type == 'text/plain':
return PlainTextResponse(content=r.text)
else:
return Response(content=r.content)
Использование async
API httpx
означало бы, что вы должны определять свои конечные точки с помощью async def
; в противном случае вам пришлось бы использовать стандартный синхронный API (для def
vs async def
см. Этот ответ), и как описано в этом обсуждении на github:
Да.
HTTPX
предполагается, что он потокобезопасен, и да, один клиентский экземпляр во всех потоках будет работать лучше с точки зрения пула подключений, чем использование экземпляра для каждого потока.
Вы также можете управлять размером пула соединений, используя limits
аргумент ключевого слова в Client
(см. Конфигурацию ограничения пула). Например:
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
client = httpx.Client(limits=limits)