![image](https://raw.githubusercontent.com//master/123.png)
这叫独立消息的最终一致性,有三个角色
1. 生产者
2. 消费者
3. 消息处理者:定时任务进行补偿
但本人将上图消息处理者的的消息确认使用一个本地的业务标记,用来标志着一个本地事务的完成。之后分为两个阶段,一个是调用confirm确认是否存在(目的是为了确认生产者是否已经完成了这个业务,还是已经回滚),然后再调用deleteBusiness删除业务标记,此两个接口由生产者提供,消息处理者远程调用。
因为若不分为两步,改为一步,按上图的方式:执行顺序为: 执行RPC→修改消息为可发送。但是消息处理者可能还没修改消息的状态,就宕机了,而生产者已经删除了业务标记,那么这个消息没有存到redis,更不会发到mq,消费者自然也就不可能消费到。因此可能需要提供两个接口,一个确认,一个删除。执行顺序是:消息确认→消息状态改变→删除业务号。这样,就算消息处理者宕机了,之后重新上线,去向生产者重新确认,业务标记也是还在的。而若果宕机导致业务号忘记删除也无所谓,因为消息已经改成可发送了,不会重复确认,唯一的缺点是如果宕机,可能导致生产者那边忘记删除,从而产生内存泄漏,解决方法是给缓存加个失效时间或者定时删除。
```java
@Component
@FeignClient(value = "transaction-order")
public interface ProviderService {
@PostMapping("/rpc/confirm")
public boolean confirm(@RequestBody String msgId);
@PostMapping("/rpc/deleteBusiness")
public String deleteBusiness(@RequestBody String msgId);
}
```
防止消息处理者还没把消息存储到redis,自身就宕机的情况。另外,提供submit跟rollback接口。给生产者成功和失败时调用。
```java
@Service
@FeignClient(value = "transaction-msghandler")
public interface MsgHandlerService {
@PostMapping("/rpc/sendHalfMessage")
public String sendHalfMessage(@RequestBody MQMessage msg);
@Async
@PostMapping("/rpc/submit")
public String submit(@RequestBody MQMessage msg);
@Async
@PostMapping("/rpc/rollback")
public String rollback(@RequestBody MQMessage msg);
}
```
```
消息:
1. 状态1,待发送状态
2. 状态2,可发送状态
3. 状态3,完成状态(直接删除可发送状态的消息)
生产者:
1. 不用配置rabbitmq,而是使用RPC调用,让配置有rabbitmq的消息处理者代发。自己只负责生产消息
2. (暴露接口)需要提供消息确认的接口。目前想到的实现办法是,事务最后需要设置redis或者本地缓存,比如存储业务号, 一旦确认成功,就由消息处理者回调并删除,如果缓存中没有这个业务号,就证明还没执行完或者已经回滚,重试几次后就放弃。
3.(暴露接口)可能还需要提供业务号的删除接口: 按2的方法执行方法,执行顺序: 执行RPC→修改消息为可发送。但是消息处理者可能还没修改消息的状态,就宕机了。因此可能需要提供两个接口,一个确认,一个删除。执行顺序是:消息确认→消息状态改变→删除业务号。这样,就算宕机导致业务号忘记删除也无所谓,因为消息已经改成可发送了,不会重复确认,而生产者那边设置的业务号到时也可以设置过期时间。
4. 业务执行结束时通知消息处理者,异步调用确认方法,形成回调。
消息处理者:
1. 基本方法:设置为待发送,可发送,完成状态的。
2. (暴露接口)向外暴露设置待发送方法。
3. 定时遍历待发送状态的消息,RPC调用生产者的确认方法进行确认,一旦到达时间上限,就失败,删除待发送消息。若成功,修改消息状态为可发送状态
4. 定时遍历可发送状态的消息,将可发送的消息发送给RabbitMQ,一旦消息ack确认成,删除可发送状态的消息,代表已经完成。如果ack失败,继续发送,一旦达到次数上限,抛异常,进入死信队列。
5. (暴露接口)3的子方法sendHalfMessage,只处理单一消息。非定时,确认并修改状态。提供此子方法的原因是方便其他人调用
6. (暴露接口)4的子方法submit,只处理单一消息,非定时,发送消息给rabbitmq。
7. (暴露接口)rollback,删除redis内的待发送消息。
消费者:
1. 进行消息幂等性检查,这里可以用数据库的唯一索引来检查幂等性就行了
2. 手动ack,在业务结束时进行
```
完整调用流程:
1. 生产者远程调用sendHalfMessage
2. 消息处理者收到消息,在redis中加入此消息,key为订单号,并设置为待发送状态
3. 生产者开始执行业务
4. 生产者执行业务结束,设置内存标记
5. (可异步)生产者远程调用调用submit方法。
6. 消息处理者收到消息,远程调用生产者的confirm方法
7. 生产者收到消息,查看缓存,确认业务是否完成,如果业务完成,则返回true。
8. 若返回true,消息处理者就在将redis中此订单号的消息设为可发送状态。
9. 然后消息处理者再远程调用生产者的deleteBusiness方法,通知生产者在缓存中删除此业务号。
10. 最后,消息处理者将此消息发送消息到MQ。
11. 若消息成功发送到MQ,回调ACK机制,消息处理者检查ack是否为true,若是,删除此订单号在redis中的消息
12. (可异步)若出现异常,生成者调用rollback方法,消息处理者收到消息,删除
免责声明:
1.本资源仅供学习和交流使用,不保证其准确性、完整性、及时性或适用性。
2.本资源仅包含一般信息,不构成专业建议。在使用本资源时,请务必自行研究并谨慎决策。
3.我已尽力确保本资源的正确性和合法性,但不对其准确性、完整性和及时性做出保证。
4.本资源不应用于商业用途。
5.在使用本资源的过程中,用户应自行承担所有风险和责任,并遵守相关法律法规。
6.对于因使用本资源而产生的任何损失或损害,我概不负责。
请确保在使用本资源时仔细阅读并遵守以上免责声明。如果您有任何疑问或需要进一步帮助,请联系我。
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
使用rabbitmq实现的分布式事务。解决方案是独立消息服务的最终一致性.zip (73个子文件)
distributed_transaction-master
mvnw.cmd 6KB
pom.xml 4KB
msghandler
mvnw.cmd 6KB
pom.xml 4KB
src
test
java
com
watashi
msghandler
OtherApplicationTests.java 1KB
main
resources
application.yml 514B
java
com
watashi
msghandler
OtherApplication.java 871B
contoller
TestController.java 2KB
RPCController.java 1KB
consumer
DeadConsumer.java 566B
service
ProviderService.java 682B
mqwork
RedisCoordinator.java 4KB
TimeCompensation.java 4KB
Coordinator.java 749B
config
RabbitCallbackConfig.java 2KB
RedisConfig.java 2KB
RabbitConnectionConfig.java 4KB
RabbitTemplateConfig.java 2KB
RabbitConfig.java 2KB
.mvn
wrapper
maven-wrapper.properties 218B
maven-wrapper.jar 50KB
MavenWrapperDownloader.java 5KB
mvnw 10KB
.gitignore 459B
order
mvnw.cmd 6KB
pom.xml 3KB
src
main
resources
application.yml 704B
java
com
watashi
order
mapper
OrderMapper.java 318B
OrderApplication.java 974B
order
OrderController.java 3KB
RPCController.java 1KB
service
OrderService.java 748B
MsgHandlerService.java 884B
TransactionService.java 897B
.mvn
wrapper
maven-wrapper.properties 218B
maven-wrapper.jar 50KB
MavenWrapperDownloader.java 5KB
mvnw 10KB
.gitignore 458B
.mvn
wrapper
maven-wrapper.properties 218B
maven-wrapper.jar 50KB
MavenWrapperDownloader.java 5KB
pay
mvnw.cmd 6KB
pom.xml 3KB
src
test
java
com
watashi
pay
PayApplicationTests.java 213B
main
resources
application.yml 553B
java
com
watashi
pay
mapper
PayMapper.java 236B
service
payService.java 2KB
PayApplication.java 471B
config
RabbitTemplateConfig.java 1KB
.mvn
wrapper
maven-wrapper.properties 218B
maven-wrapper.jar 50KB
MavenWrapperDownloader.java 5KB
mvnw 10KB
.gitignore 397B
123.png 138KB
mvnw 10KB
.gitignore 460B
common-api
mvnw.cmd 6KB
pom.xml 2KB
src
test
java
com
example
demo
DemoApplicationTests.java 215B
main
resources
application.properties 1B
java
com
watashi
api
Pay.java 257B
MessageUtil.java 2KB
Order.java 260B
MQMessage.java 794B
MQConstants.java 563B
.mvn
wrapper
maven-wrapper.properties 218B
maven-wrapper.jar 50KB
MavenWrapperDownloader.java 5KB
mvnw 10KB
.gitignore 458B
README.md 6KB
共 73 条
- 1
资源评论
武昌库里写JAVA
- 粉丝: 3137
- 资源: 1872
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功