
本文旨在解决Node.js应用中,管理有限资源(如特定“标题”)的并发访问问题。当资源持有时间较长,且存在全局并发限制和资源特有队列时,传统基于内存队列的方案可能导致不必要的阻塞。我们将探讨如何通过解耦长时操作,并结合Redis进行状态管理和客户端轮询机制,实现高效、可扩展且非阻塞的资源分配策略,确保不同资源请求间的独立性。
在Node.js应用中,经常会遇到需要管理有限资源(例如本例中的“标题A”或“标题B”)的场景。这些场景通常伴随着以下复杂性:
传统的基于p-queue等内存队列库的实现,虽然能有效管理并发和排队,但当“任务”被定义为包含长时间占用逻辑时,会将整个占用期也纳入队列的阻塞范围。例如,将addTitle函数(其中包含fetchTitleForUser和promptForCancel)作为一个整体任务加入全局队列,会导致promptForCancel的60秒占用期阻塞后续所有任务,即使是请求不同资源的任务。这违背了不同资源请求间独立性的原则。
为了解决上述问题,我们需要将资源分配的“持有”阶段从服务器的阻塞执行流中解耦出来,并引入一个外部、持久化的状态管理机制。
核心思想包括:
以下是利用Redis实现高效资源分配的详细设计:
针对每个可分配的资源(例如Title.A和Title.B),我们可以在Redis中维护以下数据结构:
当用户请求一个标题时,服务器端的处理流程如下:
客户端在发送请求后,不再等待服务器的最终结果,而是进入轮询模式:
当用户选择提前释放资源时:
import Redis from 'ioredis';
import PQueue from 'p-queue'; // 用于管理短时核心任务的全局并发
const redis = new Redis({
host: '127.0.0.1',
port: 6379,
});
// 全局队列,用于串行执行耗时的初始处理任务 (fetchTitleForUser)
const processingQueue = new PQueue({ concurrency: 1 });
enum Title {
A = 'a',
B = 'b',
}
const TITLE_HOLD_DURATION_SECONDS = 60; // 资源持有时间
/**
* 模拟获取标题的初始处理过程,可能耗时但不会长时间阻塞
* @param title 标题类型
* @param userId 用户ID
*/
async function fetchTitleForUser(title: Title, userId: string): Promise<void> {
console.log(`[${userId}] 正在处理 ${title} 的初始请求...`);
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000)); // 模拟耗时
console.log(`[${userId}] ${title} 初始处理完成。`);
}
/**
* 用户请求标题
* @param title 标题类型
* @param userId 用户ID
* @returns {status: 'granted' | 'queued', message: string}
*/
async function requestTitle(title: Title, userId: string): Promise<{ status: string; message: string }> {
// 1. 将初始处理任务加入全局队列,确保串行执行
await processingQueue.add(() => fetchTitleForUser(title, userId));
const holderKey = `title:${title}:holder`;
const queueKey = `title:${title}:queue`;
// 2. 尝试获取资源或加入队列
const currentHolder = await redis.get(holderKey);
if (!currentHolder) {
// 资源空闲,尝试获取
const setResult = await redis.set(holderKey, userId, 'EX', TITLE_HOLD_DURATION_SECONDS, 'NX');
if (setResult === 'OK') {
console.log(`[${userId}] 成功获取 ${title}。`);
return { status: 'granted', message: `您已获得标题 ${title},有效期 ${TITLE_HOLD_DURATION_SECONDS} 秒。` };
}
// 理论上并发极低,但仍有可能在GET和SET之间被抢占,此时进入队列
}
// 资源已被占用或并发抢占失败,加入队列
await redis.rpush(queueKey, userId);
console.log(`[${userId}] ${title} 已被占用,您已加入队列。`);
return { status: 'queued', message: `标题 ${title} 已被占用,您已进入等待队列。` };
}
/**
* 用户轮询检查是否获得标题
* @param title 标题类型
* @param userId 用户ID
* @returns {status: 'granted' | 'waiting' | 'error', message: string}
*/
async function checkTitleAccess(title: Title, userId: string): Promise<{ status: string; message: string }> {
const holderKey = `title:${title}:holder`;
const queueKey = `title:${title}:queue`;
const currentHolder = await redis.get(holderKey);
if (currentHolder === userId) {
// 当前用户持有资源
return { status: 'granted', message: `您当前持有标题 ${title}。` };
}
// 资源未被当前用户持有,尝试从队列中获取
const queueHead = await redis.lindex(queueKey, 0);
if (queueHead === userId) {
// 当前用户是队列头,尝试获取资源
const script = `
if redis.call('get', KEYS[1]) == nil then
redis.call('lpop', KEYS[2])
redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2])
return 1
end
return 0
`;
const result = await redis.eval(script, 2, holderKey, queueKey, userId, TITLE_HOLD_DURATION_SECONDS);
if (result === 1) {
console.log(`[${userId}] 从队列中获取了 ${title}。`);
return { status: 'granted', message: `您已从队列中获得标题 ${title}。` };
}
}
// 仍需等待
return { status: 'waiting', message: `您仍在等待标题 ${title}。` };
}
/**
* 用户手动释放标题
* @param title 标题类型
* @param userId 用户ID
* @returns {success: boolean, message: string}
*/
async function abortTitle(title: Title, userId: string): Promise<{ success: boolean; message: string }> {
const holderKey = `title:${title}:holder`;
const currentHolder = await redis.get(holderKey);
if (currentHolder === userId) {
await redis.del(holderKey);
console.log(`[${userId}] 手动释放了 ${title}。`);
return { success: true, message: `您已成功释放标题 ${title}。` };
} else if (!currentHolder) {
return { success: false, message: `标题 ${title} 当前无人持有或已被释放。` };
} else {
return { success: false, message: `您不持有标题 ${title},无法释放。` };
}
}
// --- 模拟客户端交互 ---
async function simulateUser(userId: string, title: Title, pollIntervalMs = 5000) {
console.log(`--- ${userId} 请求 ${title} ---`);
const initialResponse = await requestTitle(title, userId);
console.log(`[${userId}] 初始响应: ${initialResponse.message}`);
if (initialResponse.status === 'granted') {
console.log(`[${userId}] 立即获得标题 ${title}。`);
// 假设用户持有一段时间后手动释放或等待过期
setTimeout(async () => {
// await abortTitle(title, userId); // 模拟手动释放
console.log(`[${userId}] 结束持有 ${title}。`);
}, (TITLE_HOLD_以上就是Node.js中利用Redis实现并发受限的资源队列与轮询机制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号