# springboot整合ActiveMQ实现异步交易
## 前言
前段时间,我们分享了`ActiveMQ`的一些基本知识,介绍了`ActiveMQ`的简单部署和基本用法,演示了`java`环境使用`ActiveMQ`收发消息的简单操作,但当时只讲了`ActiveMQ`,`demo`也不是`web`项目,距离我们实际应用确实也比较远,为了让各位小伙伴更够更直观地了解`ActiveMQ`的实际应用场景,体会到异步交易的魅力,今天我们通过一个小小的`demo`,来看下`springboot`和`ActiveMQ`的整合应用。
今天的核心知识点就两个:
- `Springboot`异步交易
- `springboot`整合`ActiveMQ`
好了,话不多说,我们直接开始。
## 正文
我们的内容,是以文件异步导出业务为例写的一些业务代码。我先简单说下业务处理过程,第一步,用户发起文件导出请求,后端接收到前端请求后,验证请求参数,并发起异步文件导出交易,交易发起成功后返回结果。
第二步,导出成功后,用户可以在文件下载中心进行下载。
![](https://gitee.com/sysker/picBed/raw/master/images/20210523151728.png)
为了演示方便,我把所有数据都存放在`reids`里面了,一般实际项目中会把文件信息存放在数据库中,处理成功后才会放进缓存。项目的完整源码附在文末,有兴趣的小伙伴自己去看。
### 启用JMS
创建项目,我们这里就不介绍,到今天还不会搭建`springboot`开发环境,确实该面壁思过了。项目创建完成后,在`springboot`入口加上如下配置启用`jms`(`java message servic`):
```
@EnableJms
```
### 引入依赖
除了`spring-boot-starter-web`,这里我们还需要引入如下依赖:
```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.10</version>
</dependency>
<!-- 在2.X版本,spring.activemq.pool.enabled=true时,需依赖该jar -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
<version>1.0.3</version>
</dependency>
```
这里简单说明下,第一个依赖是`activemq`的`starter`,是`activemq`组件的核心依赖,所有的组件都是基于他展开的;
第二个依赖是`activemq`的连接池,类似于数据库连接池;
第三个依赖是`activemq`自动配置类依赖的包。
后面两个依赖是可选的,如果你启用了`activeMQ`连接池(`spring.activemq.pool.enabled=true`时),那你就必须依赖,没有依赖的话,`sprinbgoot`启动会报错:
![](https://gitee.com/sysker/picBed/raw/master/images/20210523134208.png)
主要原因是`activemq`的自动配置时依赖了这个包,没有这个包`Jms`的连接工厂是无法被初始化的:
![](https://gitee.com/sysker/picBed/raw/master/images/20210523134319.png)
![](https://gitee.com/sysker/picBed/raw/master/images/20210523134425.png)
有兴趣的小伙伴可以自己把这个依赖先拿掉试下。
### 添加配置
完成上面的工作,我们要启动本地的`ActvieMQ`服务,然后添加`ActvieMQ`配置信息:
```properties
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100
```
如果不需要连接池,后面两个配置可以直接拿掉。
### 消息发送接口
发送接口 就是消息的生产者,`springboot`提供了消息的模板类(`JmsMessagingTemplate`),我们可以通过`Autowired`注入使用:
```java
@Service
public class JmsSendService {
@Autowired
private JmsMessagingTemplate jmsTemplate;
public void sendMessage(String queueName, String message) {
jmsTemplate.convertAndSend(queueName, message);
}
}
```
`ActiveMQ`支持有返回值和无返回值两种会话形式,你可以根据自己的需要选择,`JmsMessagingTemplate`都是支持的,提供的模板方法也比较丰富:
![](https://gitee.com/sysker/picBed/raw/master/images/20210523141400.png)
这里我们只用到了`convertAndSend`,字面意思就是方法的意思,`object`是消息内容,`destination`是消息队列名称,看下源码你就知道,方法内部会把我们的消息内容转换成`Message`对象,当然如果你有特殊需求,你也可以自己组装`Message`,只是过程比较繁琐,简单业务的话,用我这种方式就比较简便了。
如果你需要接收返回值,那你可以调用`sendAndReceive(Message<T> var)`接口来实现,但是需要你自己定义自己的`Message<T>`,需要实现`Message<T>`接口。
```java
class StringMessage implements Message<String> {
private String payload;
private MessageHeaders messageHeaders;
public StringMessage(String payload) {
this.payload = payload;
}
@Override
public String getPayload() {
return this.payload;
}
@Override
public MessageHeaders getHeaders() {
return this.messageHeaders;
}
}
```
调用`sendAndReceive`:
```java
public String sendAndReceive(String queueName, String message) {
Message<?> messageBack = jmsTemplate.sendAndReceive(queueName, new StringMessage(message));
return (String)messageBack.getPayload();
}
```
`JmsMessagingTemplate`其实就是`springboot`抽象出来的一个通用的消息发送模板,它理论上是可以支持所有`mq`的,只需要官方提供`starter`即可,对开发者来说,确实比较友好,只需要修改配置,剩下的就不用管了,很方便有木有。
这里是`servicce`层的实现过程:
```java
/**
* 文件导出
* @param name
* @param userId
* @return
*/
public JSONObject export(String userId, String name) {
JSONObject result = new JSONObject();
result.put("userId", userId);
result.put("type", 0);
String uuId = UUIDUtil.getUUId();
result.put("fileId", uuId);
result.put("name", name);
// 异步导出文件
doExport(result);
result.put("success", true);
result.put("code", 0);
result.put("message", "数据导出提交成功,请稍后到文件中心下载!");
return result;
}
```
### springboot异步交易
导出文件方法`doExport`内部,我们使用了多线程异步交易,这样的好处是把业务逻辑都放进异步交易中处理,可以将响应结果更快地呈现给用户,让接口响应更快。这里我们插个楼,讲一些`springboot`异步线程池的用法。
#### 启用异步交易
`springboot`启动异步交易很简单,只需要在项目入口加上`@EnableAsync`即可
#### 添加异步线程池配置
配置线程池大小
```java
@Configuration
public class ExcuterConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(150);
executor.setQueueCapacity(500);
return executor;
}
}
```
#### 使用线程池
这里的`taskExecutor`就是我们前面配置的方法名。这里的异步线程和`mq`的异步交易是不一样的。线程池大小是固定的,当所有线程被阻塞,线程池队列也被占满,有新的交易进来时,线程池会因为资源耗尽报错,这时候后续业务是无法正常处理的;但是`mq`基本上是不存在阻塞资源耗尽的情况的(除非资源耗尽),特别是对于不需要有返回指定的交易,它只是一个消息仓库,只要消息不被消费,消息是可以一直存在的,也不会超时。
```java
@Async("taskExecutor") void doExport(JSONObject jsonObject) { try { String name = jsonObject.getString("name"); if (StringUtils.hasLength(name)) { String userId = jsonObject.getString("userId");