0

0

ExpressJs中并发处理异步任务并等待所有Promise完成

碧海醫心

碧海醫心

发布时间:2025-09-17 11:40:51

|

991人浏览过

|

来源于php中文网

原创

expressjs中并发处理异步任务并等待所有promise完成

本文旨在探讨在ExpressJs应用中如何高效地并发执行多个异步任务,并确保所有Promise都已完成处理后再向客户端发送响应。我们将重点介绍async/await语法与Promise.all()的结合使用,优化异步代码的可读性和健壮性,同时提供错误处理的最佳实践,以确保API行为符合预期。

异步操作与ExpressJs响应机制

在Node.js和ExpressJs开发中,处理I/O密集型或网络请求等异步操作是常态。当一个API请求需要触发多个独立的异步任务(例如,并发请求外部服务并写入文件)时,我们通常希望在所有这些任务都完成后再向客户端返回最终结果或确认信息。然而,如果不正确地管理这些异步操作,服务器可能会在所有任务完成之前就发送响应,导致数据不一致或客户端获取到不完整的信息。

问题分析:为何await Promise.all()未生效

开发者在使用async/await和Promise.all()时,常遇到的一个核心问题是,尽管代码中包含了await Promise.all(tasks);,但Express路由处理函数似乎并未等待所有任务完成。这通常是由于以下两个关键点被忽略:

  1. Express路由处理函数必须标记为async: await关键字只能在async函数内部使用。如果Express的路由处理函数(例如app.post('/', (req, res) => { ... }))没有被标记为async,那么其中的await语句将不会真正暂停函数的执行,而是会直接被解析为一个普通的表达式,导致后续代码立即执行。
  2. Promise的正确返回与错误传播: 确保所有被Promise.all()聚合的Promise都能正确地返回(resolve)或拒绝(reject),并且错误能够被有效地传播。原始的processTask函数在某些情况下可能没有正确地拒绝Promise,或者在fs.writeFile的回调中没有处理错误,导致Promise链断裂或无法被Promise.all()捕获。

解决方案:使用async/await重构异步逻辑

为了确保Express路由能够正确等待所有并发的Promise完成,我们需要对代码进行两方面的优化:

1. 优化 processTask 函数

原始的processTask函数使用了new Promise构造函数和嵌套的.then().catch(),这在现代JavaScript中通常可以通过async/await来简化。同时,需要确保文件写入操作的错误也能被捕获并拒绝Promise。

原始 processTask 示例(问题中的第一版):

function processTask(task: Task, configs: Configs) {
  return new Promise((resolve, reject) => {
    try {
      const fileName = './output/' + task.tag + 's.json';
      fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
        method: 'GET'
      }).then(result => {
        result.json().then(jsonResult => {
          fs.writeFile(fileName, JSON.stringify(jsonResult), function () { // 缺少错误处理
            console.log('finished writing :' + fileName);
            resolve();
          });
        }).catch(err => reject(err));
      }).catch(err => reject(err));
    } catch (err) {
      console.log(err); // 这里的错误不会拒绝外部Promise
    }
  });
}

优化后的 processTask 函数:

使用async/await和fs.promises模块可以大大简化代码,并提供更清晰的错误处理机制。

import * as fs from 'fs/promises'; // 导入fs.promises

async function processTask(task: Task, configs: Configs): Promise {
  try {
    const fileName = './output/' + task.tag + 's.json';

    // 使用 await 等待 fetch 请求完成
    const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
      method: 'GET'
    });

    // 使用 await 等待 JSON 解析完成
    const jsonResult = await result.json();

    // 使用 fs.promises.writeFile 写入文件,它返回一个 Promise
    await fs.writeFile(fileName, JSON.stringify(jsonResult));

    console.log('finished writing :' + fileName);
  } catch (err) {
    // 捕获任何发生在 fetch、json解析或文件写入过程中的错误
    console.error(`Error processing task ${task.tag}:`, err);
    // 重新抛出错误,以便 Promise.all 能够捕获到它
    throw err; 
  }
}

注意事项:

Lobe
Lobe

微软旗下的一个训练器学习模型的平台

下载
  • async函数默认返回一个Promise。当函数正常执行完毕时,Promise会以undefined(如果函数没有明确return值)或return的值来resolve。
  • 当async函数内部抛出错误时,它返回的Promise会自动reject。
  • 我们使用了fs.promises模块,它提供了Promise版本的Node.js文件系统API,避免了回调地狱。

2. 优化 Express 路由处理函数

Express路由处理函数必须被标记为async,才能在其内部正确使用await。

原始 Express 路由处理函数示例(问题中的第二版):

app.post('/',  (req: Request, res: Response) => { // 缺少 async 关键字
  const tasksRequest = req.body as TasksRequest;
  let tasks = []

  tasks = tasksRequest.tasks.map( (t) =>  processTask(t, tasksRequest.configs));

  console.log(tasks);
  Promise.all(tasks).then(res=>{ // 缺少 await
    console.log('After awaiting');
  });
});

优化后的 Express 路由处理函数:

import { Request, Response } from 'express'; // 假设类型定义

app.post('/', async (req: Request, res: Response) => { // 关键:添加 async 关键字
  const tasksRequest = req.body as TasksRequest;
  let tasks: Promise[] = []; // 明确 Promise 类型

  try {
    tasks = tasksRequest.tasks.map((t) => processTask(t, tasksRequest.configs));

    console.log('Starting all tasks...');
    // 关键:使用 await Promise.all() 等待所有 Promise 完成
    await Promise.all(tasks); 
    console.log('After awaiting all tasks.');

    // 所有任务完成后,发送成功响应
    res.status(200).json({ message: 'All tasks processed successfully.' });

  } catch (error) {
    console.error('An error occurred during task processing:', error);
    // 如果任何一个 Promise 拒绝,Promise.all 会立即拒绝
    // 在这里发送错误响应
    res.status(500).json({ message: 'Failed to process some tasks.', error: error.message });
  }
});

注意事项:

  • app.post('/', async (req, res) => { ... })是确保await在路由处理函数中生效的关键。
  • await Promise.all(tasks);会暂停当前async函数的执行,直到tasks数组中的所有Promise都成功解决,或者其中任何一个Promise被拒绝。
  • 当Promise.all()中的任何一个Promise被拒绝时,Promise.all()自身也会立即拒绝,并抛出第一个拒绝的原因。因此,使用try...catch块来捕获潜在的错误并向客户端发送适当的错误响应至关重要。

完整示例代码

结合上述优化,一个完整的、健壮的Express路由处理并发异步任务的示例如下:

import express, { Request, Response } from 'express';
import * as fs from 'fs/promises'; // 导入fs.promises

const app = express();
app.use(express.json()); // 用于解析请求体

// 假设的类型定义
interface Task {
  tag: string;
  parentResource: string;
  mostRelatedPath: string;
}

interface Configs {
  Host: string;
  APIsBasePrefix: string;
}

interface TasksRequest {
  tasks: Task[];
  configs: Configs;
}

// 异步处理单个任务的函数
async function processTask(task: Task, configs: Configs): Promise {
  try {
    const fileName = `./output/${task.tag}s.json`; // 使用模板字符串更简洁

    // 模拟外部 API 请求
    const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
      method: 'GET'
    });

    if (!result.ok) {
      throw new Error(`HTTP error! status: ${result.status}`);
    }

    const jsonResult = await result.json();

    // 确保 output 目录存在
    const outputDir = './output';
    await fs.mkdir(outputDir, { recursive: true });

    // 写入文件
    await fs.writeFile(fileName, JSON.stringify(jsonResult, null, 2)); // 美化JSON输出

    console.log(`Finished writing: ${fileName}`);
  } catch (err) {
    console.error(`Error processing task ${task.tag}:`, err);
    // 重新抛出错误,让调用者(Promise.all)能够捕获
    throw err; 
  }
}

// Express POST 路由处理函数
app.post('/', async (req: Request, res: Response) => {
  const tasksRequest = req.body as TasksRequest;

  if (!tasksRequest || !tasksRequest.tasks || !Array.isArray(tasksRequest.tasks) || !tasksRequest.configs) {
    return res.status(400).json({ message: 'Invalid request body.' });
  }

  const tasksPromises: Promise[] = [];

  try {
    // 为每个任务创建并收集 Promise
    for (const t of tasksRequest.tasks) {
      tasksPromises.push(processTask(t, tasksRequest.configs));
    }

    console.log(`Processing ${tasksPromises.length} tasks concurrently...`);
    // 等待所有任务 Promise 完成
    await Promise.all(tasksPromises);
    console.log('All tasks completed successfully.');

    // 所有任务成功完成,发送成功响应
    res.status(200).json({ message: 'All tasks processed successfully.' });

  } catch (error: any) {
    // 捕获 Promise.all 中任何一个任务的错误
    console.error('An error occurred during concurrent task processing:', error);
    res.status(500).json({ 
      message: 'Failed to process some tasks.', 
      error: error.message,
      details: error.stack // 生产环境不建议直接暴露堆栈信息
    });
  }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

// 示例用法 (假设在其他地方调用此API)
// curl -X POST -H "Content-Type: application/json" -d '{
//   "tasks": [
//     {"tag": "user", "parentResource": "/api/v1/", "mostRelatedPath": "users"},
//     {"tag": "product", "parentResource": "/api/v1/", "mostRelatedPath": "products"}
//   ],
//   "configs": {
//     "Host": "https://jsonplaceholder.typicode.com",
//     "APIsBasePrefix": "/"
//   }
// }' http://localhost:3000/

总结与最佳实践

在ExpressJs中处理并发异步任务并确保所有Promise完成,核心在于正确利用JavaScript的async/await语法和Promise.all()方法:

  1. 标记async路由处理函数: 任何包含await关键字的Express路由处理函数都必须用async关键字标记。
  2. 优化异步函数: 将复杂的.then().catch()链重构为更简洁、更易读的async/await模式。
  3. 使用Promise.all()并发执行: 对于多个相互独立的异步任务,使用Promise.all()可以高效地并发执行它们,并等待所有任务完成。
  4. 健壮的错误处理: 在async函数内部使用try...catch捕获并传播错误。在Express路由处理函数中,使用try...catch包裹await Promise.all(),以便在任何一个并发任务失败时能够捕获错误并向客户端发送适当的错误响应(例如500 Internal Server Error)。
  5. 利用Promise-based API: 优先使用返回Promise的API(如fetch、fs.promises),而不是基于回调的API,以更好地融入async/await生态。

通过遵循这些实践,开发者可以构建出更稳定、更易维护的ExpressJs应用,有效管理复杂的异步流程。

相关专题

更多
js获取数组长度的方法
js获取数组长度的方法

在js中,可以利用array对象的length属性来获取数组长度,该属性可设置或返回数组中元素的数目,只需要使用“array.length”语句即可返回表示数组对象的元素个数的数值,也就是长度值。php中文网还提供JavaScript数组的相关下载、相关课程等内容,供大家免费下载使用。

558

2023.06.20

js刷新当前页面
js刷新当前页面

js刷新当前页面的方法:1、reload方法,该方法强迫浏览器刷新当前页面,语法为“location.reload([bForceGet]) ”;2、replace方法,该方法通过指定URL替换当前缓存在历史里(客户端)的项目,因此当使用replace方法之后,不能通过“前进”和“后退”来访问已经被替换的URL,语法为“location.replace(URL) ”。php中文网为大家带来了js刷新当前页面的相关知识、以及相关文章等内容

416

2023.07.04

js四舍五入
js四舍五入

js四舍五入的方法:1、tofixed方法,可把 Number 四舍五入为指定小数位数的数字;2、round() 方法,可把一个数字舍入为最接近的整数。php中文网为大家带来了js四舍五入的相关知识、以及相关文章等内容

756

2023.07.04

js删除节点的方法
js删除节点的方法

js删除节点的方法有:1、removeChild()方法,用于从父节点中移除指定的子节点,它需要两个参数,第一个参数是要删除的子节点,第二个参数是父节点;2、parentNode.removeChild()方法,可以直接通过父节点调用来删除子节点;3、remove()方法,可以直接删除节点,而无需指定父节点;4、innerHTML属性,用于删除节点的内容。

479

2023.09.01

JavaScript转义字符
JavaScript转义字符

JavaScript中的转义字符是反斜杠和引号,可以在字符串中表示特殊字符或改变字符的含义。本专题为大家提供转义字符相关的文章、下载、课程内容,供大家免费下载体验。

534

2023.09.04

js生成随机数的方法
js生成随机数的方法

js生成随机数的方法有:1、使用random函数生成0-1之间的随机数;2、使用random函数和特定范围来生成随机整数;3、使用random函数和round函数生成0-99之间的随机整数;4、使用random函数和其他函数生成更复杂的随机数;5、使用random函数和其他函数生成范围内的随机小数;6、使用random函数和其他函数生成范围内的随机整数或小数。

1091

2023.09.04

如何启用JavaScript
如何启用JavaScript

JavaScript启用方法有内联脚本、内部脚本、外部脚本和异步加载。详细介绍:1、内联脚本是将JavaScript代码直接嵌入到HTML标签中;2、内部脚本是将JavaScript代码放置在HTML文件的`<script>`标签中;3、外部脚本是将JavaScript代码放置在一个独立的文件;4、外部脚本是将JavaScript代码放置在一个独立的文件。

659

2023.09.12

Js中Symbol类详解
Js中Symbol类详解

javascript中的Symbol数据类型是一种基本数据类型,用于表示独一无二的值。Symbol的特点:1、独一无二,每个Symbol值都是唯一的,不会与其他任何值相等;2、不可变性,Symbol值一旦创建,就不能修改或者重新赋值;3、隐藏性,Symbol值不会被隐式转换为其他类型;4、无法枚举,Symbol值作为对象的属性名时,默认是不可枚举的。

554

2023.09.20

c++ 根号
c++ 根号

本专题整合了c++根号相关教程,阅读专题下面的文章了解更多详细内容。

44

2026.01.23

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 4.1万人学习

TypeScript 教程
TypeScript 教程

共19课时 | 2.4万人学习

Bootstrap 5教程
Bootstrap 5教程

共46课时 | 3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号