首页 > web前端 > js教程 > 正文

Node.js 流式处理CSV与API限速的异步控制策略

花韻仙語
发布: 2025-10-30 11:05:01
原创
787人浏览过

Node.js 流式处理CSV与API限速的异步控制策略

本文深入探讨了在node.js中使用文件流处理csv数据并按行调用外部api时,如何有效管理api请求速率限制的问题。通过分析常见错误模式,文章提出了利用 `for await...of` 循环结合 `csv-parse` 库来顺序控制异步操作的解决方案,从而避免api过载,确保数据处理的稳定性和可靠性。

在现代应用开发中,处理大量数据文件(如CSV)并与外部API交互是常见需求。然而,外部API通常设有速率限制,若不加以控制,短时间内发起过多请求会导致API拒绝服务,甚至封禁IP。Node.js的非阻塞I/O和流(Stream)特性使得处理大文件变得高效,但结合异步操作时,需要特别注意并发控制。

问题场景与常见误区

设想这样一个场景:你需要读取一个大型CSV文件,文件的每一行都包含调用某个外部API所需的信息。为了避免API过载,每次API调用后都需要等待一段随机时间。一个直观的尝试可能是使用 fs.createReadStream 管道 csv-parse,然后在 on('data') 事件中执行异步操作并引入延迟:

const fs = require('fs');
const { parse } = require('csv-parse');
const { stringify } = require('csv-stringify');

// 辅助函数:生成指定范围内的随机整数
function randomIntFromInterval(min, max) {
  return Math.floor(Math.random() * (max - min + 1) + min);
}

// 辅助函数:创建Promise延迟
function timeout() {
  return new Promise((resolve) =>
    setTimeout(resolve, randomIntFromInterval(3000, 39990)) // 3秒到近40秒的随机延迟
  );
}

// 模拟API调用
async function callAPI(firstName) {
  console.log(`Calling API for: ${firstName}`);
  await timeout(); // 模拟API响应时间
  return `Info for ${firstName}`;
}

// 处理单行数据
async function processData(row, data) {
  const firstName = row[0];
  const infos = await callAPI(firstName);
  // 再次引入延迟,避免后续处理过快,但这里不是关键
  await timeout(); 
  const newRow = [firstName, infos];
  data.push(newRow);
  return data;
}

function processCsvWithFloodingRisk() {
  return new Promise((resolve, reject) => {
    const promises = [];
    const processedData = []; // 用于存储处理后的数据

    fs.createReadStream("./input.csv")
      .pipe(parse({ delimiter: ",", from_line: 1 }))
      .on("data", async (row) => {
        // 问题所在:on('data') 事件是异步触发的,每次触发都会立即执行
        // processData并将其Promise推入数组,而不会等待上一个Promise完成。
        // 即使这里有 await timeout(),它也只延迟了当前 on('data') 回调的后续代码,
        // 而不是阻止下一个 on('data') 事件的触发。
        promises.push(processData(row, processedData));
        await timeout(); // 这个延迟仅影响当前 on('data') 回调的内部流程
      })
      .on("end", async () => {
        console.log("Stream ended, waiting for all promises to resolve...");
        await Promise.all(promises); // 此时所有API调用可能已经并发发出
        stringify(processedData, (err, output) => {
          if (err) return reject(err);
          fs.writeFileSync("output.csv", output);
          console.log("Output written to output.csv");
          resolve();
        });
      })
      .on("error", function (error) {
        console.error("Stream error:", error.message);
        reject(error);
      });
  });
}

// 假设 input.csv 存在,内容类似:
// Name,Age
// Alice,30
// Bob,25
// Charlie,35
// ...
// processCsvWithFloodingRisk().catch(console.error);
登录后复制

上述代码的问题在于,on('data') 事件是流在读取到足够数据块时异步触发的。这意味着,当一个 on('data') 回调正在执行其 async 逻辑(包括 await timeout())时,流可能已经读取了更多数据,并触发了下一个 on('data') 事件。结果就是,所有的 processData Promise几乎同时被创建并开始执行,导致API在短时间内接收到大量请求,从而引发限速问题。

解决方案:利用 for await...of 控制异步流

要解决这个问题,我们需要一种机制来暂停流的读取,直到当前的异步操作完成。Node.js 10及更高版本引入的 for await...of 循环正是为此而生。它可以直接迭代异步可迭代对象(如可读流),并在每次迭代时等待异步操作完成。

星流
星流

LiblibAI推出的一站式AI图像创作平台

星流85
查看详情 星流

以下是修正后的代码示例:

const fs = require('fs');
const { parse } = require('csv-parse');
const { stringify } = require('csv-stringify');

// 辅助函数:生成指定范围内的随机整数
function randomIntFromInterval(min, max) {
  return Math.floor(Math.random() * (max - min + 1) + min);
}

// 辅助函数:创建Promise延迟
function timeout() {
  return new Promise((resolve) =>
    setTimeout(resolve, randomIntFromInterval(3000, 39990)) // 3秒到近40秒的随机延迟
  );
}

// 模拟API调用
async function callAPI(firstName) {
  console.log(`Calling API for: ${firstName} at ${new Date().toLocaleTimeString()}`);
  await timeout(); // 模拟API响应时间
  return `Info for ${firstName}`;
}

// 处理单行数据
async function processData(row) {
  const firstName = row[0];
  const infos = await callAPI(firstName);
  return [firstName, infos];
}

async function processCsvWithRateLimitControl() {
  const readStream = fs.createReadStream('./input.csv');
  const writeStream = fs.createWriteStream('output.csv');
  const parser = parse({ delimiter: ',', from_line: 1 });

  const processedRows = []; // 存储处理后的行数据

  // 将读取流管道到解析器
  readStream.pipe(parser);

  try {
    // 使用 for await...of 迭代解析器,它是一个异步可迭代对象
    for await (const row of parser) {
      console.log(`Processing row: ${row[0]}`);
      const newRow = await processData(row); // 顺序处理每一行,等待API调用完成
      processedRows.push(newRow);
      await timeout(); // 在处理下一行之前引入延迟,严格控制请求速率
    }

    // 所有行处理完毕后,将结果写入CSV
    stringify(processedRows, (err, output) => {
      if (err) {
        console.error("Error stringifying data:", err);
        return;
      }
      writeStream.write(output);
      writeStream.end();
      console.log('Output written to output.csv. OK');
    });

  } catch (error) {
    console.error("An error occurred during CSV processing:", error.message);
  } finally {
    // 确保流被关闭
    readStream.destroy();
    writeStream.end();
  }
}

// 启动处理流程
processCsvWithRateLimitControl().catch(console.error);

// 示例 input.csv 内容:
// Name,Age
// Alice,30
// Bob,25
// Charlie,35
// David,40
// Eve,28
登录后复制

核心改进点解析

  1. for await...of 循环: 这是解决问题的关键。当 parser 对象被 for await...of 迭代时,它会按顺序吐出解析后的行。每次迭代都会等待 await processData(row) 完成,然后等待 await timeout() 完成,才会进入下一次迭代,从而确保了API调用的严格顺序和速率控制。
  2. processData 函数: 现在 processData 函数可以直接返回处理后的单行数据,而不再需要操作一个外部的 data 数组,使得函数职责更单一。
  3. 延迟位置: await timeout() 被放置在 for await (const row of parser) 循环的内部,processData 调用之后。这意味着在处理完当前行并调用API后,程序会暂停一段随机时间,才会继续从流中获取下一行数据并处理。这有效地控制了API请求的频率。
  4. Promise.all 的移除: 在这种顺序处理的场景下,不再需要 Promise.all 来等待所有 Promise 完成,因为 for await...of 已经确保了顺序执行。

注意事项与最佳实践

  • 错误处理: 在实际应用中,应为文件流、解析器和API调用添加健壮的错误处理机制。例如,readStream.on('error') 和 writeStream.on('error') 以及 try...catch 块对于捕获和处理异常至关重要。
  • 资源管理: 确保文件流在使用完毕后被正确关闭,例如调用 readStream.destroy() 和 writeStream.end()。
  • 可配置的延迟: 将 timeout 函数中的延迟时间作为参数或从配置中读取,以便灵活调整API请求速率。
  • API响应码处理: callAPI 函数应检查API的响应状态码,对限速错误(如HTTP 429 Too Many Requests)进行特殊处理,例如指数退避重试策略。
  • 日志记录: 详细的日志记录有助于监控处理进度和调试问题。
  • 内存使用: 尽管 for await...of 控制了并发,但如果 processedRows 数组累积了大量数据,仍可能导致内存占用过高。对于超大文件,可以考虑直接将处理后的数据流式写入输出文件,而不是先全部收集到内存中。

总结

通过巧妙地结合Node.js的流机制和 for await...of 异步迭代语法,我们可以精确控制对外部API的请求速率,有效避免API限速问题。这种模式不仅适用于CSV文件,也适用于任何需要按顺序处理异步可迭代数据的场景,是构建健壮、高效的Node.js数据处理管道的关键技术之一。理解并正确运用这些异步控制模式,对于开发可靠的后端服务至关重要。

以上就是Node.js 流式处理CSV与API限速的异步控制策略的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号