ActiveMQ使用示例之Topic.docx
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
### ActiveMQ 使用示例之 Topic #### 一、概述 ActiveMQ 是一款开源的消息中间件,支持多种消息传输模式,包括点对点 (Queue) 和发布/订阅 (Topic) 模式。在发布/订阅模式中,多个订阅者可以订阅同一个主题 (Topic),发布者向该主题发送的消息将被所有订阅该主题的订阅者接收。本文档将详细介绍如何使用 ActiveMQ 进行 Topic 消息的发送与接收,并提供示例代码。 #### 二、非持久的 Topic 消息 ##### 1. 发送非持久的 Topic 消息 非持久消息是指那些不会被持久化存储在磁盘上的消息,一旦发送,如果没有订阅者接收,则这些消息将会丢失。非持久消息适用于那些对消息可靠性要求不高的场景,例如实时聊天或通知等。 **步骤**: - 创建连接工厂 (`ConnectionFactory`) 并设置连接参数。 - 获取连接 (`Connection`) 并启动连接。 - 创建会话 (`Session`)。 - 创建 Topic 目的地 (`Destination`)。 - 创建消息生产者 (`MessageProducer`)。 - 发送消息。 **示例代码**: ```java public class NoPersistenceSender { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKER_URL = "tcp://192.168.0.101:61616"; private static final int SEND_NUM = 10; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; Destination destination; MessageProducer messageProducer; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("MyTopic"); messageProducer = session.createProducer(destination); sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception { for (int i = 0; i < SEND_NUM; i++) { TextMessage message = session.createTextMessage("Message " + i); messageProducer.send(message); } } } ``` ##### 2. 接收非持久的 Topic 消息 接收非持久 Topic 消息时需要注意以下几点: - **订阅者必须在线**:只有当订阅者在线时,它才能接收到发布的消息。 - **创建 Topic 目的地**:与发送消息类似,接收消息也需要创建相应的 Topic 目的地。 - **循环接收消息**:由于不知道客户端发送多少条消息,通常采用 while 循环方式不断尝试接收新消息,直到没有更多消息为止。 **示例代码**: ```java public class NoPersistenceReceiver { private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String BROKER_URL = "tcp://192.168.0.101:61616"; public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session; MessageConsumer consumer; Destination destination; connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL); try { connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("MyTopic"); consumer = session.createConsumer(destination); receiveMessages(consumer); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } public static void receiveMessages(MessageConsumer consumer) throws Exception { Message message; while ((message = consumer.receive(1000)) != null) { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; System.out.println("Received message: " + txtMsg.getText()); } } } } ``` #### 三、总结 本篇文章详细介绍了如何使用 ActiveMQ 进行非持久 Topic 消息的发送与接收。通过示例代码我们可以看到,在 Topic 模式下,消息被发布到特定的 Topic 中,而订阅该 Topic 的多个订阅者都可以接收到这条消息。这种机制非常适合用于广播消息或一对多的消息传递场景。同时需要注意的是,非持久消息不会被保存,如果订阅者离线,则可能会丢失消息。因此在设计系统时需要根据业务需求选择合适的持久性策略。
- 粉丝: 257
- 资源: 1940
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 计组 8.1 cpu.docx
- 基于.NET平台的IFoxCAD Cad二次开发设计源码
- 计组 7.2 指令系统.docx
- 计组 5.5 浮点加减运算 7.1 指令系统.docx
- 计组 5.4 定点乘法运算.docx
- 基于嵌入式应用的C语言常用代码模块库设计源码
- 计组一二章习题.docx
- 计组 5.1 定点数据表示.docx
- 计组存储器习题 1.docx
- 基于CSS、Java、JavaScript、HTML的2022年Internet Cafe MVC设计源码
- 基于Vue框架的智能粮食检测仓移动端H5设计源码
- 基于Python与Shell语言优化的yolo改进与陆小马公众号设计源码
- OpenWrt软件编译构建系统详解及自定义模块构建实践
- 考到 3.7 死锁的检测与解除.docx
- 基于Java语言的江山市房产信息网站设计源码
- os 存储器管理.docx