大文件分片上传通过切块避免内存溢出,客户端逐片上传并携带元数据,服务端按序合并;核心是生成器读取、分片传输与完整校验,支持断点续传和进度跟踪,确保GB级文件稳定上传。

大文件上传在Web开发中很常见,直接上传可能因内存占用高或网络中断导致失败。Python中实现大文件分片上传,核心思路是将文件切块、逐个上传、服务端合并。以下是具体处理方法。
1. 文件分片读取
避免一次性加载整个文件到内存,使用生成器按固定大小读取文件块。
- 设定分片大小(如5MB)
- 通过循环读取文件流,每次返回一部分数据
- 适合任意大小的文件,内存占用稳定
示例代码:
def read_file_chunks(file_path, chunk_size=5 * 1024 * 1024):
with open(file_path, 'rb') as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
yield chunk
2. 客户端上传分片
每上传一个分片,携带必要信息:文件名、分片序号、总分片数等,便于服务端识别和重组。
立即学习“Python免费学习笔记(深入)”;
- 使用requests库发送POST请求
- 每个请求附带当前分片数据及元信息(可放在form-data中)
- 支持断点续传时,可先查询已上传的分片
上传示例:
模板采用响应式设计,自动适应手机,电脑及平板显示;满足单一店铺外卖需求。功能:1.菜单分类管理2.菜品管理:菜品增加,删除,修改3.订单管理4.友情链接管理5.数据库备份6.文章模块:如:促销活动,帮助中心7.单页模块:如:企业信息,关于我们更强大的功能在开发中……安装方法:上传到网站根目录,运行http://www.***.com/install 自动
import requestsfor index, chunk in enumerate(read_file_chunks('largefile.zip')): files = {'file': ('chunk%d' % index, chunk)} data = { 'filename': 'large_file.zip', 'chunk_index': index, 'total_chunks': 10 # 可预先计算 } response = requests.post('https://www.php.cn/link/c0dba5809c620f70942856ad09b144d0', data=data, files=files) if response.status_code != 200: print(f"上传失败: 分片 {index}") break
3. 服务端接收与合并
服务端需保存每个分片,待全部接收后按顺序合并。
- 根据文件名创建临时目录存放分片
- 收到分片后以序号命名存储(如 part_0, part_1)
- 检查是否所有分片已上传,自动触发合并
Flask 示例处理逻辑:
from flask import Flask, request import osapp = Flask(name) UPLOAD_DIR = '/tmp/uploads' CHUNKS_DIR = '/tmp/chunks'
@app.route('/upload', methods=['POST']) def handle_upload(): file = request.files['file'] filename = request.form['filename'] chunk_index = int(request.form['chunk_index']) total_chunks = int(request.form['total_chunks'])
chunk_dir = os.path.join(CHUNKS_DIR, filename + "_parts") os.makedirs(chunk_dir, exist_ok=True) chunk_path = os.path.join(chunk_dir, f"part_{chunk_index}") file.save(chunk_path) # 检查是否全部上传完成 if all(os.path.exists(os.path.join(chunk_dir, f"part_{i}")) for i in range(total_chunks)): merge_files(chunk_dir, os.path.join(UPLOAD_DIR, filename)) cleanup(chunk_dir) # 删除分片 return "OK", 200def merge_files(chunk_dir, target_path): with open(target_path, 'wb') as f: for i in sorted(os.listdir(chunk_dir)): part_path = os.path.join(chunk_dir, i) with open(part_path, 'rb') as part: f.write(part.read())
4. 增强稳定性与用户体验
实际应用中还需考虑错误重试、进度展示、唯一标识等问题。
- 为每个上传任务生成唯一ID(如UUID),避免文件名冲突
- 记录上传状态,支持断点续传
- 添加MD5校验,确保合并后文件完整性
- 前端可轮询或WebSocket获取上传进度
基本上就这些。关键在于分片读取不占内存、传输信息完整、服务端可靠合并。实现后能稳定上传GB级文件。不复杂但容易忽略细节。









