
本文详细介绍了在fastapi应用中,如何在处理完post请求后,将服务器上生成的文件(如音频、pdf等)安全、高效地提供给用户下载。文章涵盖了两种主要实现方式:一种是直接通过post请求返回文件下载,另一种是结合前端javascript进行异步文件下载,并深入探讨了`fileresponse`、`streamingresponse`等核心组件的使用,以及文件清理和安全注意事项。
在构建Web应用程序时,一个常见的需求是用户通过表单提交数据后,服务器端进行处理并生成一个文件,然后将该文件提供给用户下载。例如,一个文本转语音服务,用户输入文本后,服务器生成MP3文件并允许用户下载。FastAPI提供了灵活且强大的机制来处理这类场景。
最直接的方法是在处理POST请求的端点中,生成文件后立即将其作为响应返回。FastAPI的FileResponse是实现这一功能的关键组件。
为了确保文件被下载而不是在浏览器中预览,我们需要在响应头中设置Content-Disposition。
app.py 示例:
from fastapi import FastAPI, Request, Form, BackgroundTasks
from fastapi.templating import Jinja2Templates
from fastapi.responses import FileResponse, Response, StreamingResponse
import os
import uvicorn
app = FastAPI()
templates = Jinja2Templates(directory="templates")
# 假设文件存储在temp目录下
TEMP_DIR = "./temp"
if not os.path.exists(TEMP_DIR):
os.makedirs(TEMP_DIR)
# 模拟文本转语音功能
def text_to_speech_mock(language: str, text: str) -> str:
"""模拟文本转语音,生成一个MP3文件"""
filepath = os.path.join(TEMP_DIR, "welcome.mp3")
# 实际应用中这里会调用gTTS或其他TTS库
with open(filepath, "w") as f:
f.write(f"Mock audio content for '{text}' in '{language}'")
print(f"Generated mock MP3 at: {filepath}")
return filepath
@app.get('/')
async def main(request: Request):
"""首页,提供表单"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post('/text2speech_direct')
async def convert_and_download_direct(
request: Request,
message: str = Form(...),
language: str = Form(...),
background_tasks: BackgroundTasks = None
):
"""
通过POST请求直接处理文本转语音并返回文件下载。
使用Form(...)确保参数被正确解析。
"""
print(f"Received message: {message}, language: {language}")
filepath = text_to_speech_mock(language, message) # 模拟生成文件
filename = os.path.basename(filepath)
headers = {
'Content-Disposition': f'attachment; filename="{filename}"'
}
# 添加后台任务以在文件下载后删除
if background_tasks:
background_tasks.add_task(os.remove, filepath)
print(f"Scheduled deletion for: {filepath}")
return FileResponse(filepath, headers=headers, media_type="audio/mp3")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)
templates/index.html 示例:
<!doctype html>
<html>
<head>
<title>Convert Text to Speech (Direct Download)</title>
</head>
<body>
<h2>文本转语音并直接下载</h2>
<form method="post" action="/text2speech_direct">
<label for="message">消息:</label>
<input type="text" id="message" name="message" value="This is a sample message"><br><br>
<label for="language">语言:</label>
<input type="text" id="language" name="language" value="en"><br><br>
<input type="submit" value="提交并下载">
</form>
</body>
</html>注意事项:
在某些特定场景下,FileResponse可能不是最佳选择:
文件内容已完全加载到内存中: 如果文件内容已经以字节流或字符串形式存在于内存中,可以直接使用Response返回。
from fastapi import Response
# ...
@app.post('/text2speech_in_memory')
async def convert_and_download_in_memory(message: str = Form(...), language: str = Form(...)):
# 假设文件内容已在内存中生成
# 例如:contents = generate_audio_bytes(message, language)
filepath = text_to_speech_mock(language, message) # 模拟生成文件
with open(filepath, "rb") as f:
contents = f.read() # 读取文件内容到内存
filename = os.path.basename(filepath)
headers = {'Content-Disposition': f'attachment; filename="{filename}"'}
return Response(contents, headers=headers, media_type='audio/mp3')文件过大,无法一次性加载到内存: 对于非常大的文件(例如,几十GB的文件),StreamingResponse是更好的选择。它允许以小块的形式加载和发送文件内容,避免内存溢出。
from fastapi.responses import StreamingResponse
# ...
@app.post('/text2speech_streaming')
async def convert_and_download_streaming(message: str = Form(...), language: str = Form(...)):
filepath = text_to_speech_mock(language, message) # 模拟生成文件
def iterfile():
with open(filepath, "rb") as f:
yield from f # 逐块读取文件
filename = os.path.basename(filepath)
headers = {'Content-Disposition': f'attachment; filename="{filename}"'}
return StreamingResponse(iterfile(), headers=headers, media_type="audio/mp3")虽然FileResponse也支持分块读取,但其块大小是固定的(64KB)。如果需要自定义块大小以优化性能,StreamingResponse提供了更大的灵活性。
当需要更复杂的交互、处理并发用户请求或动态生成文件名时,将文件生成和文件下载分为两个步骤,并通过前端JavaScript进行协调是更健壮的方案。
此方案的工作流程如下:
app.py 示例:
from fastapi import FastAPI, Request, Form, BackgroundTasks
from fastapi.templating import Jinja2Templates
from fastapi.responses import FileResponse
import uuid
import os
import uvicorn
app = FastAPI()
templates = Jinja2Templates(directory="templates")
TEMP_DIR = "./temp"
if not os.path.exists(TEMP_DIR):
os.makedirs(TEMP_DIR)
# 模拟文件存储的字典 (在实际生产环境中应使用数据库或缓存)
files_to_download = {}
# 模拟文本转语音功能
def text_to_speech_mock_unique(language: str, text: str) -> str:
"""模拟文本转语音,生成一个带UUID的文件名"""
unique_filename = f"welcome_{uuid.uuid4().hex}.mp3"
filepath = os.path.join(TEMP_DIR, unique_filename)
with open(filepath, "w") as f:
f.write(f"Mock audio content for '{text}' in '{language}'")
print(f"Generated mock MP3 at: {filepath}")
return filepath
# 后台任务:删除文件并清除缓存
def remove_file_and_cache(filepath: str, file_id: str):
if os.path.exists(filepath):
os.remove(filepath)
print(f"Deleted file: {filepath}")
if file_id in files_to_download:
del files_to_download[file_id]
print(f"Removed file_id from cache: {file_id}")
@app.get('/')
async def main_async(request: Request):
"""首页,提供表单"""
return templates.TemplateResponse("index.html", {"request": request})
@app.post('/text2speech_async')
async def convert_and_get_download_link(
request: Request,
message: str = Form(...),
language: str = Form(...)
):
"""
处理文本转语音,生成文件,并返回一个下载链接ID。
"""
print(f"Received message for async: {message}, language: {language}")
filepath = text_to_speech_mock_unique(language, message) # 模拟生成文件
file_id = str(uuid.uuid4())
files_to_download[file_id] = filepath # 将文件路径与唯一ID关联
file_url = f'/download_async?fileId={file_id}'
return {"fileURL": file_url}
@app.get('/download_async')
async def download_file_async(
request: Request,
fileId: str,
background_tasks: BackgroundTasks
):
"""
根据文件ID下载文件。
"""
filepath = files_to_download.get(fileId)
if filepath and os.path.exists(filepath):
filename = os.path.basename(filepath)
headers = {'Content-Disposition': f'attachment; filename="{filename}"'}
# 添加后台任务以在文件下载后删除
background_tasks.add_task(remove_file_and_cache, filepath=filepath, file_id=fileId)
print(f"Scheduled deletion for: {filepath} with ID: {fileId}")
return FileResponse(filepath, headers=headers, media_type='audio/mp3')
# 如果文件ID无效或文件不存在,返回404
return Response(status_code=404, content="File not found or expired.")
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)templates/index.html 示例:
<!doctype html>
<html>
<head>
<title>Convert Text to Speech (Async Download)</title>
</head>
<body>
<h2>文本转语音并异步下载</h2>
<form method="post" id="myForm">
<label for="message_async">消息:</label>
<input type="text" id="message_async" name="message" value="This is an async sample message"><br><br>
<label for="language_async">语言:</label>
<input type="text" id="language_async" name="language" value="en"><br><br>
<input type="button" value="提交并获取下载链接" onclick="submitFormAsync()">
</form>
<p><a id="downloadLink" href="" style="display:none;"></a></p>
<script type="text/javascript">
function submitFormAsync() {
var formElement = document.getElementById('myForm');
var data = new FormData(formElement);
fetch('/text2speech_async', {
method: 'POST',
body: data,
})
.then(response => response.json())
.then(data => {
if (data.fileURL) {
var downloadLink = document.getElementById("downloadLink");
downloadLink.href = data.fileURL;
downloadLink.innerHTML = "点击下载文件";
downloadLink.style.display = "block"; // 显示下载链接
} else {
console.error("No fileURL received.");
}
})
.catch(error => {
console.error("Error submitting form:", error);
});
}
</script>
</body>
</html>注意事项:
为了避免服务器上堆积大量临时文件,及时清理已下载或过期的文件至关重要。FastAPI提供了BackgroundTasks来实现这一目的。
BackgroundTasks允许你在HTTP响应发送给客户端之后,在后台执行一些任务。这非常适合文件清理。
对于直接下载方案 (Option 1):
from fastapi import BackgroundTasks
import os
@app.post('/text2speech_direct')
async def convert_and_download_direct(
# ... 其他参数
background_tasks: BackgroundTasks # 注入BackgroundTasks
):
filepath = text_to_speech_mock(language, message)
# ... 其他逻辑
background_tasks.add_task(os.remove, filepath) # 添加删除文件的后台任务
return FileResponse(filepath, headers=headers, media_type="audio/mp3")对于异步下载方案 (Option 2): 在异步下载方案中,由于文件ID和文件路径的映射存储在files_to_download字典中,除了删除文件,还需要清除这个映射。因此,可以定义一个辅助函数来完成这两个任务。
from fastapi import BackgroundTasks
import os
# ... files_to_download 字典定义
def remove_file_and_cache(filepath: str, file_id: str):
"""删除文件并从缓存中移除其ID"""
if os.path.exists(filepath):
os.remove(filepath)
if file_id in files_to_download:
del files_to_download[file_id]
@app.get('/download_async')
async def download_file_async(
# ... 其他参数
background_tasks: BackgroundTasks
):
filepath = files_to_download.get(fileId)
if filepath and os.path.exists(filepath):
# ... 其他逻辑
background_tasks.add_task(remove_file_and_cache, filepath=filepath, file_id=fileId)
return FileResponse(filepath, headers=headers, media_type='audio/mp3')
# ... 错误处理FastAPI提供了多种灵活的方式来处理POST请求后的文件下载需求。
无论选择哪种方式,都应注意以下几点:
通过合理选择和组合这些技术,可以构建出高效、健壮的FastAPI文件下载服务。
以上就是FastAPI POST请求后文件下载指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号