Вопрос-Ответ

How to log raw HTTP request/response in Python FastAPI?

Как протоколировать необработанный HTTP-запрос / ответ в Python FastAPI?

Мы пишем веб-сервис с использованием Python FastAPI, который будет размещен в Kubernetes. Для целей аудита нам нужно сохранить необработанное тело JSON-файла request/response для определенных маршрутов. Размер тела как request, так и response JSON составляет около 1 МБ, и желательно, чтобы это не влияло на время ответа. Как мы можем это сделать?

Переведено автоматически
Ответ 1

Вариант 1 - Использование промежуточного программного обеспечения

Вы могли бы использовать Middleware. A middleware принимает каждый запрос, поступающий в ваше приложение, и, следовательно, позволяет вам обрабатывать request до того, как он будет обработан какой-либо конкретной конечной точкой, а также response до того, как он будет возвращен клиенту. Чтобы создать middleware, вы используете декоратор @app.middleware("http") поверх функции, как показано ниже.

Поскольку вам нужно использовать тело запроса из потока внутри middleware — используя либо request.body(), либо request.stream(), как показано в этом ответе (за кулисами первый метод фактически вызывает второй, см. Здесь) — тогда он будет недоступен, когда вы позже передадите request соответствующей конечной точке. Таким образом, вы можете следовать подходу, описанному в этом посте, чтобы сделать тело запроса доступным в дальнейшем (т. Е. Используя set_body() функцию ниже). ОБНОВЛЕНИЕ: Эта проблема теперь исправлена, и, следовательно, нет необходимости использовать это обходное решение, если вы используете FastAPI 0.108.0 или более поздней версии.

Что касается response тела, вы можете использовать тот же подход, что описан в этом ответе, чтобы использовать тело, а затем вернуть response клиенту. Любой из вариантов, описанных в вышеупомянутом связанном ответе, будет работать; однако ниже используется вариант 2, который сохраняет тело в объекте bytes и возвращает пользовательский Response напрямую (вместе с status_code, headers и media_type оригинала response).

Для протоколирования данных вы могли бы использовать BackgroundTask, как описано в этом ответе, а также в этом ответе и этом ответе. A BackgroundTask будет выполняться только после отправки ответа (см. Также Документацию Starlette); таким образом, клиенту не придется ждать завершения протоколирования перед получением response (и, следовательно, время ответа заметно не сократится).

Примечание

Если бы у вас была потоковая передача request или response с телом, которое не помещалось бы в оперативную память вашего сервера (например, представьте тело объемом 100 ГБ на компьютере с 8 ГБ оперативной памяти), это стало бы проблематичным, поскольку вы сохраняете данные в оперативной памяти, в которой было бы недостаточно свободного места для размещения накопленных данных. Кроме того, в случае большого response (например, большого FileResponse или StreamingResponse) вы можете столкнуться с Timeout ошибками на стороне клиента (или на стороне обратного прокси, если вы его используете), поскольку вы не сможете ответить клиенту, пока не прочтете все тело ответа (поскольку вы перебираете response.body_iterator).). Вы упомянули, что "размер тела как запроса, так и ответа JSON составляет около 1 МБ"; следовательно, обычно это должно быть нормально (однако всегда рекомендуется заранее учитывать вопросы, например, сколько запросов ваш API, как ожидается, будет обслуживать одновременно, какие другие приложения могут использовать оперативную память и т.д., Чтобы определить, является ли это проблемой или нет). При необходимости вы могли бы ограничить количество запросов к конечным точкам вашего API, используя, например, SlowAPI (как показано в этом ответе).

Ограничение использования middleware только определенными маршрутами

Вы могли бы ограничить использование middleware определенными конечными точками,:


  • проверка request.url.path внутри промежуточного программного обеспечения по заранее определенному списку маршрутов, для которых вы хотели бы протоколировать request и response, как описано в этом ответе (см. Раздел "Обновление"),

  • или с помощью вспомогательного приложения, как показано в этом ответе

  • или с помощью пользовательского APIRoute класса, как показано в варианте 2 ниже.

Рабочий пример

from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging


app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)


def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)


# not needed when using FastAPI>=0.108.0.
async def set_body(request: Request, body: bytes):
async def receive() -> Message:
return {'type': 'http.request', 'body': body}
request._receive = receive


@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = await request.body()
await set_body(request, req_body) # not needed when using FastAPI>=0.108.0.
response = await call_next(request)

res_body = b''
async for chunk in response.body_iterator:
res_body += chunk

task = BackgroundTask(log_info, req_body, res_body)
return Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type, background=task)


@app.post('/')
def main(payload: Dict[Any, Any]):
return payload

В случае, если вы хотите выполнить некоторую проверку тела запроса — например, убедиться, что размер тела запроса не превышает определенного значения — вместо использования request.body(), вы можете обрабатывать тело по одному фрагменту за раз, используя .stream() метод, как показано ниже (аналогично этому ответу).

@app.middleware('http')
async def some_middleware(request: Request, call_next):
req_body = b''
async for chunk in request.stream():
req_body += chunk
...

Вариант 2 - Использование пользовательского APIRoute класса

В качестве альтернативы вы можете использовать пользовательский APIRoute класс — аналогичный here и here — который, среди прочего, позволил бы вам манипулировать request телом до того, как оно будет обработано вашим приложением, а также response телом до того, как оно будет возвращено клиенту. Эта опция также позволяет вам ограничить использование этого класса маршрутами, которые вы хотите, поскольку только конечные точки под APIRouter (т.Е. router В примере ниже) будут использовать пользовательский APIRoute класс .

Следует отметить, что те же комментарии, упомянутые в варианте 1 выше, в разделе "Примечание", применимы и к этому варианту. Например, если ваш API возвращает StreamingResponse — например, в /video маршруте приведенного ниже примера, который представляет собой потоковую передачу видеофайла из онлайн—источника (общедоступные видеоролики для проверки этого можно найти здесь, и вы даже можете использовать более длинное видео, чем то, которое используется ниже, чтобы более четко увидеть эффект) - вы можете столкнуться с проблемами на стороне сервера, если оперативная память вашего сервера не справляется с этим, а также с задержками на стороне клиента (и обратного прокси-сервера, если он используется) из-за всего ( потоковый) ответ считывается и сохраняется в оперативной памяти, прежде чем он будет возвращен клиенту (как объяснялось ранее). В таких случаях вы могли бы исключить такие конечные точки, которые возвращают StreamingResponse из пользовательского APIRoute класса, и ограничить его использование только желаемыми маршрутами — особенно, если это большой видеофайл или даже видео в реальном времени, которое вряд ли имело бы смысл сохранять в журналах — просто не используя @<name_of_router> декоратор (т.Е. @router В примере ниже) для таких конечных точек, а скорее используя @<name_of_app> декоратор (т.Е. @app в примере ниже), или какое-либо другое APIRouter или вспомогательное приложение.

Рабочий пример

from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx


def log_info(req_body, res_body):
logging.info(req_body)
logging.info(res_body)


class LoggingRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()

async def custom_route_handler(request: Request) -> Response:
req_body = await request.body()
response = await original_route_handler(request)
tasks = response.background

if isinstance(response, StreamingResponse):
res_body = b''
async for item in response.body_iterator:
res_body += item

task = BackgroundTask(log_info, req_body, res_body)
response = Response(content=res_body, status_code=response.status_code,
headers=dict(response.headers), media_type=response.media_type)
else:
task = BackgroundTask(log_info, req_body, response.body)

# check if the original response had background tasks already attached to it
if tasks:
tasks.add_task(task) # add the new task to the tasks list
response.background = tasks
else:
response.background = task

return response

return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)


@router.post('/')
def main(payload: Dict[Any, Any]):
return payload


@router.get('/video')
def get_video():
url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'

def gen():
with httpx.stream('GET', url) as r:
for chunk in r.iter_raw():
yield chunk

return StreamingResponse(gen(), media_type='video/mp4')


app.include_router(router)
Ответ 2

Вы можете попробовать настроить APIRouter, как в официальной документации FastAPI:

import time
from typing import Callable

from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute


class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()

async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["X-Response-Time"] = str(duration)
print(f"route duration: {duration}")
print(f"route response: {response}")
print(f"route response headers: {response.headers}")
return response

return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=TimedRoute)


@app.get("/")
async def not_timed():
return {"message": "Not timed"}


@router.get("/timed")
async def timed():
return {"message": "It's the time of my life"}


app.include_router(router)
Ответ 3

Поскольку другие ответы у меня не сработали, и я довольно тщательно искал в stackoverflow, чтобы исправить эту проблему, я покажу свое решение ниже.

Основная проблема заключается в том, что при использовании тела запроса или ответа многие подходы / решения, предлагаемые онлайн, просто не работают, поскольку тело запроса / ответа используется при чтении его из потока.

Для решения этой проблемы я адаптировал подход, который в основном реконструирует запрос и ответ после их прочтения. Это в значительной степени основано на комментарии пользователя 'kovalevvlad' на https://github.com/encode/starlette/issues/495.

Создается пользовательское промежуточное программное обеспечение, которое позже добавляется в приложение для протоколирования всех запросов и ответов. Обратите внимание, что вам нужен какой-то регистратор для хранения ваших журналов.

from json import JSONDecodeError
import json
import logging
from typing import Callable, Awaitable, Tuple, Dict, List

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.types import Scope, Message

# Set up your custom logger here
logger = ""

class RequestWithBody(Request):
"""Creation of new request with body"""
def __init__(self, scope: Scope, body: bytes) -> None:
super().__init__(scope, self._receive)
self._body = body
self._body_returned = False

async def _receive(self) -> Message:
if self._body_returned:
return {"type": "http.disconnect"}
else:
self._body_returned = True
return {"type": "http.request", "body": self._body, "more_body": False}


class CustomLoggingMiddleware(BaseHTTPMiddleware):
"""
Use of custom middleware since reading the request body and the response consumes the bytestream.
Hence this approach to basically generate a new request/response when we read the attributes for logging.
"""

async def dispatch( # type: ignore
self, request: Request, call_next: Callable[[Request], Awaitable[StreamingResponse]]
) -> Response:
# Store request body in a variable and generate new request as it is consumed.
request_body_bytes = await request.body()

request_with_body = RequestWithBody(request.scope, request_body_bytes)

# Store response body in a variable and generate new response as it is consumed.
response = await call_next(request_with_body)
response_content_bytes, response_headers, response_status = await self._get_response_params(response)

# Logging

# If there is no request body handle exception, otherwise convert bytes to JSON.
try:
req_body = json.loads(request_body_bytes)
except JSONDecodeError:
req_body = ""
# Logging of relevant variables.
logger.info(
f"{request.method} request to {request.url} metadata\n"
f"\tStatus_code: {response.status_code}\n"
f"\tRequest_Body: {req_body}\n"
)
# Finally, return the newly instantiated response values
return Response(response_content_bytes, response_status, response_headers)

async def _get_response_params(self, response: StreamingResponse) -> Tuple[bytes, Dict[str, str], int]:
"""Getting the response parameters of a response and create a new response."""
response_byte_chunks: List[bytes] = []
response_status: List[int] = []
response_headers: List[Dict[str, str]] = []

async def send(message: Message) -> None:
if message["type"] == "http.response.start":
response_status.append(message["status"])
response_headers.append({k.decode("utf8"): v.decode("utf8") for k, v in message["headers"]})
else:
response_byte_chunks.append(message["body"])

await response.stream_response(send)
content = b"".join(response_byte_chunks)
return content, response_headers[0], response_status[0]
python