首页 > web前端 > js教程 > 正文

ExpressJs中并发处理异步任务并等待所有Promise完成

碧海醫心
发布: 2025-09-17 11:40:51
原创
982人浏览过

expressjs中并发处理异步任务并等待所有promise完成

本文旨在探讨在ExpressJs应用中如何高效地并发执行多个异步任务,并确保所有Promise都已完成处理后再向客户端发送响应。我们将重点介绍async/await语法与Promise.all()的结合使用,优化异步代码的可读性和健壮性,同时提供错误处理的最佳实践,以确保API行为符合预期。

异步操作与ExpressJs响应机制

在Node.js和ExpressJs开发中,处理I/O密集型或网络请求等异步操作是常态。当一个API请求需要触发多个独立的异步任务(例如,并发请求外部服务并写入文件)时,我们通常希望在所有这些任务都完成后再向客户端返回最终结果或确认信息。然而,如果不正确地管理这些异步操作,服务器可能会在所有任务完成之前就发送响应,导致数据不一致或客户端获取到不完整的信息。

问题分析:为何await Promise.all()未生效

开发者在使用async/await和Promise.all()时,常遇到的一个核心问题是,尽管代码中包含了await Promise.all(tasks);,但Express路由处理函数似乎并未等待所有任务完成。这通常是由于以下两个关键点被忽略:

  1. Express路由处理函数必须标记为async: await关键字只能在async函数内部使用。如果Express的路由处理函数(例如app.post('/', (req, res) => { ... }))没有被标记为async,那么其中的await语句将不会真正暂停函数的执行,而是会直接被解析为一个普通的表达式,导致后续代码立即执行。
  2. Promise的正确返回与错误传播: 确保所有被Promise.all()聚合的Promise都能正确地返回(resolve)或拒绝(reject),并且错误能够被有效地传播。原始的processTask函数在某些情况下可能没有正确地拒绝Promise,或者在fs.writeFile的回调中没有处理错误,导致Promise链断裂或无法被Promise.all()捕获。

解决方案:使用async/await重构异步逻辑

为了确保Express路由能够正确等待所有并发的Promise完成,我们需要对代码进行两方面的优化:

1. 优化 processTask 函数

原始的processTask函数使用了new Promise构造函数和嵌套的.then().catch(),这在现代JavaScript中通常可以通过async/await来简化。同时,需要确保文件写入操作的错误也能被捕获并拒绝Promise。

原始 processTask 示例(问题中的第一版):

function processTask(task: Task, configs: Configs) {
  return new Promise<void>((resolve, reject) => {
    try {
      const fileName = './output/' + task.tag + 's.json';
      fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
        method: 'GET'
      }).then(result => {
        result.json().then(jsonResult => {
          fs.writeFile(fileName, JSON.stringify(jsonResult), function () { // 缺少错误处理
            console.log('finished writing :' + fileName);
            resolve();
          });
        }).catch(err => reject(err));
      }).catch(err => reject(err));
    } catch (err) {
      console.log(err); // 这里的错误不会拒绝外部Promise
    }
  });
}
登录后复制

优化后的 processTask 函数:

使用async/await和fs.promises模块可以大大简化代码,并提供更清晰的错误处理机制。

import * as fs from 'fs/promises'; // 导入fs.promises

async function processTask(task: Task, configs: Configs): Promise<void> {
  try {
    const fileName = './output/' + task.tag + 's.json';

    // 使用 await 等待 fetch 请求完成
    const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
      method: 'GET'
    });

    // 使用 await 等待 JSON 解析完成
    const jsonResult = await result.json();

    // 使用 fs.promises.writeFile 写入文件,它返回一个 Promise
    await fs.writeFile(fileName, JSON.stringify(jsonResult));

    console.log('finished writing :' + fileName);
  } catch (err) {
    // 捕获任何发生在 fetch、json解析或文件写入过程中的错误
    console.error(`Error processing task ${task.tag}:`, err);
    // 重新抛出错误,以便 Promise.all 能够捕获到它
    throw err; 
  }
}
登录后复制

注意事项:

FashionLabs
FashionLabs

AI服装模特、商品图,可商用,低价提升销量神器

FashionLabs 38
查看详情 FashionLabs
  • async函数默认返回一个Promise。当函数正常执行完毕时,Promise会以undefined(如果函数没有明确return值)或return的值来resolve。
  • 当async函数内部抛出错误时,它返回的Promise会自动reject。
  • 我们使用了fs.promises模块,它提供了Promise版本的Node.js文件系统API,避免了回调地狱。

2. 优化 Express 路由处理函数

Express路由处理函数必须被标记为async,才能在其内部正确使用await。

原始 Express 路由处理函数示例(问题中的第二版):

app.post('/',  (req: Request, res: Response) => { // 缺少 async 关键字
  const tasksRequest = req.body as TasksRequest;
  let tasks = []

  tasks = tasksRequest.tasks.map( (t) =>  processTask(t, tasksRequest.configs));

  console.log(tasks);
  Promise.all(tasks).then(res=>{ // 缺少 await
    console.log('After awaiting');
  });
});
登录后复制

优化后的 Express 路由处理函数:

import { Request, Response } from 'express'; // 假设类型定义

app.post('/', async (req: Request, res: Response) => { // 关键:添加 async 关键字
  const tasksRequest = req.body as TasksRequest;
  let tasks: Promise<void>[] = []; // 明确 Promise 类型

  try {
    tasks = tasksRequest.tasks.map((t) => processTask(t, tasksRequest.configs));

    console.log('Starting all tasks...');
    // 关键:使用 await Promise.all() 等待所有 Promise 完成
    await Promise.all(tasks); 
    console.log('After awaiting all tasks.');

    // 所有任务完成后,发送成功响应
    res.status(200).json({ message: 'All tasks processed successfully.' });

  } catch (error) {
    console.error('An error occurred during task processing:', error);
    // 如果任何一个 Promise 拒绝,Promise.all 会立即拒绝
    // 在这里发送错误响应
    res.status(500).json({ message: 'Failed to process some tasks.', error: error.message });
  }
});
登录后复制

注意事项:

  • app.post('/', async (req, res) => { ... })是确保await在路由处理函数中生效的关键。
  • await Promise.all(tasks);会暂停当前async函数的执行,直到tasks数组中的所有Promise都成功解决,或者其中任何一个Promise被拒绝。
  • 当Promise.all()中的任何一个Promise被拒绝时,Promise.all()自身也会立即拒绝,并抛出第一个拒绝的原因。因此,使用try...catch块来捕获潜在的错误并向客户端发送适当的错误响应至关重要。

完整示例代码

结合上述优化,一个完整的、健壮的Express路由处理并发异步任务的示例如下:

import express, { Request, Response } from 'express';
import * as fs from 'fs/promises'; // 导入fs.promises

const app = express();
app.use(express.json()); // 用于解析请求体

// 假设的类型定义
interface Task {
  tag: string;
  parentResource: string;
  mostRelatedPath: string;
}

interface Configs {
  Host: string;
  APIsBasePrefix: string;
}

interface TasksRequest {
  tasks: Task[];
  configs: Configs;
}

// 异步处理单个任务的函数
async function processTask(task: Task, configs: Configs): Promise<void> {
  try {
    const fileName = `./output/${task.tag}s.json`; // 使用模板字符串更简洁

    // 模拟外部 API 请求
    const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, {
      method: 'GET'
    });

    if (!result.ok) {
      throw new Error(`HTTP error! status: ${result.status}`);
    }

    const jsonResult = await result.json();

    // 确保 output 目录存在
    const outputDir = './output';
    await fs.mkdir(outputDir, { recursive: true });

    // 写入文件
    await fs.writeFile(fileName, JSON.stringify(jsonResult, null, 2)); // 美化JSON输出

    console.log(`Finished writing: ${fileName}`);
  } catch (err) {
    console.error(`Error processing task ${task.tag}:`, err);
    // 重新抛出错误,让调用者(Promise.all)能够捕获
    throw err; 
  }
}

// Express POST 路由处理函数
app.post('/', async (req: Request, res: Response) => {
  const tasksRequest = req.body as TasksRequest;

  if (!tasksRequest || !tasksRequest.tasks || !Array.isArray(tasksRequest.tasks) || !tasksRequest.configs) {
    return res.status(400).json({ message: 'Invalid request body.' });
  }

  const tasksPromises: Promise<void>[] = [];

  try {
    // 为每个任务创建并收集 Promise
    for (const t of tasksRequest.tasks) {
      tasksPromises.push(processTask(t, tasksRequest.configs));
    }

    console.log(`Processing ${tasksPromises.length} tasks concurrently...`);
    // 等待所有任务 Promise 完成
    await Promise.all(tasksPromises);
    console.log('All tasks completed successfully.');

    // 所有任务成功完成,发送成功响应
    res.status(200).json({ message: 'All tasks processed successfully.' });

  } catch (error: any) {
    // 捕获 Promise.all 中任何一个任务的错误
    console.error('An error occurred during concurrent task processing:', error);
    res.status(500).json({ 
      message: 'Failed to process some tasks.', 
      error: error.message,
      details: error.stack // 生产环境不建议直接暴露堆栈信息
    });
  }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

// 示例用法 (假设在其他地方调用此API)
// curl -X POST -H "Content-Type: application/json" -d '{
//   "tasks": [
//     {"tag": "user", "parentResource": "/api/v1/", "mostRelatedPath": "users"},
//     {"tag": "product", "parentResource": "/api/v1/", "mostRelatedPath": "products"}
//   ],
//   "configs": {
//     "Host": "https://jsonplaceholder.typicode.com",
//     "APIsBasePrefix": "/"
//   }
// }' http://localhost:3000/
登录后复制

总结与最佳实践

在ExpressJs中处理并发异步任务并确保所有Promise完成,核心在于正确利用JavaScript的async/await语法和Promise.all()方法:

  1. 标记async路由处理函数: 任何包含await关键字的Express路由处理函数都必须用async关键字标记。
  2. 优化异步函数: 将复杂的.then().catch()链重构为更简洁、更易读的async/await模式。
  3. 使用Promise.all()并发执行: 对于多个相互独立的异步任务,使用Promise.all()可以高效地并发执行它们,并等待所有任务完成。
  4. 健壮的错误处理: 在async函数内部使用try...catch捕获并传播错误。在Express路由处理函数中,使用try...catch包裹await Promise.all(),以便在任何一个并发任务失败时能够捕获错误并向客户端发送适当的错误响应(例如500 Internal Server Error)。
  5. 利用Promise-based API: 优先使用返回Promise的API(如fetch、fs.promises),而不是基于回调的API,以更好地融入async/await生态。

通过遵循这些实践,开发者可以构建出更稳定、更易维护的ExpressJs应用,有效管理复杂的异步流程。

以上就是ExpressJs中并发处理异步任务并等待所有Promise完成的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号