package com.example.rebbit.mq;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @author lx
* @data 2022/10/31 9:11
*/
@Component
public class Producer {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
private void init(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
@Override
//和交互机确认
public void confirm(CorrelationData correlationData, boolean b, String s) {
String id = correlationData != null ? correlationData.getId() :"";
if (b){
System.out.println("交互机收到消息Id------"+id);
}else {
System.out.println("交互机未收到消息Id========"+id+"原因"+s);
}
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
@Override
//路由退回
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("消息:"+new String(returnedMessage.getMessage().getBody()));
System.out.println("交互机:"+returnedMessage.getExchange());
System.out.println("退回原因:"+returnedMessage.getReplyText());
System.out.println("路由key:"+returnedMessage.getRoutingKey());
}
});
}
//Direct模式,支持手动应答和消息确认
public void consumer() {
for(int i=0;i<4;i++){
String id = UUID.randomUUID().toString();
String messageData = "test message, hello!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String,Object> map=new HashMap<>();
map.put("id",id);
map.put("messageData",messageData);
map.put("createTime",createTime);
CorrelationData correlationData= new CorrelationData("direct"+i);
//将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
rabbitTemplate.convertSendAndReceive("TestDirectExchange", "TestDirectRouting2", map,correlationData);
}
}
//发布订阅模式
public void publish() throws Exception {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "publish ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
CorrelationData correlationData= new CorrelationData("publish");
rabbitTemplate.convertSendAndReceive("fanout_exchange", "", manMap,correlationData);
}
//topic模式 路由匹配
public void topIc() throws Exception {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "topIc ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
CorrelationData correlationData= new CorrelationData("topic.man");
CorrelationData correlationData2= new CorrelationData("topic.man.xxl");
rabbitTemplate.convertSendAndReceive("topic_exchange", "topic.man", manMap,correlationData);
rabbitTemplate.convertSendAndReceive("topic_exchange", "topic.man.xxl", manMap,correlationData2);
}
//work 生产者,消费者模式
public void work() throws Exception {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "work ";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("messageData", messageData);
manMap.put("createTime", createTime);
rabbitTemplate.convertSendAndReceive("rabbit_work_queue",manMap);
}
//队列已满,进入死信
public void normal() throws Exception {
for(int i=0;i<4;i++){
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("createTime", createTime);
rabbitTemplate.convertAndSend("directExchange", "normal.key", manMap);
}
}
//消息过期,进入死信
public void ttl() throws Exception {
String messageId = String.valueOf(UUID.randomUUID());
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", messageId);
manMap.put("createTime", createTime);
rabbitTemplate.convertSendAndReceive("directExchange","ttl.key",manMap);
}
public void bcckUp() throws Exception {
Map<String, Object> manMap = new HashMap<>();
manMap.put("messageId", "1");
CorrelationData correlationData= new CorrelationData("bcckUp1");
rabbitTemplate.convertSendAndReceive("confirmExchange","confirm.key",manMap,correlationData);
CorrelationData correlationData2= new CorrelationData("bcckUp2");
manMap.put("messageId", "2");
rabbitTemplate.convertSendAndReceive("confirmExchange","ll.key",manMap,correlationData2);
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
sprngboot整合 rabbitMQ MQ的作用 流量消峰 ,应用解耦,异步处理 rabbitMq 五种模式 简单模式:一个生产者,一个消费者 工作模式:一个生产者,多个消费者 发布订阅模:又叫无路由键交换机模式,队列绑定到交换机,当发送消息到交换机时,绑定到该交换机的队列都会监听到 Direct模式:直连路由键交换机模式,其会直连指定一个路由键与队列 与交换机进行绑定 主题路由模式:与直连路由键交换机的区别在于其可以对交换机做层级匹配 rabbitMq死信 1消息TTL过期 2队列已满 3消息被拒绝 发布确认模式:消息成功发送到交互机 生产者发送消息,如果路由错误不能到达指定队列 解决方法有如下几种: 1使用备份交换器路由到备胎队列消费。这样可以保证未被路由的消息不会丢失。 2通过消息的回调方法,添加ReturnListener的编程逻辑.
资源推荐
资源详情
资源评论
收起资源包目录
rebbit.zip (18个子文件)
rebbit
mq
three
TopRabbitConsumer.java 1KB
TopicRabbitConfig.java 2KB
TopRabbitConsumer2.java 1KB
five
ConfirmConfig.java 1KB
WarningConsumer.java 888B
Producer.java 6KB
zero
WorkConsume.java 1KB
WorkRabbitConfig.java 415B
WorkConsume2.java 1KB
four
DeadConsumer.java 1KB
DealConfig.java 3KB
tow
ManReceiver.java 1KB
RabbitConfig.java 1KB
ManReceiver2.java 1KB
RebbitMqCtr.java 1KB
one
Consumer2.java 1KB
Consumer.java 1KB
DirectRabbitConfig.java 2KB
共 18 条
- 1
资源评论
qq_19972217
- 粉丝: 2
- 资源: 1
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功