package com.test.rocketmq.mq;
import com.alibaba.fastjson.JSONObject;
import com.test.rocketmq.dao.pojo.Account;
import com.test.rocketmq.dao.pojo.Journal;
import com.test.rocketmq.service.AccountService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
/**
* @author: WangSaiChao
* @date: 2018/2/12
* @description: 消息消费者
*/
@Component
public class MessageListenerImpl implements MessageListenerConcurrently {
@Autowired
private AccountService accountService;
@Override
@Transactional
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg=msgs.get(0);
try {
String topic = msg.getTopic();
//Message Body
JSONObject messageBody=JSONObject.parseObject(new String(msg.getBody(),"UTF-8"));
String tags = msg.getTags();
String keys = msg.getKeys();
String userId = messageBody.getString("userId");
String orderId = messageBody.getString("orderId");
Double amount = messageBody.getDouble("amount");
//打印
System.out.println(topic+" "+tags+" "+keys+" 用户id为: "+userId+",订单号为: " + orderId + ",消费金额为:"+amount);
//业务逻辑处理,根据用户id去数据库中 将账户余额更新
Account account = new Account();
Date date = new Date();
account.setAmount(amount);
account.setUserId(userId);
account.setUpdateTime(date);
accountService.update(account);
//保存记录到账户流水表
Journal journal = new Journal();
journal.setOrderId(orderId);
journal.setUserId(userId);
journal.setAmount(amount);
journal.setUpdateTime(date);
accountService.insert(journal);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
//重试次数为3情况
if(msg.getReconsumeTimes() == 3){
/**
* 如果连续重发3次还是发布出去 证明该订单可能哪里有错
* 保存错误日志,然后就不再发送消息了,需要人工介入,逻辑我就不写了
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//如果没超过三次,稍后再次发送
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
SpringBoot整合rocketmq事务消息
共35个文件
java:18个
xml:8个
ds_store:5个
需积分: 48 69 下载量 12 浏览量
2018-02-13
17:26:54
上传
评论
收藏 56KB ZIP 举报
温馨提示
SpringBoot整合rocketmq地址http://blog.csdn.net/qq_34021712/article/details/79322459
资源推荐
资源详情
资源评论
收起资源包目录
SpringBoot整合RocketMQ.zip (35个子文件)
transaction-message-consumer
.DS_Store 6KB
src
.DS_Store 6KB
test
resources
java
main
.DS_Store 6KB
build
package.xml 2KB
resources
.DS_Store 6KB
mapper
AccountMapper.xml 712B
config
application.properties 481B
logback-spring.xml 5KB
path.properties 21B
java
com
test
rocketmq
service
AccountService.java 771B
mq
MessageListenerImpl.java 3KB
utils
FastJsonUtil.java 3KB
ConsumerApplication.java 744B
dao
pojo
Journal.java 1KB
Account.java 932B
AccountMapper.java 493B
config
RmqCustomerConfig.java 2KB
DataSourceConfig.java 2KB
pom.xml 4KB
transaction-message-producer
.DS_Store 6KB
src
test
resources
java
main
build
package.xml 2KB
resources
mapper
OrderMapper.xml 727B
config
application.properties 481B
logback-spring.xml 5KB
path.properties 21B
java
com
test
rocketmq
service
OrderService.java 3KB
mq
TransactionExecuterImpl.java 2KB
utils
FastJsonUtil.java 3KB
controller
OrderController.java 901B
dao
pojo
Order.java 2KB
OrderMapper.java 419B
config
DataSourceConfig.java 2KB
RmqProducerConfig.java 2KB
ProducerApplication.java 744B
pom.xml 4KB
共 35 条
- 1
资源评论
这个名字想了很久
- 粉丝: 1518
- 资源: 51
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功