Как протоколировать необработанный HTTP-запрос / ответ в Python FastAPI?
Мы пишем веб-сервис с использованием Python FastAPI, который будет размещен в Kubernetes. Для целей аудита нам нужно сохранить необработанное тело JSON-файла request
/response
для определенных маршрутов. Размер тела как request
, так и response
JSON составляет около 1 МБ, и желательно, чтобы это не влияло на время ответа. Как мы можем это сделать?
Переведено автоматически
Ответ 1
Вы могли бы использовать Middleware
. A middleware
принимает каждый запрос, поступающий в ваше приложение, и, следовательно, позволяет вам обрабатывать request
до того, как он будет обработан какой-либо конкретной конечной точкой, а также response
до того, как он будет возвращен клиенту. Чтобы создать middleware
, вы используете декоратор @app.middleware("http")
поверх функции, как показано ниже.
Поскольку вам нужно использовать тело запроса из потока внутри middleware
— используя либо request.body()
, либо request.stream()
, как показано в этом ответе (за кулисами первый метод фактически вызывает второй, см. Здесь) — тогда он будет недоступен, когда вы позже передадите request
соответствующей конечной точке. Таким образом, вы можете следовать подходу, описанному в этом посте, чтобы сделать тело запроса доступным в дальнейшем (т. Е. Используя set_body()
функцию ниже). ОБНОВЛЕНИЕ: Эта проблема теперь исправлена, и, следовательно, нет необходимости использовать это обходное решение, если вы используете FastAPI 0.108.0 или более поздней версии.
Что касается response
тела, вы можете использовать тот же подход, что описан в этом ответе, чтобы использовать тело, а затем вернуть response
клиенту. Любой из вариантов, описанных в вышеупомянутом связанном ответе, будет работать; однако ниже используется вариант 2, который сохраняет тело в объекте bytes и возвращает пользовательский Response
напрямую (вместе с status_code
, headers
и media_type
оригинала response
).
Для протоколирования данных вы могли бы использовать BackgroundTask
, как описано в этом ответе, а также в этом ответе и этом ответе. A BackgroundTask
будет выполняться только после отправки ответа (см. Также Документацию Starlette); таким образом, клиенту не придется ждать завершения протоколирования перед получением response
(и, следовательно, время ответа заметно не сократится).
Если бы у вас была потоковая передача request
или response
с телом, которое не помещалось бы в оперативную память вашего сервера (например, представьте тело объемом 100 ГБ на компьютере с 8 ГБ оперативной памяти), это стало бы проблематичным, поскольку вы сохраняете данные в оперативной памяти, в которой было бы недостаточно свободного места для размещения накопленных данных. Кроме того, в случае большого response
(например, большого FileResponse
или StreamingResponse
) вы можете столкнуться с Timeout
ошибками на стороне клиента (или на стороне обратного прокси, если вы его используете), поскольку вы не сможете ответить клиенту, пока не прочтете все тело ответа (поскольку вы перебираете response.body_iterator
).). Вы упомянули, что "размер тела как запроса, так и ответа JSON составляет около 1 МБ"; следовательно, обычно это должно быть нормально (однако всегда рекомендуется заранее учитывать вопросы, например, сколько запросов ваш API, как ожидается, будет обслуживать одновременно, какие другие приложения могут использовать оперативную память и т.д., Чтобы определить, является ли это проблемой или нет). При необходимости вы могли бы ограничить количество запросов к конечным точкам вашего API, используя, например, SlowAPI (как показано в этом ответе).
middleware
только определенными маршрутамиВы могли бы ограничить использование middleware
определенными конечными точками,:
request.url.path
внутри промежуточного программного обеспечения по заранее определенному списку маршрутов, для которых вы хотели бы протоколировать request
и response
, как описано в этом ответе (см. Раздел "Обновление"),APIRoute
класса, как показано в варианте 2 ниже.from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging
app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)
def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)
# not needed when using FastAPI>=0.108.0.
async def set_body(request: Request, body: bytes):
async def receive() -> Message:
return {'type': 'http.request', 'body': body}
request._receive = receive
@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = await request.body()
await set_body(request, req_body) # not needed when using FastAPI>=0.108.0.
response = await call_next(request)
res_body = b''
async for chunk in response.body_iterator:
res_body += chunk
task = BackgroundTask(log_info, req_body, res_body)
return Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type, background=task)
@app.post('/')
def main(payload: Dict[Any, Any]):
return payload
В случае, если вы хотите выполнить некоторую проверку тела запроса — например, убедиться, что размер тела запроса не превышает определенного значения — вместо использования request.body()
, вы можете обрабатывать тело по одному фрагменту за раз, используя .stream()
метод, как показано ниже (аналогично этому ответу).
@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = b''
async for chunk in request.stream():
req_body += chunk
...
APIRoute
классаВ качестве альтернативы вы можете использовать пользовательский APIRoute
класс — аналогичный here и here — который, среди прочего, позволил бы вам манипулировать request
телом до того, как оно будет обработано вашим приложением, а также response
телом до того, как оно будет возвращено клиенту. Эта опция также позволяет вам ограничить использование этого класса маршрутами, которые вы хотите, поскольку только конечные точки под APIRouter
(т.Е. router
В примере ниже) будут использовать пользовательский APIRoute
класс .
Следует отметить, что те же комментарии, упомянутые в варианте 1 выше, в разделе "Примечание", применимы и к этому варианту. Например, если ваш API возвращает StreamingResponse
— например, в /video
маршруте приведенного ниже примера, который представляет собой потоковую передачу видеофайла из онлайн—источника (общедоступные видеоролики для проверки этого можно найти здесь, и вы даже можете использовать более длинное видео, чем то, которое используется ниже, чтобы более четко увидеть эффект) - вы можете столкнуться с проблемами на стороне сервера, если оперативная память вашего сервера не справляется с этим, а также с задержками на стороне клиента (и обратного прокси-сервера, если он используется) из-за всего ( потоковый) ответ считывается и сохраняется в оперативной памяти, прежде чем он будет возвращен клиенту (как объяснялось ранее). В таких случаях вы могли бы исключить такие конечные точки, которые возвращают StreamingResponse
из пользовательского APIRoute
класса, и ограничить его использование только желаемыми маршрутами — особенно, если это большой видеофайл или даже видео в реальном времени, которое вряд ли имело бы смысл сохранять в журналах — просто не используя @<name_of_router>
декоратор (т.Е. @router
В примере ниже) для таких конечных точек, а скорее используя @<name_of_app>
декоратор (т.Е. @app
в примере ниже), или какое-либо другое APIRouter
или вспомогательное приложение.
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx
def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)
class LoggingRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
req_body = await request.body()
response = await original_route_handler(request)
tasks = response.background
if isinstance(response, StreamingResponse):
res_body = b''
async for item in response.body_iterator:
res_body += item
task = BackgroundTask(log_info, req_body, res_body)
response = Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type)
else:
task = BackgroundTask(log_info, req_body, response.body)
# check if the original response had background tasks already attached to it
if tasks:
tasks.add_task(task) # add the new task to the tasks list
response.background = tasks
else:
response.background = task
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)
@router.post('/')
def main(payload: Dict[Any, Any]):
return payload
@router.get('/video')
def get_video():
url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'
def gen():
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw():
yield chunk
return StreamingResponse(gen(), media_type='video/mp4')
app.include_router(router)
Ответ 2
Вы можете попробовать настроить APIRouter, как в официальной документации FastAPI:
import time
from typing import Callable
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute
class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["X-Response-Time"] = str(duration)
print(f"route duration: {duration}")
print(f"route response: {response}")
print(f"route response headers: {response.headers}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@app.get("/")
async def not_timed():
return {"message": "Not timed"}
@router.get("/timed")
async def timed():
return {"message": "It's the time of my life"}
app.include_router(router)
Ответ 3
Поскольку другие ответы у меня не сработали, и я довольно тщательно искал в stackoverflow, чтобы исправить эту проблему, я покажу свое решение ниже.
Основная проблема заключается в том, что при использовании тела запроса или ответа многие подходы / решения, предлагаемые онлайн, просто не работают, поскольку тело запроса / ответа используется при чтении его из потока.
Для решения этой проблемы я адаптировал подход, который в основном реконструирует запрос и ответ после их прочтения. Это в значительной степени основано на комментарии пользователя 'kovalevvlad' на https://github.com/encode/starlette/issues/495.
Создается пользовательское промежуточное программное обеспечение, которое позже добавляется в приложение для протоколирования всех запросов и ответов. Обратите внимание, что вам нужен какой-то регистратор для хранения ваших журналов.
from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message
# Set up your custom logger here
logger = ""
class RequestWithBody(Request):
"""Creation of new request with body"""
def __init__(self, scope: Scope, body: bytes) -> None:
super().__init__(scope, self._receive)
self._body = body
self._body_returned = False
async def _receive(self) -> Message:
if self._body_returned:
return {"type": "http.disconnect"}
else:
self._body_returned = True
return {"type": "http.request", "body": self._body, "more_body": False}
class CustomLoggingMiddleware(BaseHTTPMiddleware):
"""
Use of custom middleware since reading the request body and the response consumes the bytestream.
Hence this approach to basically generate a new request/response when we read the attributes for logging.
"""
async def dispatch( # type: ignore
self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
) -> Response:
# Store request body in a variable and generate new request as it is consumed.
request_body_bytes = await request.body()
request_with_body = RequestWithBody(request.scope, request_body_bytes)
# Store response body in a variable and generate new response as it is consumed.
response = await call_next(request_with_body)
response_content_bytes, response_headers, response_status = await self._get_response_params(response)
# Logging
# If there is no request body handle exception, otherwise convert bytes to JSON.
try:
req_body = json.loads(request_body_bytes)
except JSONDecodeError:
req_body = ""
# Logging of relevant variables.
logger.info(
f"{request.method} request to {request.url} metadata\n"
f"\tStatus_code: {response.status_code}\n"
f"\tRequest_Body: {req_body}\n"
)
# Finally, return the newly instantiated response values
return Response(response_content_bytes, response_status, response_headers)
async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
"""Getting the response parameters of a response and create a new response."""
response_byte_chunks: List[bytes] = []
response_status: List[int] = []
response_headers: List[Dict[str, str]] = []
async def send(message: Message) -> None:
if message["type"] == "http.response.start":
response_status.append(message["status"])
response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
else:
response_byte_chunks.append(message["body"])
await response.stream_response(send)
content = b"".join(response_byte_chunks)
return content, response_headers[0], response_status[0]