0

0

Node.js中利用Redis实现并发受限的资源队列与轮询机制

霞舞

霞舞

发布时间:2025-11-22 14:50:13

|

992人浏览过

|

来源于php中文网

原创

node.js中利用redis实现并发受限的资源队列与轮询机制

本文旨在解决Node.js应用中,管理有限资源(如特定“标题”)的并发访问问题。当资源持有时间较长,且存在全局并发限制和资源特有队列时,传统基于内存队列的方案可能导致不必要的阻塞。我们将探讨如何通过解耦长时操作,并结合Redis进行状态管理和客户端轮询机制,实现高效、可扩展且非阻塞的资源分配策略,确保不同资源请求间的独立性。

挑战:管理并发受限的资源与长时间占用

在Node.js应用中,经常会遇到需要管理有限资源(例如本例中的“标题A”或“标题B”)的场景。这些场景通常伴随着以下复杂性:

  1. 全局并发限制: 应用可能在任何时刻只能处理一个核心任务(例如,为用户“获取标题”的初始处理阶段)。
  2. 资源特有队列: 针对同一资源(如“标题A”),多个用户请求时需要排队,确保一次只有一个用户持有该资源。
  3. 长时间占用: 用户获取资源后,可能会长时间(例如60秒)持有该资源,期间其他等待同一资源的用户需要等待。
  4. 非阻塞需求: 关键在于,持有不同资源的用户之间不应相互阻塞其请求的初始处理阶段。例如,如果用户1持有“标题A”并进入60秒的占用期,用户3请求“标题B”时,不应等待用户1的60秒占用期结束,而应仅等待全局核心任务完成,然后立即处理其“标题B”的请求。

传统的基于p-queue等内存队列库的实现,虽然能有效管理并发和排队,但当“任务”被定义为包含长时间占用逻辑时,会将整个占用期也纳入队列的阻塞范围。例如,将addTitle函数(其中包含fetchTitleForUser和promptForCancel)作为一个整体任务加入全局队列,会导致promptForCancel的60秒占用期阻塞后续所有任务,即使是请求不同资源的任务。这违背了不同资源请求间独立性的原则。

解决方案核心思想:解耦、状态管理与客户端轮询

为了解决上述问题,我们需要将资源分配的“持有”阶段从服务器的阻塞执行流中解耦出来,并引入一个外部、持久化的状态管理机制。

核心思想包括:

  1. 解耦长时操作: 服务器不再为用户持有资源而阻塞HTTP连接或内部任务队列。
  2. 利用Redis进行状态管理: 使用Redis作为集中式存储,管理每个资源的当前持有者和等待队列。Redis的过期键(TTL)机制天然适用于资源占用时间的管理。
  3. 客户端轮询(Fire-and-Forget): 用户请求资源后,服务器立即响应其请求已进入处理流程或已排队,然后客户端通过周期性轮询来查询是否已获得资源访问权限。

基于Redis的资源分配机制设计

以下是利用Redis实现高效资源分配的详细设计:

TapNow
TapNow

新一代AI视觉创作引擎

下载

1. Redis数据结构

针对每个可分配的资源(例如Title.A和Title.B),我们可以在Redis中维护以下数据结构:

  • 资源持有者键 (String with TTL):
    • 键名:title::holder (例如 title:A:holder)
    • 键值:当前持有该资源的用户ID
    • TTL (Time To Live):设置为资源占用时间(例如60秒)。当键过期时,表示资源被自动释放。
  • 资源等待队列 (List):
    • 键名:title::queue (例如 title:A:queue)
    • 键值:一个用户ID列表,按请求顺序存储等待该资源的用户。

2. 资源请求流程

当用户请求一个标题时,服务器端的处理流程如下:

  1. 接收请求: 用户向服务器发送请求,指定所需标题。
  2. 核心处理(可选的全局并发控制): 如果fetchTitleForUser这类初始处理操作确实需要全局串行执行(如原问题中的queue = new PQueue({ concurrency: 1 })),则可以在这里使用一个独立的p-queue来管理这些短时、高耗能的初始化任务。完成初始化后,该任务即从全局队列中释放。
  3. 管理资源队列:
    • 服务器首先检查 title::holder 键是否存在且未过期。
    • 如果资源空闲:
      • 将当前用户ID设置为 title::holder 的值,并设置60秒的TTL。
      • 服务器立即响应用户,告知其已获得资源。
    • 如果资源已被占用:
      • 将当前用户ID添加到 title::queue 列表的末尾(使用 RPUSH 命令)。
      • 服务器立即响应用户,告知其已排队等待。
  4. 服务器响应: 无论用户是否立即获得资源,服务器都应立即返回一个响应,告知客户端请求状态(已获得或已排队),而不是阻塞等待。

3. 客户端轮询机制

客户端在发送请求后,不再等待服务器的最终结果,而是进入轮询模式:

  1. 周期性查询: 客户端每隔X秒(例如5秒)向服务器发送一个查询请求(例如 /checkAccess?title=A&userId=user123)。
  2. 服务器处理轮询:
    • 接收到轮询请求后,服务器检查 title::holder 键。
    • 如果键存在且其值与当前轮询用户ID匹配:
      • 表示该用户仍持有资源。服务器响应确认。
    • 如果键不存在或其值不匹配(可能已过期或被其他用户持有):
      • 服务器进一步检查 title::queue 列表。
      • 如果列表不为空,且当前轮询用户ID是列表的第一个元素(使用 LINDEX 检查):
        • 表示该用户现在是队列的首位,可以尝试获取资源。
        • 使用Redis事务(MULTI/EXEC)或Lua脚本确保原子性:
          • 检查 title::holder 是否仍为空。
          • 如果为空,则 LPOP 移除队列头部的用户,然后 SET title::holder 为该用户ID并设置TTL。
        • 如果成功获取,服务器响应告知用户已获得资源。
        • 如果未能获取(例如,在检查到设置之间,另一个并发的轮询请求抢占了),则服务器响应告知用户仍需等待。
      • 如果列表为空,且当前轮询用户ID不是列表的第一个元素:
        • 服务器响应告知用户仍需等待或资源已被其他人获取。

4. 手动释放资源

当用户选择提前释放资源时:

  1. 发送释放请求: 用户向服务器发送一个释放请求。
  2. 服务器处理: 服务器简单地删除 title::holder 键(使用 DEL 命令)。
  3. 后续处理: 资源被释放后,下一个轮询到位的用户(位于 title::queue 列表头部的用户)将在其下一次轮询时检测到资源空闲并尝试获取。

示例代码结构 (Node.js & ioredis)

import Redis from 'ioredis';
import PQueue from 'p-queue'; // 用于管理短时核心任务的全局并发

const redis = new Redis({
  host: '127.0.0.1',
  port: 6379,
});

// 全局队列,用于串行执行耗时的初始处理任务 (fetchTitleForUser)
const processingQueue = new PQueue({ concurrency: 1 });

enum Title {
  A = 'a',
  B = 'b',
}

const TITLE_HOLD_DURATION_SECONDS = 60; // 资源持有时间

/**
 * 模拟获取标题的初始处理过程,可能耗时但不会长时间阻塞
 * @param title 标题类型
 * @param userId 用户ID
 */
async function fetchTitleForUser(title: Title, userId: string): Promise {
  console.log(`[${userId}] 正在处理 ${title} 的初始请求...`);
  await new Promise(resolve => setTimeout(resolve, Math.random() * 1000)); // 模拟耗时
  console.log(`[${userId}] ${title} 初始处理完成。`);
}

/**
 * 用户请求标题
 * @param title 标题类型
 * @param userId 用户ID
 * @returns {status: 'granted' | 'queued', message: string}
 */
async function requestTitle(title: Title, userId: string): Promise<{ status: string; message: string }> {
  // 1. 将初始处理任务加入全局队列,确保串行执行
  await processingQueue.add(() => fetchTitleForUser(title, userId));

  const holderKey = `title:${title}:holder`;
  const queueKey = `title:${title}:queue`;

  // 2. 尝试获取资源或加入队列
  const currentHolder = await redis.get(holderKey);

  if (!currentHolder) {
    // 资源空闲,尝试获取
    const setResult = await redis.set(holderKey, userId, 'EX', TITLE_HOLD_DURATION_SECONDS, 'NX');
    if (setResult === 'OK') {
      console.log(`[${userId}] 成功获取 ${title}。`);
      return { status: 'granted', message: `您已获得标题 ${title},有效期 ${TITLE_HOLD_DURATION_SECONDS} 秒。` };
    }
    // 理论上并发极低,但仍有可能在GET和SET之间被抢占,此时进入队列
  }

  // 资源已被占用或并发抢占失败,加入队列
  await redis.rpush(queueKey, userId);
  console.log(`[${userId}] ${title} 已被占用,您已加入队列。`);
  return { status: 'queued', message: `标题 ${title} 已被占用,您已进入等待队列。` };
}

/**
 * 用户轮询检查是否获得标题
 * @param title 标题类型
 * @param userId 用户ID
 * @returns {status: 'granted' | 'waiting' | 'error', message: string}
 */
async function checkTitleAccess(title: Title, userId: string): Promise<{ status: string; message: string }> {
  const holderKey = `title:${title}:holder`;
  const queueKey = `title:${title}:queue`;

  const currentHolder = await redis.get(holderKey);

  if (currentHolder === userId) {
    // 当前用户持有资源
    return { status: 'granted', message: `您当前持有标题 ${title}。` };
  }

  // 资源未被当前用户持有,尝试从队列中获取
  const queueHead = await redis.lindex(queueKey, 0);

  if (queueHead === userId) {
    // 当前用户是队列头,尝试获取资源
    const script = `
      if redis.call('get', KEYS[1]) == nil then
        redis.call('lpop', KEYS[2])
        redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2])
        return 1
      end
      return 0
    `;
    const result = await redis.eval(script, 2, holderKey, queueKey, userId, TITLE_HOLD_DURATION_SECONDS);

    if (result === 1) {
      console.log(`[${userId}] 从队列中获取了 ${title}。`);
      return { status: 'granted', message: `您已从队列中获得标题 ${title}。` };
    }
  }

  // 仍需等待
  return { status: 'waiting', message: `您仍在等待标题 ${title}。` };
}

/**
 * 用户手动释放标题
 * @param title 标题类型
 * @param userId 用户ID
 * @returns {success: boolean, message: string}
 */
async function abortTitle(title: Title, userId: string): Promise<{ success: boolean; message: string }> {
  const holderKey = `title:${title}:holder`;
  const currentHolder = await redis.get(holderKey);

  if (currentHolder === userId) {
    await redis.del(holderKey);
    console.log(`[${userId}] 手动释放了 ${title}。`);
    return { success: true, message: `您已成功释放标题 ${title}。` };
  } else if (!currentHolder) {
    return { success: false, message: `标题 ${title} 当前无人持有或已被释放。` };
  } else {
    return { success: false, message: `您不持有标题 ${title},无法释放。` };
  }
}

// --- 模拟客户端交互 ---
async function simulateUser(userId: string, title: Title, pollIntervalMs = 5000) {
  console.log(`--- ${userId} 请求 ${title} ---`);
  const initialResponse = await requestTitle(title, userId);
  console.log(`[${userId}] 初始响应: ${initialResponse.message}`);

  if (initialResponse.status === 'granted') {
    console.log(`[${userId}] 立即获得标题 ${title}。`);
    // 假设用户持有一段时间后手动释放或等待过期
    setTimeout(async () => {
      // await abortTitle(title, userId); // 模拟手动释放
      console.log(`[${userId}] 结束持有 ${title}。`);
    }, (TITLE_HOLD_

相关专题

更多
string转int
string转int

在编程中,我们经常会遇到需要将字符串(str)转换为整数(int)的情况。这可能是因为我们需要对字符串进行数值计算,或者需要将用户输入的字符串转换为整数进行处理。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

315

2023.08.02

treenode的用法
treenode的用法

​在计算机编程领域,TreeNode是一种常见的数据结构,通常用于构建树形结构。在不同的编程语言中,TreeNode可能有不同的实现方式和用法,通常用于表示树的节点信息。更多关于treenode相关问题详情请看本专题下面的文章。php中文网欢迎大家前来学习。

533

2023.12.01

C++ 高效算法与数据结构
C++ 高效算法与数据结构

本专题讲解 C++ 中常用算法与数据结构的实现与优化,涵盖排序算法(快速排序、归并排序)、查找算法、图算法、动态规划、贪心算法等,并结合实际案例分析如何选择最优算法来提高程序效率。通过深入理解数据结构(链表、树、堆、哈希表等),帮助开发者提升 在复杂应用中的算法设计与性能优化能力。

17

2025.12.22

深入理解算法:高效算法与数据结构专题
深入理解算法:高效算法与数据结构专题

本专题专注于算法与数据结构的核心概念,适合想深入理解并提升编程能力的开发者。专题内容包括常见数据结构的实现与应用,如数组、链表、栈、队列、哈希表、树、图等;以及高效的排序算法、搜索算法、动态规划等经典算法。通过详细的讲解与复杂度分析,帮助开发者不仅能熟练运用这些基础知识,还能在实际编程中优化性能,提高代码的执行效率。本专题适合准备面试的开发者,也适合希望提高算法思维的编程爱好者。

13

2026.01.06

js正则表达式
js正则表达式

php中文网为大家提供各种js正则表达式语法大全以及各种js正则表达式使用的方法,还有更多js正则表达式的相关文章、相关下载、相关课程,供大家免费下载体验。

510

2023.06.20

js获取当前时间
js获取当前时间

JS全称JavaScript,是一种具有函数优先的轻量级,解释型或即时编译型的编程语言;它是一种属于网络的高级脚本语言,主要用于Web,常用来为网页添加各式各样的动态功能。js怎么获取当前时间呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

244

2023.07.28

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

254

2023.08.03

js是什么意思
js是什么意思

JS是JavaScript的缩写,它是一种广泛应用于网页开发的脚本语言。JavaScript是一种解释性的、基于对象和事件驱动的编程语言,通常用于为网页增加交互性和动态性。它可以在网页上实现复杂的功能和效果,如表单验证、页面元素操作、动画效果、数据交互等。

5266

2023.08.17

Java 项目构建与依赖管理(Maven / Gradle)
Java 项目构建与依赖管理(Maven / Gradle)

本专题系统讲解 Java 项目构建与依赖管理的完整体系,重点覆盖 Maven 与 Gradle 的核心概念、项目生命周期、依赖冲突解决、多模块项目管理、构建加速与版本发布规范。通过真实项目结构示例,帮助学习者掌握 从零搭建、维护到发布 Java 工程的标准化流程,提升在实际团队开发中的工程能力与协作效率。

4

2026.01.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.3万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 6.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号