package com.mfrank.delayqueuedemo.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";
public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";
public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";
public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";
public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";
public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";
// 声明延时Exchange
@Bean("delayExchange")
public DirectExchange delayExchange(){
return new DirectExchange(DELAY_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明延时队列A 延时10s
// 并绑定到对应的死信交换机
@Bean("delayQueueA")
public Queue delayQueueA(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
// x-message-ttl 声明队列的TTL
args.put("x-message-ttl", 6000);
return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
}
// 声明延时队列B 延时 60s
// 并绑定到对应的死信交换机
@Bean("delayQueueB")
public Queue delayQueueB(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
// x-message-ttl 声明队列的TTL
args.put("x-message-ttl", 60000);
return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
}
// 声明延时队列C 不设置TTL
// 并绑定到对应的死信交换机
@Bean("delayQueueC")
public Queue delayQueueC(){
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
}
// 声明死信队列A 用于接收延时10s处理的消息
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明死信队列B 用于接收延时60s处理的消息
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 声明死信队列C 用于接收延时任意时长处理的消息
@Bean("deadLetterQueueC")
public Queue deadLetterQueueC(){
return new Queue(DEAD_LETTER_QUEUEC_NAME);
}
// 声明延时队列A绑定关系
@Bean
public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
}
// 声明延时队列B绑定关系
@Bean
public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
}
// 声明延时队列C绑定关系
@Bean
public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
@Qualifier("delayExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
// 声明死信队列C绑定关系
@Bean
public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
redis实现延迟消息队列
共16个文件
java:9个
yml:1个
xml:1个
需积分: 2 0 下载量 107 浏览量
2024-03-29
10:04:09
上传
评论
收藏 59KB ZIP 举报
温馨提示
redis实现延迟消息队列 需求背景 最近在做一个排队取号的系统 在用户预约时间到达前XX分钟发短信通知 在用户预约时间结束时要判断用户是否去取号了,不然就记录为爽约 在用户取号后开始,等待XX分钟后要发短信提醒是否需要使用其他渠道办理 类似的场景太多,最简单的解决办法就是定时任务去扫表。这样每个业务都要维护自己的扫表逻辑, 而且数据量越来越来越多的,有的数据可能会延迟比较大 经过一番搜索,网上说rabbitmq可以满足延迟执行需求,但是目前系统用了其他消息中间件,所以不打算用。 基于Redis实现的延迟消息队列java版:delay-queue 整体结构 整个延迟队列由4个部分组成: JobPool用来存放所有Job的元信息。 DelayBucket是一组以时间为维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Id)。 Timer负责实时扫描各个Bucket,并将delay时间大于等于当前时间的Job放入到对应的Ready Queue。 ReadyQueue存放处于Ready状态的Job(这里只存放JobId),以供消费程序消费。
资源推荐
资源详情
资源评论
收起资源包目录
delayed-queue-demo-master.zip (16个子文件)
delayed-queue-demo-master
mvnw.cmd 6KB
pom.xml 2KB
src
test
java
com
mfrank
delayqueuedemo
DelayQueueDemoApplicationTests.java 359B
main
resources
application.yml 199B
java
com
mfrank
delayqueuedemo
mq
DelayMessageSender.java 2KB
DeadLetterQueueConsumer.java 2KB
controller
RabbitMQMsgController.java 1KB
DelayQueueDemoApplication.java 346B
constants
DelayTypeEnum.java 674B
config
DelayedRabbitMQConfig.java 1KB
RabbitMQConfig.java 6KB
.mvn
wrapper
maven-wrapper.properties 116B
maven-wrapper.jar 47KB
MavenWrapperDownloader.java 5KB
mvnw 9KB
.gitignore 333B
共 16 条
- 1
资源评论
进击的代码家
- 粉丝: 2202
- 资源: 203
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功