# 消息重试机制
消费者消费消息的时候,发生异常情况,导致消息未确认, 该消息会被重复消费(默认没有重复次数,即无限循环消费)
,但可以通过设置重试次数以及达到重试次数之后的消息处理
实现步骤:
1. 开启重试
2. 修改消息失败策略为重新发布到新队列 或 转入死信队列 (tips:如果修改了失败策略则死信队列无法生效,即两者并不兼容...)
### 一、application.yml 配置
> tips: `spring.rabbitmq.listener.simple.retry.enabled`默认false 即一直死循环消费
```yml
# RabbitMQ配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
# 填写自己安装rabbitmq时设置的账号密码,默认账号密码为`guest`
username: admin
password: admin
virtual-host: my_vhost # 填写自己的虚拟机名,对应可查看 `127.0.0.1:15672/#/users` 下Admin中的`Can access virtual hosts`信息
# 消息接收方
listener:
simple:
acknowledge-mode: auto # 表示消息确认方式,其有三种配置方式,分别是none、manual(手动ack)和auto;默认auto
concurrency: 5 # 最小的消费者数量
max-concurrency: 10 # 最大的消费者数量
prefetch: 3 # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
# 重试机制:
# eg: 最大重试次数为8 & 重试间隔1秒 & 间隔时间乘子2 & 最大间隔时间50秒 -- (最大重试次数包含初次消费)
# 初次消費
# 第1次:1秒
# 第2次:1*2=2秒
# 第3次:2*2=4秒
# 第4次:4*2=8秒
# 第5次:8*2=16秒
# 第6次:16*2=32秒
# 第7次:32*2=64秒 (由于设置最大间隔时间,因此这里为50秒 )
retry:
enabled: true # 是否开启重试
max-attempts: 4 # 最大重试次数
max-interval: 50000 # 重试最大间隔时间
initial-interval: 1000 # 重试间隔(单位:毫秒)
multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
```
### 二、修改消息失败策略
```java
package com.zhengqing.demo.config;
import org.springframework.amqp.rabbit.config.AbstractRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.amqp.AbstractRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfig {
public static final String RETRY_EXCHANGE = "retry_exchange";
public static final String RETRY_FAILURE_KEY = "retry_fail_routing_key";
public static final String RETRY_FAILURE_QUEUE = "retry_fail_queue";
/**
* 修改消息失败策略
* 默认配置: {@link AbstractRabbitListenerContainerFactoryConfigurer#configure(AbstractRabbitListenerContainerFactory, ConnectionFactory, RabbitProperties.AmqpContainer)}
* MessageRecoverer recoverer = this.messageRecoverer != null ? this.messageRecoverer : new RejectAndDontRequeueRecoverer(); 默认拒绝&不重新排队
*/
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
/**
return new RejectAndDontRequeueRecoverer(); // 拒绝&不重新排队(默认)
return new MessageBatchRecoverer() {public void recover(List<Message> messages, Throwable cause) {}}; // 用于消息批量处理的恢复器(Recoverer),它可以在消息消费失败时对一个批量的消息进行统一的处理。
return new ImmediateRequeueMessageRecoverer(); // 重新排队 -- 重试之后,返回队列,然后再重试,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费...
return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY); // 重新发布 -- 重试之后,将消息转发到重试失败队列,由重试失败消费者消费...
*/
return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY);
}
}
```
消息重试失败重发队列 -- 业务补偿机制
```java
package com.zhengqing.demo.config;
import cn.hutool.core.date.DateTime;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class RetryFailConsumer {
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = RabbitMqConfig.RETRY_FAILURE_QUEUE, durable = "true"),
exchange = @Exchange(value = RabbitMqConfig.RETRY_EXCHANGE, type = "direct", durable = "true"),
key = RabbitMqConfig.RETRY_FAILURE_KEY
)
)
public void retryFailConsumer(Message message, Channel channel) throws Exception {
log.info("[消息重试失败] 接收时间: {} 接收消息: {}", DateTime.now(), new String(message.getBody(), StandardCharsets.UTF_8));
try {
int a = 1 / 0;
} catch (Exception e) {
log.error("[消息重试失败] 异常:{}", e.getMessage());
// 如果这里再抛出异常则继续走消息重试...
// throw e;
}
}
}
```
### 三、测试
```java
package com.zhengqing.demo.controller;
import cn.hutool.core.date.DateTime;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@Api(tags = "测试mq-消息重试")
@RestController
@RequestMapping("/api/mq/retry")
@RequiredArgsConstructor
public class RetryController {
private final RabbitTemplate rabbitTemplate;
@ApiOperation("消息重试")
@PostMapping("producer")
public String producer() {
String msgContent = "Hello World " + DateTime.now();
log.info("{} [生产者] 发送消息: {}", DateTime.now(), msgContent);
this.rabbitTemplate.convertAndSend("test_exchange", "test_routing_key_retry", msgContent);
return "SUCCESS";
}
@SneakyThrows
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "test_queue_retry", durable = "true"),
exchange = @Exchange(value = "test_exchange", type = "direct", durable = "true"),
key = "test_routing_key_retry"
)
)
public void consumer(String msg) {
log.info("{} [消费者] 接收消息: {}", DateTime.now(), msg);
try {
int a = 1 / 0;
} catch (Exception e) {
log.error("[消费者] 异常:{}", e.getMessage());
throw e;
}
}
}
```
### 四、MessageRecoverer 消息失败
没有合适的资源?快使用搜索试试~ 我知道了~
java-workspace存放案例demo代码
共2000个文件
java:1295个
xml:290个
html:99个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 59 浏览量
2024-04-10
11:09:08
上传
评论
收藏 188.98MB ZIP 举报
温馨提示
存放案例demo代码
资源推荐
资源详情
资源评论
收起资源包目录
java-workspace存放案例demo代码 (2000个子文件)
bootstrap.min.css 97KB
bootstrap.min.css 97KB
app.css 45KB
app.css 45KB
main.css 36KB
main.css 36KB
font-awesome.min.css 30KB
font-awesome.min.css 30KB
page.css 6KB
page.css 6KB
timeline.css 3KB
timeline.css 3KB
site.css 1KB
site.css 1KB
site.css 1KB
site.css 1KB
site.css 1KB
site.css 1KB
test.doc 10KB
index_dev.htm 1KB
index_dev.htm 1KB
index.htm 902B
index.htm 902B
Spring Security(5) 整合OAuth2.html 110KB
微信公众号开发 (2) 消息处理.html 109KB
SpringCloud(14) Sentinel 1.8.4 规则持久化到Nacos.html 109KB
整合Seata+多数据源+sharding-jdbc.html 51KB
SpringCloud(11) 整合Seata实现分布式事务.html 46KB
微信公众号开发 (3) 菜单处理.html 43KB
微信公众号开发 (4) 网页授权.html 28KB
微信公众号开发 (1) 微信接入认证成为开发者.html 16KB
Error(23) Log4j远程代码执行漏洞.html 14KB
flow-rule-dialog.html 10KB
flow-rule-dialog.html 10KB
cluster-server-assign-dialog.html 9KB
cluster-server-assign-dialog.html 9KB
flow-rule-dialog.html 8KB
flow-rule-dialog.html 8KB
param-flow-rule-dialog.html 8KB
param-flow-rule-dialog.html 8KB
cluster_app_assign_manage.html 6KB
cluster_app_assign_manage.html 6KB
identity.html 6KB
identity.html 6KB
metric.html 6KB
metric.html 6KB
param_flow.html 6KB
param_flow.html 6KB
cluster_app_server_list.html 5KB
cluster_app_server_list.html 5KB
flow_v1.html 5KB
flow_v1.html 5KB
degrade-rule-dialog.html 5KB
degrade-rule-dialog.html 5KB
identity.html 5KB
identity.html 5KB
flow_v2.html 5KB
flow_v2.html 5KB
authority.html 4KB
authority.html 4KB
system.html 4KB
system.html 4KB
degrade.html 4KB
degrade.html 4KB
cluster_single_config.html 4KB
sidebar.html 4KB
cluster_single_config.html 4KB
sidebar.html 4KB
system-rule-dialog.html 4KB
system-rule-dialog.html 4KB
cluster_app_server_overview.html 4KB
cluster_app_server_overview.html 4KB
flow.html 4KB
flow.html 4KB
api.html 4KB
api.html 4KB
cluster_app_client_list.html 3KB
cluster_app_client_list.html 3KB
machine.html 3KB
machine.html 3KB
IDE工具(46) idea远程调试springboot项目.html 3KB
authority-rule-dialog.html 3KB
authority-rule-dialog.html 3KB
api-dialog.html 3KB
api-dialog.html 3KB
cluster-client-config-dialog.html 2KB
cluster-client-config-dialog.html 2KB
cluster-server-connection-detail-dialog.html 2KB
cluster-server-connection-detail-dialog.html 2KB
client.html 2KB
client.html 2KB
server.html 2KB
server.html 2KB
login.html 2KB
login.html 2KB
login.html 2KB
login.html 2KB
login.html 2KB
index.html 1KB
pagination.tpl.html 1KB
共 2000 条
- 1
- 2
- 3
- 4
- 5
- 6
- 20
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6718
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功