
在Node.js和ExpressJs开发中,处理I/O密集型或网络请求等异步操作是常态。当一个API请求需要触发多个独立的异步任务(例如,并发请求外部服务并写入文件)时,我们通常希望在所有这些任务都完成后再向客户端返回最终结果或确认信息。然而,如果不正确地管理这些异步操作,服务器可能会在所有任务完成之前就发送响应,导致数据不一致或客户端获取到不完整的信息。
开发者在使用async/await和Promise.all()时,常遇到的一个核心问题是,尽管代码中包含了await Promise.all(tasks);,但Express路由处理函数似乎并未等待所有任务完成。这通常是由于以下两个关键点被忽略:
为了确保Express路由能够正确等待所有并发的Promise完成,我们需要对代码进行两方面的优化:
原始的processTask函数使用了new Promise构造函数和嵌套的.then().catch(),这在现代JavaScript中通常可以通过async/await来简化。同时,需要确保文件写入操作的错误也能被捕获并拒绝Promise。
原始 processTask 示例(问题中的第一版):
function processTask(task: Task, configs: Configs) {
return new Promise<void>((resolve, reject) => {
try {
const fileName = './output/' + task.tag + 's.json';
fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
method: 'GET'
}).then(result => {
result.json().then(jsonResult => {
fs.writeFile(fileName, JSON.stringify(jsonResult), function () { // 缺少错误处理
console.log('finished writing :' + fileName);
resolve();
});
}).catch(err => reject(err));
}).catch(err => reject(err));
} catch (err) {
console.log(err); // 这里的错误不会拒绝外部Promise
}
});
}优化后的 processTask 函数:
使用async/await和fs.promises模块可以大大简化代码,并提供更清晰的错误处理机制。
import * as fs from 'fs/promises'; // 导入fs.promises
async function processTask(task: Task, configs: Configs): Promise<void> {
try {
const fileName = './output/' + task.tag + 's.json';
// 使用 await 等待 fetch 请求完成
const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
method: 'GET'
});
// 使用 await 等待 JSON 解析完成
const jsonResult = await result.json();
// 使用 fs.promises.writeFile 写入文件,它返回一个 Promise
await fs.writeFile(fileName, JSON.stringify(jsonResult));
console.log('finished writing :' + fileName);
} catch (err) {
// 捕获任何发生在 fetch、json解析或文件写入过程中的错误
console.error(`Error processing task ${task.tag}:`, err);
// 重新抛出错误,以便 Promise.all 能够捕获到它
throw err;
}
}注意事项:
Express路由处理函数必须被标记为async,才能在其内部正确使用await。
原始 Express 路由处理函数示例(问题中的第二版):
app.post('/', (req: Request, res: Response) => { // 缺少 async 关键字
const tasksRequest = req.body as TasksRequest;
let tasks = []
tasks = tasksRequest.tasks.map( (t) => processTask(t, tasksRequest.configs));
console.log(tasks);
Promise.all(tasks).then(res=>{ // 缺少 await
console.log('After awaiting');
});
});优化后的 Express 路由处理函数:
import { Request, Response } from 'express'; // 假设类型定义
app.post('/', async (req: Request, res: Response) => { // 关键:添加 async 关键字
const tasksRequest = req.body as TasksRequest;
let tasks: Promise<void>[] = []; // 明确 Promise 类型
try {
tasks = tasksRequest.tasks.map((t) => processTask(t, tasksRequest.configs));
console.log('Starting all tasks...');
// 关键:使用 await Promise.all() 等待所有 Promise 完成
await Promise.all(tasks);
console.log('After awaiting all tasks.');
// 所有任务完成后,发送成功响应
res.status(200).json({ message: 'All tasks processed successfully.' });
} catch (error) {
console.error('An error occurred during task processing:', error);
// 如果任何一个 Promise 拒绝,Promise.all 会立即拒绝
// 在这里发送错误响应
res.status(500).json({ message: 'Failed to process some tasks.', error: error.message });
}
});注意事项:
结合上述优化,一个完整的、健壮的Express路由处理并发异步任务的示例如下:
import express, { Request, Response } from 'express';
import * as fs from 'fs/promises'; // 导入fs.promises
const app = express();
app.use(express.json()); // 用于解析请求体
// 假设的类型定义
interface Task {
tag: string;
parentResource: string;
mostRelatedPath: string;
}
interface Configs {
Host: string;
APIsBasePrefix: string;
}
interface TasksRequest {
tasks: Task[];
configs: Configs;
}
// 异步处理单个任务的函数
async function processTask(task: Task, configs: Configs): Promise<void> {
try {
const fileName = `./output/${task.tag}s.json`; // 使用模板字符串更简洁
// 模拟外部 API 请求
const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
method: 'GET'
});
if (!result.ok) {
throw new Error(`HTTP error! status: ${result.status}`);
}
const jsonResult = await result.json();
// 确保 output 目录存在
const outputDir = './output';
await fs.mkdir(outputDir, { recursive: true });
// 写入文件
await fs.writeFile(fileName, JSON.stringify(jsonResult, null, 2)); // 美化JSON输出
console.log(`Finished writing: ${fileName}`);
} catch (err) {
console.error(`Error processing task ${task.tag}:`, err);
// 重新抛出错误,让调用者(Promise.all)能够捕获
throw err;
}
}
// Express POST 路由处理函数
app.post('/', async (req: Request, res: Response) => {
const tasksRequest = req.body as TasksRequest;
if (!tasksRequest || !tasksRequest.tasks || !Array.isArray(tasksRequest.tasks) || !tasksRequest.configs) {
return res.status(400).json({ message: 'Invalid request body.' });
}
const tasksPromises: Promise<void>[] = [];
try {
// 为每个任务创建并收集 Promise
for (const t of tasksRequest.tasks) {
tasksPromises.push(processTask(t, tasksRequest.configs));
}
console.log(`Processing ${tasksPromises.length} tasks concurrently...`);
// 等待所有任务 Promise 完成
await Promise.all(tasksPromises);
console.log('All tasks completed successfully.');
// 所有任务成功完成,发送成功响应
res.status(200).json({ message: 'All tasks processed successfully.' });
} catch (error: any) {
// 捕获 Promise.all 中任何一个任务的错误
console.error('An error occurred during concurrent task processing:', error);
res.status(500).json({
message: 'Failed to process some tasks.',
error: error.message,
details: error.stack // 生产环境不建议直接暴露堆栈信息
});
}
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server running on port ${PORT}`);
});
// 示例用法 (假设在其他地方调用此API)
// curl -X POST -H "Content-Type: application/json" -d '{
// "tasks": [
// {"tag": "user", "parentResource": "/api/v1/", "mostRelatedPath": "users"},
// {"tag": "product", "parentResource": "/api/v1/", "mostRelatedPath": "products"}
// ],
// "configs": {
// "Host": "https://jsonplaceholder.typicode.com",
// "APIsBasePrefix": "/"
// }
// }' http://localhost:3000/在ExpressJs中处理并发异步任务并确保所有Promise完成,核心在于正确利用JavaScript的async/await语法和Promise.all()方法:
通过遵循这些实践,开发者可以构建出更稳定、更易维护的ExpressJs应用,有效管理复杂的异步流程。
以上就是ExpressJs中并发处理异步任务并等待所有Promise完成的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号