Как загрузить большой файл (≥ 3 ГБ) в серверную часть FastAPI?
Я пытаюсь загрузить большой файл (≥ 3 ГБ) на мой сервер FastAPI, не загружая весь файл в память, поскольку на моем сервере всего 2 ГБ свободной памяти.
Серверная часть:
async def uploadfiles(upload_file: UploadFile = File(...):
Клиентская часть:
m = MultipartEncoder(fields = {"upload_file":open(file_name,'rb')})
prefix = "http://xxx:5000"
url = "{}/v1/uploadfiles".format(prefix)
try:
req = requests.post(
url,
data=m,
verify=False,
)
который возвращает:
HTTP 422 {"detail":[{"loc":["body","upload_file"],"msg":"field required","type":"value_error.missing"}]}
Я не уверен, что MultipartEncoder
на самом деле отправляет на сервер, поэтому запрос не соответствует. Есть идеи?
Переведено автоматически
Ответ 1
С requests-toolbelt
библиотекой вы также должны передать filename
при объявлении field
for upload_file
, а также установить Content-Type
заголовок — что является основной причиной ошибки, которую вы получаете, поскольку вы отправляете запрос без установки Content-Type
заголовка на multipart/form-data
, за которым следует необходимая boundary
строка — как показано в документации. Пример:
filename = 'my_file.txt'
m = MultipartEncoder(fields={'upload_file': (filename, open(filename, 'rb'))})
r = requests.post(url, data=m, headers={'Content-Type': m.content_type})
print(r.request.headers) # confirm that the 'Content-Type' header has been set
Однако я бы не рекомендовал использовать библиотеку (т. Е. requests-toolbelt
), которая не выпускала новых версий уже более трех лет. Я бы предложил вместо этого использовать запросы Python, как показано в этом ответе и том ответе (также см. Потоковые загрузки и запросы в кодировке фрагментов), или, предпочтительно, использовать HTTPX
библиотеку, которая поддерживает async
запросы (если вам пришлось отправлять несколько запросов одновременно), а также потоковые File
загрузки по умолчанию, что означает, что в память будет загружаться только один фрагмент за раз (см. Документацию). Примеры приведены ниже.
File
и Form
данных с помощью .stream()
Как ранее подробно объяснялось в этом ответе, когда вы объявляете UploadFile
объект, FastAPI / Starlette под капотом использует SpooledTemporaryFile
с max_size
атрибутом, равным 1 МБ, что означает, что данные файла сохраняются в памяти до тех пор, пока размер файла не превысит max_size
, после чего содержимое записывается на диск; более конкретно, в temporary
файл во временном каталоге вашей ОС — см. Этот ответ о том, как найти / изменить временный каталог по умолчанию, — который вы хотите сохранить. позже нужно будет прочитать данные из, используя метод .read()
. Следовательно, весь этот процесс делает загрузку файла довольно медленной; особенно, если это большой файл (как вы увидите в варианте 2 ниже позже).
Чтобы избежать этого и ускорить процесс, как было предложено в приведенном выше связанном ответе, можно получить доступ к request
телу в виде потока. Согласно документации Starlette, если вы используете .stream()
метод, (запрашивающие) фрагменты байтов предоставляются без сохранения всего тела в памяти (и позже во временном файле, если размер тела превышает 1 МБ). Этот метод позволяет вам считывать и обрабатывать фрагменты байтов по мере их поступления. Ниже предлагается решение на шаг дальше, с использованием streaming-form-data
библиотеки, которая предоставляет анализатор Python для анализа потоковых multipart/form-data
входных фрагментов. Это означает, что вы не только можете загружать Form
данные вместе с File(s)
, но вам также не нужно ждать получения всего тела запроса, чтобы начать синтаксический анализ данных. Способ, которым это делается, заключается в том, что вы инициализируете основной класс синтаксического анализатора (передавая HTTP-запрос, headers
который помогает определить входные данные Content-Type
и, следовательно, boundary
строку, используемую для разделения каждой части тела в составных полезных данных и т.д.), И связываете один из Target
классов, чтобы определить, что следует делать с полем, когда оно будет извлечено из тела запроса. Например, FileTarget
будет передавать данные в файл на диске, тогда как ValueTarget
будет хранить данные в памяти (этот класс может использоваться для любого из Form
или File
также данные, если вам не нужны файлы, сохраненные на диске). Также возможно определить ваши собственные пользовательские Target
классы. Я должен упомянуть, что streaming-form-data
библиотека в настоящее время не поддерживает async
вызовы операций ввода-вывода, что означает, что запись фрагментов происходит sync
хронологически (внутри def
функции). Хотя, поскольку конечная точка ниже использует .stream()
(которая является async
функцией), она откажется от управления другими задачами / запросами для запуска в цикле событий, ожидая, пока данные станут доступны из потока. Вы также могли бы запустить функцию для синтаксического анализа полученных данных в отдельном потоке и await
это, используя Starlette run_in_threadpool()
— например, await run_in_threadpool(parser.data_received, chunk)
— который используется FastAPI внутренне при вызове async
методов UploadFile
, как показано здесь. Для получения более подробной информации о def
vs async def
, пожалуйста, ознакомьтесь с этим ответом.
Вы также можете выполнять определенные задачи проверки, например, следить за тем, чтобы размер входных данных не превышал определенного значения. Это можно сделать с помощью MaxSizeValidator
. Однако, поскольку это будет применяться только к определенным вами полям — и, следовательно, это не помешает злоумышленнику отправить чрезвычайно большое тело запроса, что может привести к потреблению ресурсов сервера таким образом, что приложение может в конечном итоге завершиться сбоем — приведенное ниже включает пользовательский MaxBodySizeValidator
класс, который используется, чтобы убедиться, что размер тела запроса не превышает заранее определенного значения. Оба описанных выше средства проверки решают проблему ограничения размера загружаемого файла (а также всего тела запроса), вероятно, лучшим способом, чем тот, который описан здесь, который использует UploadFile
, и, следовательно, файл должен быть полностью получен и сохранен во временном каталоге перед выполнением проверки (не говоря уже о том, что подход вообще не учитывает размер тела запроса) — использование промежуточного программного обеспечения ASGI, такого как this, было бы альтернативным решением для ограничения размера файла. тело запроса. Также, если вы используете Gunicorn с помощью Uvicorn вы также можете определить ограничения в отношении, например, количества полей заголовка HTTP-запроса в запросе, размера поля заголовка HTTP-запроса и так далее (см. Документацию). Аналогичные ограничения могут применяться при использовании обратных прокси-серверов, таких как Nginx (который также позволяет вам устанавливать максимальный размер тела запроса с помощью client_max_body_size
директивы).
Несколько замечаний к приведенному ниже примеру. Поскольку он использует Request
объект напрямую, а не объекты UploadFile
и Form
, конечная точка не будет должным образом задокументирована в автоматически сгенерированных документах по адресу /docs
(если это вообще важно для вашего приложения). Это также означает, что вы должны самостоятельно выполнить некоторые проверки, например, были ли получены требуемые поля для конечной точки или нет, и были ли они в ожидаемом формате. Например, для data
поля вы могли бы проверить, является ли data.value
пустым или нет (пустое будет означать, что пользователь либо не включил это поле в multipart/form-data
, либо отправил пустое значение), а также, если isinstance(data.value, str)
. Что касается файла (ов), вы можете проверить, не является ли file_.multipart_filename
пустым; однако, поскольку filename
какой-либо пользователь, вероятно, не мог быть включен в Content-Disposition
, вы также можете захотеть проверить, существует ли файл в файловой системе, используя os.path.isfile(filepath)
(Примечание: вам нужно убедиться, что в указанном месте нет ранее существовавшего файла с таким же именем; в противном случае вышеупомянутая функция всегда возвращала бы True
, даже если пользователь не отправлял файл).
Что касается применяемых ограничений по размеру, то MAX_REQUEST_BODY_SIZE
приведенные ниже значения должны быть больше, чем MAX_FILE_SIZE
(плюс все Form
значения size), которые вы ожидаете получить, поскольку исходное тело запроса (которое вы получаете с помощью .stream()
метода) включает в себя еще несколько байтов для --boundary
и Content-Disposition
заголовок для каждого из полей в теле. Следовательно, вам следует добавить еще несколько байт, в зависимости от Form
значений и количества файлов, которые вы ожидаете получить (отсюда MAX_FILE_SIZE + 1024
ниже).
app.py
from fastapi import FastAPI, Request, HTTPException, status
from streaming_form_data import StreamingFormDataParser
from streaming_form_data.targets import FileTarget, ValueTarget
from streaming_form_data.validators import MaxSizeValidator
import streaming_form_data
from starlette.requests import ClientDisconnect
import os
MAX_FILE_SIZE = 1024 * 1024 * 1024 * 4 # = 4GB
MAX_REQUEST_BODY_SIZE = MAX_FILE_SIZE + 1024
app = FastAPI()
class MaxBodySizeException(Exception):
def __init__(self, body_len: str):
self.body_len = body_len
class MaxBodySizeValidator:
def __init__(self, max_size: int):
self.body_len = 0
self.max_size = max_size
def __call__(self, chunk: bytes):
self.body_len += len(chunk)
if self.body_len > self.max_size:
raise MaxBodySizeException(body_len=self.body_len)
@app.post('/upload')
async def upload(request: Request):
body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)
filename = request.headers.get('Filename')
if not filename:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail='Filename header is missing')
try:
filepath = os.path.join('./', os.path.basename(filename))
file_ = FileTarget(filepath, validator=MaxSizeValidator(MAX_FILE_SIZE))
data = ValueTarget()
parser = StreamingFormDataParser(headers=request.headers)
parser.register('file', file_)
parser.register('data', data)
async for chunk in request.stream():
body_validator(chunk)
parser.data_received(chunk)
except ClientDisconnect:
print("Client Disconnected")
except MaxBodySizeException as e:
raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f'Maximum request body size limit ({MAX_REQUEST_BODY_SIZE} bytes) exceeded ({e.body_len} bytes read)')
except streaming_form_data.validators.ValidationError:
raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f'Maximum file size limit ({MAX_FILE_SIZE} bytes) exceeded')
except Exception:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='There was an error uploading the file')
if not file_.multipart_filename:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail='File is missing')
print(data.value.decode())
print(file_.multipart_filename)
return {"message": f"Successfuly uploaded {filename}"}
Как упоминалось ранее, для загрузки данных (на стороне клиента) вы можете использовать HTTPX
библиотеку, которая по умолчанию поддерживает потоковую загрузку файлов и, таким образом, позволяет отправлять большие потоки / файлы, не загружая их полностью в память. Вы также можете передать дополнительные Form
данные, используя data
аргумент. Ниже пользовательский заголовок, т. Е. Filename
, используется для передачи имени файла серверу, чтобы сервер создавал экземпляр FileTarget
класса с этим именем (вы могли бы использовать X-
префикс для пользовательских заголовков, если хотите; однако это официально больше не рекомендуется).
Чтобы загрузить несколько файлов, используйте заголовок для каждого файла (или используйте случайные имена на стороне сервера, и как только файл будет полностью загружен, вы можете при желании переименовать его, используя file_.multipart_filename
атрибут), передайте список файлов, как описано в документации (Примечание: используйте разные имена полей для каждого файла, чтобы они не перекрывались при их разборе на стороне сервера, например, files = [('file', open('bigFile.zip', 'rb')),('file_2', open('bigFile2.zip', 'rb'))]
, и, наконец, соответствующим образом определите Target
классы на стороне сервера.
test.py
import httpx
import time
url ='http://127.0.0.1:8000/upload'
files = {'file': open('bigFile.zip', 'rb')}
headers={'Filename': 'bigFile.zip'}
data = {'data': 'Hello World!'}
with httpx.Client() as client:
start = time.time()
r = client.post(url, data=data, files=files, headers=headers)
end = time.time()
print(f'Time elapsed: {end - start}s')
print(r.status_code, r.json(), sep=' ')
File
, так и JSON
телоВ случае, если вы хотите загрузить оба файла (ов) и JSON вместо Form
данных, вы можете использовать подход, описанный в Методе 3 этого ответа, что также избавит вас от выполнения ручных проверок полученных Form
полей, как объяснялось ранее (подробнее смотрите Связанный ответ). Для этого внесите следующие изменения в приведенный выше код.
app.py
#...
from fastapi import Form
from pydantic import BaseModel, ValidationError
from typing import Optional
from fastapi.encoders import jsonable_encoder
#...
class Base(BaseModel):
name: str
point: Optional[float] = None
is_accepted: Optional[bool] = False
def checker(data: str = Form(...)):
try:
return Base.parse_raw(data)
except ValidationError as e:
raise HTTPException(detail=jsonable_encoder(e.errors()), status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@app.post('/upload')
async def upload(request: Request):
#...
# place the below after the try-except block in the example given earlier
model = checker(data.value.decode())
print(dict(model))
test.py
#...
import json
data = {'data': json.dumps({"name": "foo", "point": 0.13, "is_accepted": False})}
#...
File
и Form
данных с использованием UploadFile
и Form
Если вы хотите использовать вместо этого обычную def
конечную точку, смотрите Этот ответ .
app.py
from fastapi import FastAPI, File, UploadFile, Form, HTTPException, status
import aiofiles
import os
CHUNK_SIZE = 1024 * 1024 # adjust the chunk size as desired
app = FastAPI()
@app.post("/upload")
async def upload(file: UploadFile = File(...), data: str = Form(...)):
try:
filepath = os.path.join('./', os.path.basename(file.filename))
async with aiofiles.open(filepath, 'wb') as f:
while chunk := await file.read(CHUNK_SIZE):
await f.write(chunk)
except Exception:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='There was an error uploading the file')
finally:
await file.close()
return {"message": f"Successfuly uploaded {file.filename}"}
Как упоминалось ранее, при использовании этой опции для завершения загрузки файла потребуется больше времени, а поскольку HTTPX
по умолчанию используется тайм-аут в 5 секунд, вы, скорее всего, получите ReadTimeout
исключение (поскольку серверу потребуется некоторое время, чтобы прочитать SpooledTemporaryFile
фрагменты и записать содержимое в постоянное место на диске). Таким образом, вы можете настроить время ожидания (см. Также класс Timeout
в исходном коде), и, более конкретно, read
время ожидания, которое "определяет максимальную продолжительность ожидания получения фрагмента данных (например, фрагмента тела ответа)". Если установить значение None
вместо некоторого положительного числового значения, таймаута не будет read
.
test.py
import httpx
import time
url ='http://127.0.0.1:8000/upload'
files = {'file': open('bigFile.zip', 'rb')}
headers={'Filename': 'bigFile.zip'}
data = {'data': 'Hello World!'}
timeout = httpx.Timeout(None, read=180.0)
with httpx.Client(timeout=timeout) as client:
start = time.time()
r = client.post(url, data=data, files=files, headers=headers)
end = time.time()
print(f'Time elapsed: {end - start}s')
print(r.status_code, r.json(), sep=' ')