Вопрос-Ответ

How to Upload a large File (≥3GB) to FastAPI backend?

Как загрузить большой файл (≥ 3 ГБ) в серверную часть FastAPI?

Я пытаюсь загрузить большой файл (≥ 3 ГБ) на мой сервер FastAPI, не загружая весь файл в память, поскольку на моем сервере всего 2 ГБ свободной памяти.

Серверная часть:

async def uploadfiles(upload_file: UploadFile = File(...):

Клиентская часть:

m = MultipartEncoder(fields = {"upload_file":open(file_name,'rb')})
prefix = "http://xxx:5000"
url = "{}/v1/uploadfiles".format(prefix)
try:
req = requests.post(
url,
data=m,
verify=False,
)

который возвращает:

HTTP 422 {"detail":[{"loc":["body","upload_file"],"msg":"field required","type":"value_error.missing"}]}

Я не уверен, что MultipartEncoder на самом деле отправляет на сервер, поэтому запрос не соответствует. Есть идеи?

Переведено автоматически
Ответ 1

С requests-toolbelt библиотекой вы также должны передать filename при объявлении field for upload_file, а также установить Content-Type заголовок — что является основной причиной ошибки, которую вы получаете, поскольку вы отправляете запрос без установки Content-Type заголовка на multipart/form-data, за которым следует необходимая boundary строка — как показано в документации. Пример:

filename = 'my_file.txt'
m = MultipartEncoder(fields={'upload_file': (filename, open(filename, 'rb'))})
r = requests.post(url, data=m, headers={'Content-Type': m.content_type})
print(r.request.headers) # confirm that the 'Content-Type' header has been set

Однако я бы не рекомендовал использовать библиотеку (т. Е. requests-toolbelt), которая не выпускала новых версий уже более трех лет. Я бы предложил вместо этого использовать запросы Python, как показано в этом ответе и том ответе (также см. Потоковые загрузки и запросы в кодировке фрагментов), или, предпочтительно, использовать HTTPX библиотеку, которая поддерживает async запросы (если вам пришлось отправлять несколько запросов одновременно), а также потоковые File загрузки по умолчанию, что означает, что в память будет загружаться только один фрагмент за раз (см. Документацию). Примеры приведены ниже.

Вариант 1 (быстрый) - загрузка File и Form данных с помощью .stream()

Как ранее подробно объяснялось в этом ответе, когда вы объявляете UploadFile объект, FastAPI / Starlette под капотом использует SpooledTemporaryFile с max_size атрибутом, равным 1 МБ, что означает, что данные файла сохраняются в памяти до тех пор, пока размер файла не превысит max_size, после чего содержимое записывается на диск; более конкретно, в temporary файл во временном каталоге вашей ОС — см. Этот ответ о том, как найти / изменить временный каталог по умолчанию, — который вы хотите сохранить. позже нужно будет прочитать данные из, используя метод .read(). Следовательно, весь этот процесс делает загрузку файла довольно медленной; особенно, если это большой файл (как вы увидите в варианте 2 ниже позже).

Чтобы избежать этого и ускорить процесс, как было предложено в приведенном выше связанном ответе, можно получить доступ к request телу в виде потока. Согласно документации Starlette, если вы используете .stream() метод, (запрашивающие) фрагменты байтов предоставляются без сохранения всего тела в памяти (и позже во временном файле, если размер тела превышает 1 МБ). Этот метод позволяет вам считывать и обрабатывать фрагменты байтов по мере их поступления. Ниже предлагается решение на шаг дальше, с использованием streaming-form-data библиотеки, которая предоставляет анализатор Python для анализа потоковых multipart/form-data входных фрагментов. Это означает, что вы не только можете загружать Form данные вместе с File(s), но вам также не нужно ждать получения всего тела запроса, чтобы начать синтаксический анализ данных. Способ, которым это делается, заключается в том, что вы инициализируете основной класс синтаксического анализатора (передавая HTTP-запрос, headers который помогает определить входные данные Content-Type и, следовательно, boundary строку, используемую для разделения каждой части тела в составных полезных данных и т.д.), И связываете один из Target классов, чтобы определить, что следует делать с полем, когда оно будет извлечено из тела запроса. Например, FileTarget будет передавать данные в файл на диске, тогда как ValueTarget будет хранить данные в памяти (этот класс может использоваться для любого из Form или File также данные, если вам не нужны файлы, сохраненные на диске). Также возможно определить ваши собственные пользовательские Target классы. Я должен упомянуть, что streaming-form-data библиотека в настоящее время не поддерживает async вызовы операций ввода-вывода, что означает, что запись фрагментов происходит syncхронологически (внутри def функции). Хотя, поскольку конечная точка ниже использует .stream() (которая является async функцией), она откажется от управления другими задачами / запросами для запуска в цикле событий, ожидая, пока данные станут доступны из потока. Вы также могли бы запустить функцию для синтаксического анализа полученных данных в отдельном потоке и await это, используя Starlette run_in_threadpool()— например, await run_in_threadpool(parser.data_received, chunk)— который используется FastAPI внутренне при вызове async методов UploadFile, как показано здесь. Для получения более подробной информации о def vs async def, пожалуйста, ознакомьтесь с этим ответом.

Вы также можете выполнять определенные задачи проверки, например, следить за тем, чтобы размер входных данных не превышал определенного значения. Это можно сделать с помощью MaxSizeValidator. Однако, поскольку это будет применяться только к определенным вами полям — и, следовательно, это не помешает злоумышленнику отправить чрезвычайно большое тело запроса, что может привести к потреблению ресурсов сервера таким образом, что приложение может в конечном итоге завершиться сбоем — приведенное ниже включает пользовательский MaxBodySizeValidator класс, который используется, чтобы убедиться, что размер тела запроса не превышает заранее определенного значения. Оба описанных выше средства проверки решают проблему ограничения размера загружаемого файла (а также всего тела запроса), вероятно, лучшим способом, чем тот, который описан здесь, который использует UploadFile, и, следовательно, файл должен быть полностью получен и сохранен во временном каталоге перед выполнением проверки (не говоря уже о том, что подход вообще не учитывает размер тела запроса) — использование промежуточного программного обеспечения ASGI, такого как this, было бы альтернативным решением для ограничения размера файла. тело запроса. Также, если вы используете Gunicorn с помощью Uvicorn вы также можете определить ограничения в отношении, например, количества полей заголовка HTTP-запроса в запросе, размера поля заголовка HTTP-запроса и так далее (см. Документацию). Аналогичные ограничения могут применяться при использовании обратных прокси-серверов, таких как Nginx (который также позволяет вам устанавливать максимальный размер тела запроса с помощью client_max_body_size директивы).

Несколько замечаний к приведенному ниже примеру. Поскольку он использует Request объект напрямую, а не объекты UploadFile и Form, конечная точка не будет должным образом задокументирована в автоматически сгенерированных документах по адресу /docs (если это вообще важно для вашего приложения). Это также означает, что вы должны самостоятельно выполнить некоторые проверки, например, были ли получены требуемые поля для конечной точки или нет, и были ли они в ожидаемом формате. Например, для data поля вы могли бы проверить, является ли data.value пустым или нет (пустое будет означать, что пользователь либо не включил это поле в multipart/form-data, либо отправил пустое значение), а также, если isinstance(data.value, str). Что касается файла (ов), вы можете проверить, не является ли file_.multipart_filename пустым; однако, поскольку filename какой-либо пользователь, вероятно, не мог быть включен в Content-Disposition, вы также можете захотеть проверить, существует ли файл в файловой системе, используя os.path.isfile(filepath) (Примечание: вам нужно убедиться, что в указанном месте нет ранее существовавшего файла с таким же именем; в противном случае вышеупомянутая функция всегда возвращала бы True, даже если пользователь не отправлял файл).

Что касается применяемых ограничений по размеру, то MAX_REQUEST_BODY_SIZE приведенные ниже значения должны быть больше, чем MAX_FILE_SIZE (плюс все Form значения size), которые вы ожидаете получить, поскольку исходное тело запроса (которое вы получаете с помощью .stream() метода) включает в себя еще несколько байтов для --boundary и Content-Disposition заголовок для каждого из полей в теле. Следовательно, вам следует добавить еще несколько байт, в зависимости от Form значений и количества файлов, которые вы ожидаете получить (отсюда MAX_FILE_SIZE + 1024 ниже).

app.py

from fastapi import FastAPI, Request, HTTPException, status
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import FileTarget, ValueTarget
from streaming_form_data.validators import MaxSizeValidator
import streaming_form_data
from starlette.requests import ClientDisconnect
import os

MAX_FILE_SIZE = 1024 * 1024 * 1024 * 4 # = 4GB
MAX_REQUEST_BODY_SIZE = MAX_FILE_SIZE + 1024

app = FastAPI()

class MaxBodySizeException(Exception):
def __init__(self, body_len: str):
self.body_len = body_len

class MaxBodySizeValidator:
def __init__(self, max_size: int):
self.body_len = 0
self.max_size = max_size

def __call__(self, chunk: bytes):
self.body_len += len(chunk)
if self.body_len > self.max_size:
raise MaxBodySizeException(body_len=self.body_len)

@app.post('/upload')
async def upload(request: Request):
body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)
filename = request.headers.get('Filename')

if not filename:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Filename header is missing')
try:
filepath = os.path.join('./', os.path.basename(filename))
file_ = FileTarget(filepath, validator=MaxSizeValidator(MAX_FILE_SIZE))
data = ValueTarget()
parser = StreamingFormDataParser(headers=request.headers)
parser.register('file', file_)
parser.register('data', data)

async for chunk in request.stream():
body_validator(chunk)
parser.data_received(chunk)
except ClientDisconnect:
print("Client Disconnected")
except MaxBodySizeException as e:
raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f'Maximum request body size limit ({MAX_REQUEST_BODY_SIZE} bytes) exceeded ({e.body_len} bytes read)')
except streaming_form_data.validators.ValidationError:
raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f'Maximum file size limit ({MAX_FILE_SIZE} bytes) exceeded')
except Exception:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='There was an error uploading the file')

if not file_.multipart_filename:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail='File is missing')

print(data.value.decode())
print(file_.multipart_filename)

return {"message": f"Successfuly uploaded {filename}"}

Как упоминалось ранее, для загрузки данных (на стороне клиента) вы можете использовать HTTPX библиотеку, которая по умолчанию поддерживает потоковую загрузку файлов и, таким образом, позволяет отправлять большие потоки / файлы, не загружая их полностью в память. Вы также можете передать дополнительные Form данные, используя data аргумент. Ниже пользовательский заголовок, т. Е. Filename, используется для передачи имени файла серверу, чтобы сервер создавал экземпляр FileTarget класса с этим именем (вы могли бы использовать X- префикс для пользовательских заголовков, если хотите; однако это официально больше не рекомендуется).

Чтобы загрузить несколько файлов, используйте заголовок для каждого файла (или используйте случайные имена на стороне сервера, и как только файл будет полностью загружен, вы можете при желании переименовать его, используя file_.multipart_filename атрибут), передайте список файлов, как описано в документации (Примечание: используйте разные имена полей для каждого файла, чтобы они не перекрывались при их разборе на стороне сервера, например, files = [('file', open('bigFile.zip', 'rb')),('file_2', open('bigFile2.zip', 'rb'))], и, наконец, соответствующим образом определите Target классы на стороне сервера.

test.py

import httpx
import time

url ='http://127.0.0.1:8000/upload'
files = {'file': open('bigFile.zip', 'rb')}
headers={'Filename': 'bigFile.zip'}
data = {'data': 'Hello World!'}

with httpx.Client() as client:
start = time.time()
r = client.post(url, data=data, files=files, headers=headers)
end = time.time()
print(f'Time elapsed: {end - start}s')
print(r.status_code, r.json(), sep=' ')

Загрузить как File, так и JSON тело

В случае, если вы хотите загрузить оба файла (ов) и JSON вместо Form данных, вы можете использовать подход, описанный в Методе 3 этого ответа, что также избавит вас от выполнения ручных проверок полученных Form полей, как объяснялось ранее (подробнее смотрите Связанный ответ). Для этого внесите следующие изменения в приведенный выше код.

app.py

#...
from fastapi import Form
from pydantic import BaseModel, ValidationError
from typing import Optional
from fastapi.encoders import jsonable_encoder

#...

class Base(BaseModel):
name: str
point: Optional[float] = None
is_accepted: Optional[bool] = False

def checker(data: str = Form(...)):
try:
return Base.parse_raw(data)
except ValidationError as e:
raise HTTPException(detail=jsonable_encoder(e.errors()), status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)


@app.post('/upload')
async def upload(request: Request):
#...

# place the below after the try-except block in the example given earlier
model = checker(data.value.decode())
print(dict(model))

test.py

#...
import json

data = {'data': json.dumps({"name": "foo", "point": 0.13, "is_accepted": False})}
#...

Вариант 2 (медленный) - Загрузка File и Form данных с использованием UploadFile и Form

Если вы хотите использовать вместо этого обычную def конечную точку, смотрите Этот ответ .

app.py

from fastapi import FastAPI, File, UploadFile, Form, HTTPException, status
import aiofiles
import os

CHUNK_SIZE = 1024 * 1024 # adjust the chunk size as desired
app = FastAPI()

@app.post("/upload")
async def upload(file: UploadFile = File(...), data: str = Form(...)):
try:
filepath = os.path.join('./', os.path.basename(file.filename))
async with aiofiles.open(filepath, 'wb') as f:
while chunk := await file.read(CHUNK_SIZE):
await f.write(chunk)
except Exception:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='There was an error uploading the file')
finally:
await file.close()

return {"message": f"Successfuly uploaded {file.filename}"}

Как упоминалось ранее, при использовании этой опции для завершения загрузки файла потребуется больше времени, а поскольку HTTPX по умолчанию используется тайм-аут в 5 секунд, вы, скорее всего, получите ReadTimeout исключение (поскольку серверу потребуется некоторое время, чтобы прочитать SpooledTemporaryFile фрагменты и записать содержимое в постоянное место на диске). Таким образом, вы можете настроить время ожидания (см. Также класс Timeout в исходном коде), и, более конкретно, read время ожидания, которое "определяет максимальную продолжительность ожидания получения фрагмента данных (например, фрагмента тела ответа)". Если установить значение None вместо некоторого положительного числового значения, таймаута не будет read.

test.py

import httpx
import time

url ='http://127.0.0.1:8000/upload'
files = {'file': open('bigFile.zip', 'rb')}
headers={'Filename': 'bigFile.zip'}
data = {'data': 'Hello World!'}
timeout = httpx.Timeout(None, read=180.0)

with httpx.Client(timeout=timeout) as client:
start = time.time()
r = client.post(url, data=data, files=files, headers=headers)
end = time.time()
print(f'Time elapsed: {end - start}s')
print(r.status_code, r.json(), sep=' ')
python