没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
RabbitMQ作为一个中间件,本质上是一个消息的代理,在这个领域还有ActiveMQ、RocketMQ、 ZeroMQ、Joram、Kafka等等。其中ActiveMQ是Apache公司开源的消息系统,使用Java语言开发,功能 较为完善,被大量开源项目所使用。而RocketMQ是阿里开源的消息中间件,他也是纯Java开发,具有高吞 吐量、高可用性、适合大规模分布式系统应用的特点。接下来的Kafka是LinkedIn开源的分布式发布-订阅消 息系统,目前这个项目已经属于Apache顶级项目。Kafka的主要特点是基于Pull的模式来处理消息消息,追 求高吞吐量,后面的Kafka学习文档中会详细讲解,这里就不一一展开了
资源推荐
资源详情
资源评论
RabbitMQ 学习文档
消息队列 AMQP
1. RabbitMQ的简介
RabbitMQ作为一个中间件,本质上是一个消息的代理,在这个领域还有ActiveMQ、RocketMQ、
ZeroMQ、Joram、Kafka等等。其中ActiveMQ是Apache公司开源的消息系统,使用Java语言开发,功能
较为完善,被大量开源项目所使用。而RocketMQ是阿里开源的消息中间件,他也是纯Java开发,具有高吞
吐量、高可用性、适合大规模分布式系统应用的特点。接下来的Kafka是LinkedIn开源的分布式发布-订阅消
息系统,目前这个项目已经属于Apache顶级项目。Kafka的主要特点是基于Pull的模式来处理消息消息,追
求高吞吐量,后面的Kafka学习文档中会详细讲解,这里就不一一展开了。
认识一个东西我们首先从怎么使用它开始。
首先从 [Github上下载RabbitMQ的源码]
下载后好源码,按照指示进行编译安装。下面先对Rabbit里面会用到的术语进行一个说明。
名称 解释
Producer(生产者) 发送消息的程序
Consumer(消费者) 消息的接收处理程序
Queue(队列) 放置消息的仓库
2. RabbitMQ的使用
2.1 初识RabbitMQ
首先我们使用一个“Hello World”走进RabbitMQ的世界
在这个demo中我们首先有两个Java程序,一个扮演生产者的角色,一个扮演消费者的角色。使用
RabbitMQ的一个消息队列来作为消息的仓库。基本的流程如下。
生产者:
1. import com.rabbitmq.client.Channel;
2. import com.rabbitmq.client.Connection;
3. import com.rabbitmq.client.ConnectionFactory;
4. /**
5. * 消息生产者
6. *
7. * @author hzwangqing2
8. * @create 2016-08-27-15:11
9. */
10. public class P {
11. private final static String QUEUE_NAME = "netease";
12. public static void main(String[] argv) throws Exception {
13. // 创建连接工厂
14. ConnectionFactory factory = new ConnectionFactory();
15. // 设置RabbitMQ地址
16. factory.setHost("localhost");
17. // 创建一个新的连接
18. Connection connection = factory.newConnection();
19. // 创建一个频道
20. Channel channel = connection.createChannel();
21. // 声明一个队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是
其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如
果存在,不会对已经存在的队列产生任何影响。
22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23. String message = "Hello World!";
24. // 发送消息到队列中
25. channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UT
F-8"));
26. System.out.println("P [x] Sent '" + message + "'");
27. // 关闭频道和连接
28. channel.close();
29. connection.close();
30. }
31. }
消费者:
1. import com.rabbitmq.client.*;
2. import java.io.IOException;
3.
4. /**
5. * 消息消费者
6. *
7. * @author hzwangqing2
8. * @create 2016-08-27-16:31
9. */
10. public class C {
11.
12. private final static String QUEUE_NAME = "netease";
13.
14. public static void main(String[] argv) throws Exception {
15. // 创建连接工厂
16. ConnectionFactory factory = new ConnectionFactory();
17. // 设置RabbitMQ地址
18. factory.setHost("localhost");
19. // 创建一个新的连接
20. Connection connection = factory.newConnection();
21. // 创建一个频道
22. Channel channel = connection.createChannel();
23. // 声明要关注的队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特
点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建
,如果存在,不会对已经存在的队列产生任何影响。
24. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25. System.out.println("C [*] Waiting for messages. To exit press C
TRL+C");
26. // DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我
们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
27. Consumer consumer = new DefaultConsumer(channel) {
28. @Override
29. public void handleDelivery(String consumerTag, Envelope env
elope, AMQP.BasicProperties properties, byte[] body) throws
IOException {
30. String message = new String(body, "UTF-8");
31. System.out.println("C [x] Received '" + message + "'");
32. }
33. };
34. // 自动回复队列应答 -- RabbitMQ中的消息确认机制,后面章节会详细讲解
35. channel.basicConsume(QUEUE_NAME, true, consumer);
36. }
37. }
demo写好了后,我们来运行一下这两个程序,我们首先运行消费者程序。
1. C [*] Waiting for messages. To exit press CTRL+C
然后我们再运行一个生产者程序。
1. P [x] Sent 'Hello World!'
当我们再次切回消费者界面的时候。我们会看到下面的显示。
1. C [*] Waiting for messages. To exit press CTRL+C
2. C [x] Received 'Hello World!'
上面就是一个最简单的RabbitMQ的消息流,后面我们会更加详细的分析RabbitMQ的其他高级功能。
2.2 工作队列
上一节中我们学习了最简单的消息队列模型,但是实际使用中我们往往不是如此简单的粗暴的去使用一个队
列。通常我们还会有这样的需求--我们生产者产生了一个消息,但是这个消息在出队列后需要进行一些复杂
的处理,这个处理非常耗时,自然的这个处理过程就成了整个系统的瓶颈所在。那么我们能不能并行去处理
这些消息呢?答案是可以的。RabbitMQ的工作队列就是为了来解决这个问题的。模型如下。
下面先给出消息的生产者:
1. import com.rabbitmq.client.Channel;
2. import com.rabbitmq.client.Connection;
3. import com.rabbitmq.client.ConnectionFactory;
4.
5. /**
6. * 消息生产者
7. *
8. * @author hzwangqing2
9. * @create 2016-08-27-16:31
10. */
11. public class WorkQueueTask {
12.
13. private static final String TASK_QUEUE_NAME = "netease_queue";
14.
15. public static void main(String[] argv) throws java.io.IOException,
Exception {
16.
17. ConnectionFactory factory = new ConnectionFactory();
18. factory.setHost("localhost");
19. Connection connection = factory.newConnection();
20. Channel channel = connection.createChannel();
21.
22. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)
;
23. // 分发消息 模拟了5个不同的消息
24. for(int i = 0 ; i < 5; i++){
25. String message = "Hello World! " + i;
26. channel.basicPublish("", TASK_QUEUE_NAME, null, message.get
Bytes());
27. System.out.println(" [x] Sent '" + message + "'");
28. }
29. channel.close();
30. connection.close();
31. }
32. }
消息的生产者有了。我们现在模拟几个消息的消费者,但是不同于上一个demo,为了展示更好的效果,我
们通过让线程睡眠2秒来模拟消息处理的时间。
消息的消费者_1:
1. import com.rabbitmq.client.*;
2. import java.io.IOException;
3.
4. public class Worker1 {
5. private static final String TASK_QUEUE_NAME = "netease_queue";
6.
7. public static void main(String[] argv) throws Exception {
8. ConnectionFactory factory = new ConnectionFactory();
9. factory.setHost("localhost");
10. final Connection connection = factory.newConnection();
11. final Channel channel = connection.createChannel();
12.
13. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null)
;
14. System.out.println("Worker1 [*] Waiting for messages. To exit p
ress CTRL+C");
15. // 每次从队列中获取数量
剩余48页未读,继续阅读
资源评论
ykm77777
- 粉丝: 0
- 资源: 4
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- (源码)基于C++和C混合模式的操作系统开发项目.zip
- (源码)基于Arduino的全球天气监控系统.zip
- OpenCVForUnity2.6.0.unitypackage
- (源码)基于SimPy和贝叶斯优化的流程仿真系统.zip
- (源码)基于Java Web的个人信息管理系统.zip
- (源码)基于C++和OTL4的PostgreSQL数据库连接系统.zip
- (源码)基于ESP32和AWS IoT Core的室内温湿度监测系统.zip
- (源码)基于Arduino的I2C协议交通灯模拟系统.zip
- coco.names 文件
- (源码)基于Spring Boot和Vue的房屋租赁管理系统.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功