没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
试读
35页
Memorphosis是一个消息中间件,它是linkedin开源MQ——kafka的Java版本,针对淘宝内部应用做了定制和优化。Metamorphosis的设计原则 • 消息都是持久的,保存在磁盘 • 吞吐量第一 • 消费状态保存在客户端 • 分布式,生产者、服务器和消费者都可分布
资源推荐
资源详情
资源评论
Memorphosis 是一个消息中间件,它是 linkedin 开源 MQ——kafka 的 Java 版本,
针对淘宝内部应用做了定制和优化。Metamorphosis 的设计原则
� 消息都是持久的,保存在磁盘
� 吞吐量第一
� 消费状态保存在客户端
� 分布式,生产者、服务器和消费者都可分布
Metamorphosis 的部署结构
[] Metamorphosis 的特点
除了完整实现 kafka 的功能之外,我们还为 meta 加入了额外的功能,使得 meta
成为一个更为强大的通用消息中间件,包括
� 彻底用 java 重写的实现,高效的协议和通讯框架
� 发送端的负载均衡
� Master/Slave 异步和同步复制的高可用方案
� 专门用于广播消息的客户端实现
� 与 diamond 结合使用的顺序发送消息功能
� 支持事务,包括本地事务和分布式事务,实现 JTA 规范。
[] Getting started
我们在日常已经部署了 metamorhposis 环境,因此你可以直接在本地测试,如果
你想部署一个自己的服务器,可以参照#服务器部署节。
前面提到,meta 是一个消息中间件。消息中间件中有两个角色:消息生产者和
消息消费者。Meta 里同样有这两个概念,消息生产者负责创建消息并发送到 meta
服务器,meta 服务器会将消息持久化到磁盘,消息消费者从 meta 服务器拉取消
息并提交给应用消费。
[] 消息会话工厂类
在使用消息生产者和消费者之前,我们需要创建它们,这就需要用到消息会话工
厂类——MessageSessionFactory,由这个工厂帮你创建生产者或者消费者。除
了这些,MessageSessionFactory 还默默无闻地在后面帮你做很多事情,包括
1. 服务的查找和发现,通过 diamond 和 zookeeper 帮你查找日常的 meta 服
务器地址列表
2. 连接的创建和销毁,自动创建和销毁到 meta 服务器的连接,并做连接复
用,也就是到同一台 meta 的服务器在一个工厂内只维持一个连接。
3. 消息消费者的消息存储和恢复,后续我们会谈到这一点。
4. 协调和管理各种资源,包括创建的生产者和消费者的。
因此,我们首先需要创建一个会话工厂类,MessageSessionFactory 仅是一个接
口,它的实现类是 MetaMessageSessionFactory
MessageSessionFactory sessionFactory = new
MetaMessageSessionFactory(new MetaClientConfig());
请注意,MessageSessionFactory 应当全局共用一个
[] 消息生产者
翠花,上代码
package com.taobao.metamorphosis.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
public class Producer {
public static void main(String[] args) throws Exception {
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new
MetaMessageSessionFactory(new MetaClientConfig());
// create producer,强烈建议使用单例
MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = "meta-test";
producer.publish(topic);
BufferedReader reader = new BufferedReader(new
InputStreamReader(System.in));
String line = null;
while ((line = reader.readLine()) != null) {
// send message
SendResult sendResult = producer.sendMessage(new
Message(topic, line.getBytes()));
// check result
if (!sendResult.isSuccess()) {
System.err.println("Send message failed,error
message:" + sendResult.getErrorMessage());
}
else {
System.out.println("Send message successfully,sent to "
+ sendResult.getPartition());
}
}
}
}
消息生产者的接口是 MessageProducer,你可以通过它来发送消息。创建生产者
很简单,通过 MessageSessionFactory 的 createProducer 方法即可以创建一个
生产者。在 Meta 里,每个消息对象都是 Message 类的实例,Message 表示一个
消息对象,它包含这么几个属性:
属性
值
id
消息的唯一 id,系统自动产生,用户无法设置,在发送成功后由服
务器返回,发送失败则为 0。
topic
消息的主题,订阅者订阅该主题即可接收发送到该主题下的消息,
必须
data
消息的有效载荷,也就是消息内容,meta 永远不会修改消息内容,
你发送出去是什么样子,接收到就是什么样子。消息内容限制在 1M
以内,我的建议是最好不要发送超过上百 K 的消息,必须
attribute
消息属性,一个字符串,可选。发送者可设置消息属性来让消费者
过滤。
细心的朋友可能注意到,我们在 sendMessage 之前还调用了 MessageProducer
的 publish(topic)方法
producer.publish(topic);
这一步在发送消息前是必须的,你必须发布你将要发送消息的 topic,这是为了
让会话工厂帮你去查找接收这些 topic 的 meta 服务器地址并初始化连接。这个
步骤针对每个 topic 只需要做一次,多次调用无影响。
总结下这个例子,从标准输入读入你输入的数据,并将数据封装成一个 Message
对象,发送到 topic 为 meta-test 下。
请注意,MessageProducer 是线程安全的,完全可重复使用,因此最好在应用中
作为单例来使用,一次创建,到处使用,配置为 spring 里的 singleton bean。
MessageProducer 创建的代价昂贵,每次都需要通过 zk 查找服务器并创建 tcp
长连接。
[] 消息消费者
发送消息后,消费者可以接收消息了,下面的代码创建消费者并订阅 meta-test
这个主题,等待消息送达并打印消息内容
package com.taobao.metamorphosis.example;
import java.util.concurrent.Executor;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
public class AsyncConsumer {
public static void main(String[] args) throws Exception {
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new
MetaMessageSessionFactory(new MetaClientConfig());
剩余34页未读,继续阅读
资源评论
小小哭包
- 粉丝: 1899
- 资源: 3860
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- Flume进阶-自定义拦截器jar包
- Dubins曲线算法讲解和在运动规划中的使用.pdf
- 上市公司-股票性质数据-工具变量(民企、国企、央企)2003-2022年.dta
- 上市公司-股票性质数据-工具变量(民企、国企、央企)2003-2022年.xlsx
- Reeds+Shepp曲线算法讲解和实现.pdf
- 毕业设计基于SpringBoot+MyBatisPlus+MySQL+Vue的外卖配送信息系统源代码+数据库
- 词向量(Word Embeddings)是自然语言处理(NLP)领域的一种重要技术.txt
- Surfer,线性函数
- MyBatis 的动态 SQL 是其核心特性之一.txt
- 时代的sdddsddsddsd
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功