
本文深入探讨了在node.js中使用文件流处理csv数据并按行调用外部api时,如何有效管理api请求速率限制的问题。通过分析常见错误模式,文章提出了利用 `for await...of` 循环结合 `csv-parse` 库来顺序控制异步操作的解决方案,从而避免api过载,确保数据处理的稳定性和可靠性。
在现代应用开发中,处理大量数据文件(如CSV)并与外部API交互是常见需求。然而,外部API通常设有速率限制,若不加以控制,短时间内发起过多请求会导致API拒绝服务,甚至封禁IP。Node.js的非阻塞I/O和流(Stream)特性使得处理大文件变得高效,但结合异步操作时,需要特别注意并发控制。
设想这样一个场景:你需要读取一个大型CSV文件,文件的每一行都包含调用某个外部API所需的信息。为了避免API过载,每次API调用后都需要等待一段随机时间。一个直观的尝试可能是使用 fs.createReadStream 管道 csv-parse,然后在 on('data') 事件中执行异步操作并引入延迟:
const fs = require('fs');
const { parse } = require('csv-parse');
const { stringify } = require('csv-stringify');
// 辅助函数:生成指定范围内的随机整数
function randomIntFromInterval(min, max) {
  return Math.floor(Math.random() * (max - min + 1) + min);
}
// 辅助函数:创建Promise延迟
function timeout() {
  return new Promise((resolve) =>
    setTimeout(resolve, randomIntFromInterval(3000, 39990)) // 3秒到近40秒的随机延迟
  );
}
// 模拟API调用
async function callAPI(firstName) {
  console.log(`Calling API for: ${firstName}`);
  await timeout(); // 模拟API响应时间
  return `Info for ${firstName}`;
}
// 处理单行数据
async function processData(row, data) {
  const firstName = row[0];
  const infos = await callAPI(firstName);
  // 再次引入延迟,避免后续处理过快,但这里不是关键
  await timeout(); 
  const newRow = [firstName, infos];
  data.push(newRow);
  return data;
}
function processCsvWithFloodingRisk() {
  return new Promise((resolve, reject) => {
    const promises = [];
    const processedData = []; // 用于存储处理后的数据
    fs.createReadStream("./input.csv")
      .pipe(parse({ delimiter: ",", from_line: 1 }))
      .on("data", async (row) => {
        // 问题所在:on('data') 事件是异步触发的,每次触发都会立即执行
        // processData并将其Promise推入数组,而不会等待上一个Promise完成。
        // 即使这里有 await timeout(),它也只延迟了当前 on('data') 回调的后续代码,
        // 而不是阻止下一个 on('data') 事件的触发。
        promises.push(processData(row, processedData));
        await timeout(); // 这个延迟仅影响当前 on('data') 回调的内部流程
      })
      .on("end", async () => {
        console.log("Stream ended, waiting for all promises to resolve...");
        await Promise.all(promises); // 此时所有API调用可能已经并发发出
        stringify(processedData, (err, output) => {
          if (err) return reject(err);
          fs.writeFileSync("output.csv", output);
          console.log("Output written to output.csv");
          resolve();
        });
      })
      .on("error", function (error) {
        console.error("Stream error:", error.message);
        reject(error);
      });
  });
}
// 假设 input.csv 存在,内容类似:
// Name,Age
// Alice,30
// Bob,25
// Charlie,35
// ...
// processCsvWithFloodingRisk().catch(console.error);上述代码的问题在于,on('data') 事件是流在读取到足够数据块时异步触发的。这意味着,当一个 on('data') 回调正在执行其 async 逻辑(包括 await timeout())时,流可能已经读取了更多数据,并触发了下一个 on('data') 事件。结果就是,所有的 processData Promise几乎同时被创建并开始执行,导致API在短时间内接收到大量请求,从而引发限速问题。
要解决这个问题,我们需要一种机制来暂停流的读取,直到当前的异步操作完成。Node.js 10及更高版本引入的 for await...of 循环正是为此而生。它可以直接迭代异步可迭代对象(如可读流),并在每次迭代时等待异步操作完成。
以下是修正后的代码示例:
const fs = require('fs');
const { parse } = require('csv-parse');
const { stringify } = require('csv-stringify');
// 辅助函数:生成指定范围内的随机整数
function randomIntFromInterval(min, max) {
  return Math.floor(Math.random() * (max - min + 1) + min);
}
// 辅助函数:创建Promise延迟
function timeout() {
  return new Promise((resolve) =>
    setTimeout(resolve, randomIntFromInterval(3000, 39990)) // 3秒到近40秒的随机延迟
  );
}
// 模拟API调用
async function callAPI(firstName) {
  console.log(`Calling API for: ${firstName} at ${new Date().toLocaleTimeString()}`);
  await timeout(); // 模拟API响应时间
  return `Info for ${firstName}`;
}
// 处理单行数据
async function processData(row) {
  const firstName = row[0];
  const infos = await callAPI(firstName);
  return [firstName, infos];
}
async function processCsvWithRateLimitControl() {
  const readStream = fs.createReadStream('./input.csv');
  const writeStream = fs.createWriteStream('output.csv');
  const parser = parse({ delimiter: ',', from_line: 1 });
  const processedRows = []; // 存储处理后的行数据
  // 将读取流管道到解析器
  readStream.pipe(parser);
  try {
    // 使用 for await...of 迭代解析器,它是一个异步可迭代对象
    for await (const row of parser) {
      console.log(`Processing row: ${row[0]}`);
      const newRow = await processData(row); // 顺序处理每一行,等待API调用完成
      processedRows.push(newRow);
      await timeout(); // 在处理下一行之前引入延迟,严格控制请求速率
    }
    // 所有行处理完毕后,将结果写入CSV
    stringify(processedRows, (err, output) => {
      if (err) {
        console.error("Error stringifying data:", err);
        return;
      }
      writeStream.write(output);
      writeStream.end();
      console.log('Output written to output.csv. OK');
    });
  } catch (error) {
    console.error("An error occurred during CSV processing:", error.message);
  } finally {
    // 确保流被关闭
    readStream.destroy();
    writeStream.end();
  }
}
// 启动处理流程
processCsvWithRateLimitControl().catch(console.error);
// 示例 input.csv 内容:
// Name,Age
// Alice,30
// Bob,25
// Charlie,35
// David,40
// Eve,28通过巧妙地结合Node.js的流机制和 for await...of 异步迭代语法,我们可以精确控制对外部API的请求速率,有效避免API限速问题。这种模式不仅适用于CSV文件,也适用于任何需要按顺序处理异步可迭代数据的场景,是构建健壮、高效的Node.js数据处理管道的关键技术之一。理解并正确运用这些异步控制模式,对于开发可靠的后端服务至关重要。
以上就是Node.js 流式处理CSV与API限速的异步控制策略的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                 
                                
                                 收藏
收藏
                                                                             
                                
                                 收藏
收藏
                                                                             
                                
                                 收藏
收藏
                                                                            Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号