const RedisUtil = require("./redisUtil");
const fs = require("fs");
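/* DelayQueueJob
long id;          unique identifier of the delayed job, used to look the job up
String topic;     job type (the specific business type)
long delayTime;   time at which the job should be executed
long ttrTime;     execution timeout of the job
String message;   job payload, used to handle the specific business logic */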
// Consider splitting the bucket into several buckets to improve efficiency
const DELAY_BUCKET_KEY_PREFIX = "delayBucket";
// Number of buckets
// const DELAY_BUCKET_NUM = 1;
/**
 * Registers a delayed job.
 *
 * Two writes are issued in parallel:
 *   1. the job id is added to the bucket sorted set, scored by `delayTime`;
 *   2. the JSON-serialized job is stored as a string under that same id.
 *
 * Note: the sorted-set member / string key is read from
 * `delayQueueJob.delayQueueJodId` (sic) — that is the property name this
 * function has always read, so it is kept for compatibility with callers.
 *
 * @param {Object} delayQueueJob
 * @param {string} delayQueueJob.delayQueueJodId - job id, used as member and key
 * @param {string} delayQueueJob.topic
 * @param {number} delayQueueJob.delayTime - used as the sorted-set score
 * @param {number} delayQueueJob.ttrTime
 * @param {string} delayQueueJob.message
 * @returns {Promise<void>} resolves once both writes have completed
 */
const pushDelayQueueAndItem = async (delayQueueJob) => {
  const delayTime = delayQueueJob.delayTime;
  const member = delayQueueJob.delayQueueJodId;
  // The job body is keyed by the same id that is stored in the sorted set:
  // getDelayQueue passes the sorted-set members straight to getAsync.
  const key = member;
  const args = [DELAY_BUCKET_KEY_PREFIX, delayTime, member];
  await Promise.all([
    RedisUtil.zaddAsync(args),
    RedisUtil.setAsync(key, JSON.stringify(delayQueueJob)),
  ]);
};
/**
 * Reads job ids from the bucket sorted set whose score lies between
 * `minScore` and `maxScore` (highest score first), then fetches the stored
 * job body for the ids that were found.
 *
 * @param {number} maxScore - upper score bound
 * @param {number} minScore - lower score bound
 * @param {number} offset - LIMIT offset into the matching range
 * @param {number} count - LIMIT count, i.e. maximum number of ids to read
 * @returns {Promise<*>} whatever `RedisUtil.getAsync` resolves to, or
 *   `undefined` when no id falls inside the score range
 */
const getDelayQueue = async (maxScore, minScore, offset, count) => {
  const delayQueueList = await RedisUtil.zrevrangebyscoreAsync(
    DELAY_BUCKET_KEY_PREFIX,
    maxScore,
    minScore,
    "LIMIT",
    offset,
    count
  );
  // Nothing due in this score range: bail out before touching the job bodies.
  // (`length` is a property, not a method.)
  if (!delayQueueList || delayQueueList.length === 0) {
    return;
  }
  // NOTE(review): the whole id list is handed to getAsync as-is. Redis GET
  // takes a single key, so this presumably only works for count === 1 unless
  // RedisUtil.getAsync handles arrays — confirm against RedisUtil.
  return await RedisUtil.getAsync(delayQueueList);
};
/**
 * Removes jobs from the delay queue: deletes the given members from the
 * bucket sorted set and deletes the given string keys, in parallel.
 *
 * A command is skipped when its argument list is empty, since ZREM and DEL
 * both reject calls without at least one member/key.
 *
 * @param {string[]} members - job ids to remove from the bucket sorted set
 * @param {string[]} keys - keys of the stored job bodies to delete
 * @returns {Promise<void>} resolves once every issued command has completed
 */
const removeDelayQueueAndItems = async (members, keys) => {
  const pending = [];
  if (members && members.length > 0) {
    // ZREM needs the sorted-set key ahead of the members; same argument
    // layout as the zaddAsync call used when the job was pushed.
    pending.push(
      RedisUtil.zremAsync([DELAY_BUCKET_KEY_PREFIX].concat(members))
    );
  }
  if (keys && keys.length > 0) {
    pending.push(RedisUtil.delAsync(keys));
  }
  await Promise.all(pending);
};
// const getDelayBucketKey = (delayTime) => {
// return DELAY_BUCKET_KEY_PREFIX + (delayTime % DELAY_BUCKET_NUM);
// };