# RocketMQ Example
## Project Instruction
This example illustrates how to use the RocketMQ Binder to implement pub/sub messaging for Spring Cloud applications.
[RocketMQ](https://rocketmq.apache.org/) is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.
Before we start the demo, let's look at Spring Cloud Stream.
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers. It provides opinionated configuration of middleware from several vendors, introducing the concepts of persistent publish-subscribe semantics, consumer groups, and partitions.
There are two key concepts in Spring Cloud Stream: Binder and Binding.
* Binder: A strategy interface used to bind an app interface to a logical name.
Binder implementations include `KafkaMessageChannelBinder` for Kafka, `RabbitMessageChannelBinder` for RabbitMQ, and `RocketMQMessageChannelBinder` for RocketMQ.
* Binding: Includes Input Binding and Output Binding.
A Binding is the bridge between the external messaging system and the application-provided producers and consumers of messages.
This is an overview of Spring Cloud Stream.
![](https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/images/SCSt-with-binder.png)
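To make the Binder, Binding, and consumer-group ideas above concrete, here is a toy in-memory model in plain Java. It is only a sketch for illustration: `ToyBinder`, `bindProducer`, and `bindConsumer` are hypothetical names and are not the Spring Cloud Stream API. It assumes that every consumer group receives each message, while inside a group the messages are shared round-robin among members.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Toy in-memory model of a binder; NOT the Spring Cloud Stream API. */
public class ToyBinder {
    // destination (topic) -> consumer group -> handlers registered in that group
    private final Map<String, Map<String, List<Consumer<String>>>> subscriptions = new HashMap<>();
    // "destination/group" -> number of messages delivered so far (round-robin cursor)
    private final Map<String, Integer> cursors = new HashMap<>();

    /** Output binding: returns a sender that publishes payloads to the destination. */
    public Consumer<String> bindProducer(String destination) {
        return payload -> publish(destination, payload);
    }

    /** Input binding: registers a handler for a destination under a consumer group. */
    public void bindConsumer(String destination, String group, Consumer<String> handler) {
        subscriptions
                .computeIfAbsent(destination, d -> new HashMap<>())
                .computeIfAbsent(group, g -> new ArrayList<>())
                .add(handler);
    }

    // Delivers the payload once per group, rotating among the members of each group.
    private void publish(String destination, String payload) {
        Map<String, List<Consumer<String>>> groups =
                subscriptions.getOrDefault(destination, new HashMap<>());
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            String cursorKey = destination + "/" + entry.getKey();
            int next = cursors.merge(cursorKey, 1, Integer::sum) - 1;
            members.get(next % members.size()).accept(payload);
        }
    }

    public static void main(String[] args) {
        ToyBinder binder = new ToyBinder();
        binder.bindConsumer("test-topic", "group-a", m -> System.out.println("a1: " + m));
        binder.bindConsumer("test-topic", "group-a", m -> System.out.println("a2: " + m));
        binder.bindConsumer("test-topic", "group-b", m -> System.out.println("b1: " + m));

        Consumer<String> output = binder.bindProducer("test-topic");
        output.accept("hello");
        output.accept("world");
    }
}
```

In this sketch, `group-b` has a single member and therefore sees both messages, while the two members of `group-a` each see one. A real binder delegates this work to the message broker instead of keeping state in memory.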
## Preparation
### Download and Start RocketMQ
Start the Name Server and the Broker before using the RocketMQ Binder.
1. Download [RocketMQ](https://archive.apache.org/dist/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip) and unzip it.
2. Start the Name Server
```bash
sh bin/mqnamesrv
```
3. Start the Broker
```bash
sh bin/mqbroker -n localhost:9876
```
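Before moving on, it can help to confirm that the Name Server is actually listening. The following is a minimal sketch in plain Java that only checks whether a TCP connection to `localhost:9876` (the address used in the commands above) can be opened. The class name `NameServerCheck` and the one-second timeout are assumptions for illustration; a successful connection does not prove that the process behind the port is a healthy Name Server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class NameServerCheck {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // 9876 is the Name Server port passed to mqbroker with -n above
        System.out.println("name server reachable: " + isReachable("localhost", 9876, 1000));
    }
}
```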
### Declare dependency
Add the `spring-cloud-starter-stream-rocketmq` dependency to the `pom.xml` file of your Spring Cloud project.
```xml
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
```
## Simple example
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t test-topic
```
### Integration with RocketMQ Binder
Configure the Input and Output Bindings and enable them with the `@EnableBinding` annotation:
```java
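import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
@EnableBinding({ Source.class, Sink.class }) // declares the "output" and "input" bindings
public class RocketMQApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketMQApplication.class, args);
    }
}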
```
Configure Binding:
```properties
# configure the nameserver of rocketmq
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
# configure the output binding named output
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
# configure the input binding named input
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=test-group
```
### Start Application
1. Add the necessary configuration to the file `/src/main/resources/application.properties`.
```properties
spring.application.name=rocketmq-example
server.port=28081
```
2. Start the application in an IDE or by building a fat JAR.
    1. Start in an IDE: find the main class `RocketMQApplication` and run its main method.
    2. Build a fat JAR: run `mvn clean package` to build it, then run `java -jar rocketmq-example.jar` to start the application.
### Message Handling
Use the binding named `output` to send messages to the `test-topic` topic, and use two input bindings to subscribe to them.
* input1: subscribes to the `test-topic` topic and consumes messages in order (all messages must be in the same MessageQueue for ordered consumption).
* input2: subscribes to the `test-topic` topic and concurrently consumes messages whose tag is `tagStr`, with 20 threads in the consumer-side thread pool.
See the configuration below:
```properties
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.bindings.input1.destination=test-topic
spring.cloud.stream.bindings.input1.content-type=text/plain
spring.cloud.stream.bindings.input1.group=test-group1
spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true
spring.cloud.stream.bindings.input2.destination=test-topic
spring.cloud.stream.bindings.input2.content-type=text/plain
spring.cloud.stream.bindings.input2.group=test-group2
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false
spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr
spring.cloud.stream.bindings.input2.consumer.concurrency=20
```
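Ordered consumption on `input1` relies on related messages landing in the same MessageQueue. One common way to arrange that on the producer side is to derive the queue index from a business key, so that all messages sharing a key map to one queue. The sketch below shows only that arithmetic in plain Java, without the RocketMQ client. The class name `QueuePicker`, the key `order-42`, and the queue count of 4 are assumptions for illustration; with the native client, logic of this kind would typically live in a `MessageQueueSelector` passed to `send`.

```java
public class QueuePicker {
    /** Maps a business key to a queue index in the range [0, queueCount). */
    public static int pickQueue(String key, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int queueCount = 4; // hypothetical number of queues in the topic
        int first = pickQueue("order-42", queueCount);
        int second = pickQueue("order-42", queueCount);
        // The same key always yields the same queue index
        System.out.println("same queue for same key: " + (first == second));
    }
}
```

Messages with different keys may still share a queue. That does not break ordering per key, it only means the queue interleaves several ordered streams.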
#### Pub Messages
Use `MessageChannel` to send messages:
```java
public class ProducerRunner implements CommandLineRunner {
    @Autowired
    private MessageChannel output; // the output binding declared by Source
    @Override
    public void run(String... args) throws Exception {
        String msg = "Hello RocketMQ"; // example payload
        Map<String, Object> headers = new HashMap<>();
        // Tag the message so that tag-filtered consumers such as input2 receive it
        headers.put(MessageConst.PROPERTY_TAGS, "tagStr");
        Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));
        output.send(message);
    }
}
```
Alternatively, use the native RocketMQ API to send messages:
```java
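import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // Arguments: topic, tag, body
        Message msg = new Message("test-topic", "tagStr", "message from rocketmq producer".getBytes());
        producer.send(msg);
        producer.shutdown(); // release client resources when done
    }
}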
```
#### Sub Messages
Use `@StreamListener` to receive messages:
```java
@Service
public class ReceiveService {
    @StreamListener("input1")
    public void receiveInput1(String receiveMsg) {
        System.out.println("input1 receive: " + receiveMsg);
    }
    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        System.out.println("input2 receive: " + receiveMsg);
    }
}
```
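With the configuration shown earlier, `input2` only receives messages tagged `tagStr`, while `input1` receives every message on the topic. RocketMQ performs this filtering itself, so the application does not need code for it. Purely to illustrate the matching rule, here is a sketch in plain Java that assumes the tag expression syntax of `*` for all tags and `tagA || tagB` for alternatives. The class name `TagFilterSketch` is hypothetical and this is not RocketMQ's implementation.

```java
import java.util.Arrays;

public class TagFilterSketch {
    /** Returns true if messageTag is accepted by an expression such as "*" or "tagA || tagB". */
    public static boolean matches(String expression, String messageTag) {
        // In this sketch, a missing or "*" expression subscribes to all tags
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        // Split on "||" and compare each alternative with the message tag
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("tagStr", "tagStr"));
        System.out.println(matches("tagStr", "otherTag"));
    }
}
```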
## Broadcasting example
### Create topic
```sh
sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast
```
### Producer
**application.yml**
```yaml
server:
  port: 28085
spring:
  application:
    name: rocketmq-broadcast-producer-example
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: localhost:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
      bindings:
        producer-out-0:
          destination: broadcast
logging:
  level:
    org.springframework.context.support: debug
```
**code**
Use `ApplicationRunner` and `StreamBridge` to send messages.
```java
@SpringBootApplication
public class RocketMQBroadcastProducerApplication {
    private static final Logger log = LoggerFactory
            .getLogger(RocketMQBroadcastProducerApplication.class);
    @Autowired
    private StreamBridge streamBridge;
    public static void main(String[] args) {
        SpringApplication.run(RocketMQBroadcastProducerApplication.class, args);
    }
    @Bean
    public ApplicationRunner producer() {
        return args -> {
            for (int i = 0; i < 100; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i);
                Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers);
                streamBridge.send("producer-out-0", msg);
            }
        };
    }
}
```
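The producer above wraps each payload in a `SimpleMsg`, a class this README does not show. A minimal sketch follows, assuming it is a plain POJO with a single `msg` field. The field name, the accessors, and the no-argument constructor (commonly needed by JSON deserializers) are assumptions, not the project's confirmed definition.

```java
/** Hypothetical payload type for the broadcast example; the field name is an assumption. */
public class SimpleMsg {
    private String msg;

    // No-argument constructor, commonly required when deserializing from JSON
    public SimpleMsg() {
    }

    public SimpleMsg(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    @Override
    public String toString() {
        return "SimpleMsg{msg='" + msg + "'}";
    }
}
```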
### Consumer
Start two consumers.
#### Consumer1
**application.yml**
```yaml
server:
  port: 28084
spring:
  application:
    name: rocketmq-broadcast-consumer1-example
  cloud:
    stream:
      function:
```