首页 > web前端 > js教程 > 正文

Node.js中利用Redis实现并发受限的资源队列与轮询机制

霞舞
发布: 2025-11-22 14:50:13
原创
944人浏览过

node.js中利用redis实现并发受限的资源队列与轮询机制

本文旨在解决Node.js应用中,管理有限资源(如特定“标题”)的并发访问问题。当资源持有时间较长,且存在全局并发限制和资源特有队列时,传统基于内存队列的方案可能导致不必要的阻塞。我们将探讨如何通过解耦长时操作,并结合Redis进行状态管理和客户端轮询机制,实现高效、可扩展且非阻塞的资源分配策略,确保不同资源请求间的独立性。

挑战:管理并发受限的资源与长时间占用

在Node.js应用中,经常会遇到需要管理有限资源(例如本例中的“标题A”或“标题B”)的场景。这些场景通常伴随着以下复杂性:

  1. 全局并发限制: 应用可能在任何时刻只能处理一个核心任务(例如,为用户“获取标题”的初始处理阶段)。
  2. 资源特有队列: 针对同一资源(如“标题A”),多个用户请求时需要排队,确保一次只有一个用户持有该资源。
  3. 长时间占用: 用户获取资源后,可能会长时间(例如60秒)持有该资源,期间其他等待同一资源的用户需要等待。
  4. 非阻塞需求: 关键在于,持有不同资源的用户之间不应相互阻塞其请求的初始处理阶段。例如,如果用户1持有“标题A”并进入60秒的占用期,用户3请求“标题B”时,不应等待用户1的60秒占用期结束,而应仅等待全局核心任务完成,然后立即处理其“标题B”的请求。

传统的基于p-queue等内存队列库的实现,虽然能有效管理并发和排队,但当“任务”被定义为包含长时间占用逻辑时,会将整个占用期也纳入队列的阻塞范围。例如,将addTitle函数(其中包含fetchTitleForUser和promptForCancel)作为一个整体任务加入全局队列,会导致promptForCancel的60秒占用期阻塞后续所有任务,即使是请求不同资源的任务。这违背了不同资源请求间独立性的原则。

解决方案核心思想:解耦、状态管理与客户端轮询

为了解决上述问题,我们需要将资源分配的“持有”阶段从服务器的阻塞执行流中解耦出来,并引入一个外部、持久化的状态管理机制。

核心思想包括:

  1. 解耦长时操作: 服务器不再为用户持有资源而阻塞HTTP连接或内部任务队列。
  2. 利用Redis进行状态管理: 使用Redis作为集中式存储,管理每个资源的当前持有者和等待队列。Redis的过期键(TTL)机制天然适用于资源占用时间的管理。
  3. 客户端轮询(Fire-and-Forget): 用户请求资源后,服务器立即响应其请求已进入处理流程或已排队,然后客户端通过周期性轮询来查询是否已获得资源访问权限。

基于Redis的资源分配机制设计

以下是利用Redis实现高效资源分配的详细设计:

LobeHub
LobeHub

LobeChat brings you the best user experience of ChatGPT, OLLaMA, Gemini, Claude

LobeHub 201
查看详情 LobeHub

1. Redis数据结构

针对每个可分配的资源(例如Title.A和Title.B),我们可以在Redis中维护以下数据结构:

  • 资源持有者键 (String with TTL):
    • 键名:title:<title_name>:holder (例如 title:A:holder)
    • 键值:当前持有该资源的用户ID
    • TTL (Time To Live):设置为资源占用时间(例如60秒)。当键过期时,表示资源被自动释放。
  • 资源等待队列 (List):
    • 键名:title:<title_name>:queue (例如 title:A:queue)
    • 键值:一个用户ID列表,按请求顺序存储等待该资源的用户。

2. 资源请求流程

当用户请求一个标题时,服务器端的处理流程如下:

  1. 接收请求: 用户向服务器发送请求,指定所需标题。
  2. 核心处理(可选的全局并发控制): 如果fetchTitleForUser这类初始处理操作确实需要全局串行执行(如原问题中的queue = new PQueue({ concurrency: 1 })),则可以在这里使用一个独立的p-queue来管理这些短时、高耗能的初始化任务。完成初始化后,该任务即从全局队列中释放。
  3. 管理资源队列:
    • 服务器首先检查 title:<title_name>:holder 键是否存在且未过期。
    • 如果资源空闲:
      • 将当前用户ID设置为 title:<title_name>:holder 的值,并设置60秒的TTL。
      • 服务器立即响应用户,告知其已获得资源。
    • 如果资源已被占用:
      • 将当前用户ID添加到 title:<title_name>:queue 列表的末尾(使用 RPUSH 命令)。
      • 服务器立即响应用户,告知其已排队等待。
  4. 服务器响应: 无论用户是否立即获得资源,服务器都应立即返回一个响应,告知客户端请求状态(已获得或已排队),而不是阻塞等待。

3. 客户端轮询机制

客户端在发送请求后,不再等待服务器的最终结果,而是进入轮询模式:

  1. 周期性查询: 客户端每隔X秒(例如5秒)向服务器发送一个查询请求(例如 /checkAccess?title=A&userId=user123)。
  2. 服务器处理轮询:
    • 接收到轮询请求后,服务器检查 title:<title_name>:holder 键。
    • 如果键存在且其值与当前轮询用户ID匹配:
      • 表示该用户仍持有资源。服务器响应确认。
    • 如果键不存在或其值不匹配(可能已过期或被其他用户持有):
      • 服务器进一步检查 title:<title_name>:queue 列表。
      • 如果列表不为空,且当前轮询用户ID是列表的第一个元素(使用 LINDEX 检查):
        • 表示该用户现在是队列的首位,可以尝试获取资源。
        • 使用Redis事务(MULTI/EXEC)或Lua脚本确保原子性:
          • 检查 title:<title_name>:holder 是否仍为空。
          • 如果为空,则 LPOP 移除队列头部的用户,然后 SET title:<title_name>:holder 为该用户ID并设置TTL。
        • 如果成功获取,服务器响应告知用户已获得资源。
        • 如果未能获取(例如,在检查到设置之间,另一个并发的轮询请求抢占了),则服务器响应告知用户仍需等待。
      • 如果列表为空,且当前轮询用户ID不是列表的第一个元素:
        • 服务器响应告知用户仍需等待或资源已被其他人获取。

4. 手动释放资源

当用户选择提前释放资源时:

  1. 发送释放请求: 用户向服务器发送一个释放请求。
  2. 服务器处理: 服务器简单地删除 title:<title_name>:holder 键(使用 DEL 命令)。
  3. 后续处理: 资源被释放后,下一个轮询到位的用户(位于 title:<title_name>:queue 列表头部的用户)将在其下一次轮询时检测到资源空闲并尝试获取。

示例代码结构 (Node.js & ioredis)

import Redis from 'ioredis';
import PQueue from 'p-queue'; // 用于管理短时核心任务的全局并发

const redis = new Redis({
  host: '127.0.0.1',
  port: 6379,
});

// 全局队列,用于串行执行耗时的初始处理任务 (fetchTitleForUser)
const processingQueue = new PQueue({ concurrency: 1 });

enum Title {
  A = 'a',
  B = 'b',
}

const TITLE_HOLD_DURATION_SECONDS = 60; // 资源持有时间

/**
 * 模拟获取标题的初始处理过程,可能耗时但不会长时间阻塞
 * @param title 标题类型
 * @param userId 用户ID
 */
async function fetchTitleForUser(title: Title, userId: string): Promise<void> {
  console.log(`[${userId}] 正在处理 ${title} 的初始请求...`);
  await new Promise(resolve => setTimeout(resolve, Math.random() * 1000)); // 模拟耗时
  console.log(`[${userId}] ${title} 初始处理完成。`);
}

/**
 * 用户请求标题
 * @param title 标题类型
 * @param userId 用户ID
 * @returns {status: 'granted' | 'queued', message: string}
 */
async function requestTitle(title: Title, userId: string): Promise<{ status: string; message: string }> {
  // 1. 将初始处理任务加入全局队列,确保串行执行
  await processingQueue.add(() => fetchTitleForUser(title, userId));

  const holderKey = `title:${title}:holder`;
  const queueKey = `title:${title}:queue`;

  // 2. 尝试获取资源或加入队列
  const currentHolder = await redis.get(holderKey);

  if (!currentHolder) {
    // 资源空闲,尝试获取
    const setResult = await redis.set(holderKey, userId, 'EX', TITLE_HOLD_DURATION_SECONDS, 'NX');
    if (setResult === 'OK') {
      console.log(`[${userId}] 成功获取 ${title}。`);
      return { status: 'granted', message: `您已获得标题 ${title},有效期 ${TITLE_HOLD_DURATION_SECONDS} 秒。` };
    }
    // 理论上并发极低,但仍有可能在GET和SET之间被抢占,此时进入队列
  }

  // 资源已被占用或并发抢占失败,加入队列
  await redis.rpush(queueKey, userId);
  console.log(`[${userId}] ${title} 已被占用,您已加入队列。`);
  return { status: 'queued', message: `标题 ${title} 已被占用,您已进入等待队列。` };
}

/**
 * 用户轮询检查是否获得标题
 * @param title 标题类型
 * @param userId 用户ID
 * @returns {status: 'granted' | 'waiting' | 'error', message: string}
 */
async function checkTitleAccess(title: Title, userId: string): Promise<{ status: string; message: string }> {
  const holderKey = `title:${title}:holder`;
  const queueKey = `title:${title}:queue`;

  const currentHolder = await redis.get(holderKey);

  if (currentHolder === userId) {
    // 当前用户持有资源
    return { status: 'granted', message: `您当前持有标题 ${title}。` };
  }

  // 资源未被当前用户持有,尝试从队列中获取
  const queueHead = await redis.lindex(queueKey, 0);

  if (queueHead === userId) {
    // 当前用户是队列头,尝试获取资源
    const script = `
      if redis.call('get', KEYS[1]) == nil then
        redis.call('lpop', KEYS[2])
        redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2])
        return 1
      end
      return 0
    `;
    const result = await redis.eval(script, 2, holderKey, queueKey, userId, TITLE_HOLD_DURATION_SECONDS);

    if (result === 1) {
      console.log(`[${userId}] 从队列中获取了 ${title}。`);
      return { status: 'granted', message: `您已从队列中获得标题 ${title}。` };
    }
  }

  // 仍需等待
  return { status: 'waiting', message: `您仍在等待标题 ${title}。` };
}

/**
 * 用户手动释放标题
 * @param title 标题类型
 * @param userId 用户ID
 * @returns {success: boolean, message: string}
 */
async function abortTitle(title: Title, userId: string): Promise<{ success: boolean; message: string }> {
  const holderKey = `title:${title}:holder`;
  const currentHolder = await redis.get(holderKey);

  if (currentHolder === userId) {
    await redis.del(holderKey);
    console.log(`[${userId}] 手动释放了 ${title}。`);
    return { success: true, message: `您已成功释放标题 ${title}。` };
  } else if (!currentHolder) {
    return { success: false, message: `标题 ${title} 当前无人持有或已被释放。` };
  } else {
    return { success: false, message: `您不持有标题 ${title},无法释放。` };
  }
}

// --- 模拟客户端交互 ---
async function simulateUser(userId: string, title: Title, pollIntervalMs = 5000) {
  console.log(`--- ${userId} 请求 ${title} ---`);
  const initialResponse = await requestTitle(title, userId);
  console.log(`[${userId}] 初始响应: ${initialResponse.message}`);

  if (initialResponse.status === 'granted') {
    console.log(`[${userId}] 立即获得标题 ${title}。`);
    // 假设用户持有一段时间后手动释放或等待过期
    setTimeout(async () => {
      // await abortTitle(title, userId); // 模拟手动释放
      console.log(`[${userId}] 结束持有 ${title}。`);
    }, (TITLE_HOLD_
登录后复制

以上就是Node.js中利用Redis实现并发受限的资源队列与轮询机制的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号