/*
* (C) 2007-2012 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Authors:
* wuhua <wq163@163.com> , boyan <killme2008@gmail.com>
*/
package cn.itcast.storm.spout;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import com.taobao.gecko.core.util.LinkedTransferQueue;
import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
/**
 * A Storm spout that consumes messages from a Metamorphosis (Meta) topic.
 *
 * <p>Every message handed to the subscription listener is wrapped in a
 * {@link MetaMessageWrapper}, registered under its message id and queued for
 * {@link #nextTuple()}. The delivering thread then waits on the wrapper's latch until
 * {@link #ack(Object)} or {@link #fail(Object)} is invoked with that id. If the message was not
 * acknowledged successfully, the listener throws a {@link RuntimeException}.
 *
 * <p>The consumer, session factory, collector, queue and id map are {@code transient}; they are
 * created in {@link #open(Map, TopologyContext, SpoutOutputCollector)}, not in the constructor.
 *
 * @author boyan(boyan@taobao.com)
 * @date 2011-11-8
 *
 */
public class MetaSpout implements IRichSpout {
    private static final long serialVersionUID = 4382748324382L;

    /** Configuration key holding the maximum fetch size passed to the Meta subscription. */
    public static final String FETCH_MAX_SIZE = "meta.fetch.max_size";

    /** Fetch size used when {@link #FETCH_MAX_SIZE} is absent from the configuration. */
    public static final int DEFAULT_MAX_SIZE = 1024 * 1024;

    /**
     * Time in milliseconds to wait for a message from the queue if there is no message ready when the topology requests a tuple (via {@link #nextTuple()}).
     */
    public static final long WAIT_FOR_NEXT_MESSAGE = 1L;

    static final Log log = LogFactory.getLog(MetaSpout.class);

    private final MetaClientConfig metaClientConfig;
    private final ConsumerConfig consumerConfig;
    private final String topic;
    private final Scheme scheme;

    private transient MessageConsumer messageConsumer;
    private transient MessageSessionFactory sessionFactory;
    private transient ConcurrentHashMap<Long, MetaMessageWrapper> id2wrapperMap;
    private transient SpoutOutputCollector collector;
    private transient LinkedTransferQueue<MetaMessageWrapper> messageQueue;

    /**
     * Creates a spout for one topic. No connection is made here; see
     * {@link #open(Map, TopologyContext, SpoutOutputCollector)}.
     *
     * @param metaClientConfig configuration used to build the session factory
     * @param topic topic to subscribe to
     * @param consumerConfig configuration used to create the consumer
     * @param scheme converts message payloads to tuples and declares the output fields
     */
    public MetaSpout(final MetaClientConfig metaClientConfig, final String topic, final ConsumerConfig consumerConfig, final Scheme scheme) {
        this.metaClientConfig = metaClientConfig;
        this.consumerConfig = consumerConfig;
        this.topic = topic;
        this.scheme = scheme;
    }

    /**
     * Initializes the in-memory queue and id map, then subscribes to the topic.
     *
     * <p>A {@link MetaClientException} during setup is logged and not rethrown, so the spout may
     * be left without a working consumer.
     *
     * @param conf configuration map; {@link #FETCH_MAX_SIZE} is read from it
     * @param context topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit tuples
     */
    @SuppressWarnings("rawtypes")
    public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        // Accept any Number rather than casting to Integer: a value stored as Long (or another
        // numeric type) would otherwise fail with ClassCastException.
        // NOTE(review): Storm configs that round-trip through JSON presumably yield Long - confirm.
        final Object configured = conf.get(FETCH_MAX_SIZE);
        final int maxSize;
        if (configured instanceof Number) {
            maxSize = ((Number) configured).intValue();
        } else {
            if (configured == null) {
                log.warn("Using default FETCH_MAX_SIZE");
            } else {
                log.warn("Ignoring non-numeric " + FETCH_MAX_SIZE + " value '" + configured + "', using default FETCH_MAX_SIZE");
            }
            maxSize = DEFAULT_MAX_SIZE;
        }

        this.id2wrapperMap = new ConcurrentHashMap<Long, MetaMessageWrapper>();
        this.messageQueue = new LinkedTransferQueue<MetaMessageWrapper>();
        this.collector = collector;
        try {
            this.setUpMeta(this.topic, maxSize);
        } catch (final MetaClientException e) {
            log.error("Setup meta consumer failed", e);
        }
    }

    /**
     * Creates the session factory and consumer and subscribes to {@code topic} with a listener
     * that hands each message to the spout and waits for its outcome.
     *
     * @param topic topic to subscribe to
     * @param maxSize fetch size passed to the subscription
     * @throws MetaClientException if the Meta client reports a failure during setup
     */
    private void setUpMeta(final String topic, final int maxSize) throws MetaClientException {
        this.sessionFactory = new MetaMessageSessionFactory(this.metaClientConfig);
        this.messageConsumer = this.sessionFactory.createConsumer(this.consumerConfig);
        this.messageConsumer.subscribe(topic, maxSize, new MessageListener() {
            public void recieveMessages(final Message message) {
                final MetaMessageWrapper wrapper = new MetaMessageWrapper(message);
                final Long id = message.getId();
                MetaSpout.this.id2wrapperMap.put(id, wrapper);
                MetaSpout.this.messageQueue.offer(wrapper);
                try {
                    // Wait until ack(...) or fail(...) counts the latch down for this message.
                    wrapper.latch.await();
                } catch (final InterruptedException e) {
                    // Nobody will wait for this wrapper any more: drop its registration so the
                    // map does not keep it forever. The two-argument remove only deletes the
                    // entry if it still maps to this wrapper. The wrapper may still be in the
                    // queue; a later ack/fail for it is then logged as unknown.
                    MetaSpout.this.id2wrapperMap.remove(id, wrapper);
                    Thread.currentThread().interrupt();
                }
                // Consumption failed: signal it to the caller with a runtime exception.
                if (!wrapper.success) {
                    throw new RuntimeException("Consume message failed");
                }
            }

            public Executor getExecutor() {
                // No dedicated executor is supplied.
                return null;
            }
        }).completeSubscribe();
    }

    /**
     * Shuts down the consumer and then the session factory. Each step is skipped when the
     * corresponding object was never created (for example because setup failed), and a failure
     * in one step is logged without preventing the other.
     */
    public void close() {
        if (this.messageConsumer != null) {
            try {
                this.messageConsumer.shutdown();
            } catch (final MetaClientException e) {
                log.error("Shutdown consumer failed", e);
            }
        }
        if (this.sessionFactory != null) {
            try {
                this.sessionFactory.shutdown();
            } catch (final MetaClientException e) {
                log.error("Shutdown session factory failed", e);
            }
        }
    }

    /**
     * Emits at most one tuple: polls the queue for up to {@link #WAIT_FOR_NEXT_MESSAGE}
     * milliseconds and, if a message is available, emits its deserialized payload anchored to
     * the message id. Does nothing when no consumer was created.
     */
    public void nextTuple() {
        if (this.messageConsumer == null) {
            return;
        }
        final MetaMessageWrapper wrapper;
        try {
            wrapper = this.messageQueue.poll(WAIT_FOR_NEXT_MESSAGE, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            // Nothing to emit; keep the interrupt visible to the calling thread instead of
            // silently clearing it.
            Thread.currentThread().interrupt();
            return;
        }
        if (wrapper == null) {
            return;
        }
        final Message message = wrapper.message;
        this.collector.emit(this.scheme.deserialize(message.getData()), message.getId());
    }

    /**
     * Marks the message with the given id as successfully processed and releases the thread
     * waiting on it.
     *
     * @param msgId id passed to {@code emit} in {@link #nextTuple()}; anything that is not a
     *        known {@link Long} id is logged and ignored
     */
    public void ack(final Object msgId) {
        this.complete(msgId, true, "ack");
    }

    /**
     * Marks the message with the given id as failed and releases the thread waiting on it.
     *
     * @param msgId id passed to {@code emit} in {@link #nextTuple()}; anything that is not a
     *        known {@link Long} id is logged and ignored
     */
    public void fail(final Object msgId) {
        this.complete(msgId, false, "reject");
    }

    /**
     * Shared implementation of {@link #ack(Object)} and {@link #fail(Object)}: removes the
     * pending wrapper for {@code msgId}, records the outcome and counts its latch down.
     *
     * @param msgId message id as received from the caller; may be {@code null}
     * @param success outcome to store on the wrapper
     * @param action verb used in the warning logged for an unknown id
     */
    private void complete(final Object msgId, final boolean success, final String action) {
        final MetaMessageWrapper wrapper = (msgId instanceof Long) ? this.id2wrapperMap.remove(msgId) : null;
        if (wrapper == null) {
            // Guard the class lookup so a null id is reported instead of raising a
            // NullPointerException.
            final String type = (msgId == null) ? "null" : msgId.getClass().getName();
            log.warn(String.format("don't know how to %s(%s: %s)", action, type, msgId));
            return;
        }
        // Set the outcome before releasing the latch so the waiting thread observes it.
        wrapper.success = success;
        wrapper.latch.countDown();
    }

    /**
     * Declares the output fields reported by the configured scheme.
     *
     * @param declarer declarer to register the fields with
     */
    public void declareOutputFields(final OutputFieldsDeclarer declarer) {
        declarer.declare(this.scheme.getOutputFields());
    }

    /**
     * @return always {@code true}
     */
    public boolean isDistributed() {
        return true;
    }

    /** No-op. */
    public void activate() {
    }

    /** No-op. */
    public void deactivate() {
    }

    /**
     * @return always {@code null}; this spout supplies no component-specific configuration
     */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
storm集群的搭建-java示例代码.zip (119个子文件)
.classpath 4KB
.classpath 4KB
.classpath 4KB
.classpath 4KB
.classpath 3KB
storm-0.8.2.jar 4.83MB
clojure-1.4.0.jar 3.26MB
guava-13.0.jar 1.8MB
metamorphosis-dashboard-1.4.6.2.jar 1.03MB
mysql-connector-java-5.1.24-bin.jar 826KB
c3p0-0.9.1.2.jar 596KB
zookeeper-3.3.3.jar 588KB
joda-time-2.0.jar 556KB
jetty-6.1.26.jar 527KB
log4j-1.2.16.jar 470KB
jackson-mapper-lgpl-1.4.0.jar 382KB
httpclient-4.1.1.jar 343KB
gecko-1.1.4.jar 320KB
libthrift7-0.7.0.jar 295KB
commons-lang-2.6.jar 278KB
commons-lang-2.5.jar 273KB
snakeyaml-1.9.jar 260KB
jgrapht-0.8.3.jar 240KB
metamorphosis-client-1.4.6.2.jar 231KB
metamorphosis-server-1.4.6.2.jar 228KB
metamorphosis-commons-1.4.6.2.jar 187KB
commons-io-2.4.jar 181KB
httpcore-4.1.jar 177KB
jetty-util-6.1.26.jar 173KB
kryo-2.17.jar 157KB
jackson-core-lgpl-1.4.0.jar 152KB
metamorphosis-tools-1.4.6.2.jar 144KB
servlet-api-2.5-20081211.jar 131KB
junit-3.8.1.jar 118KB
commons-io-1.4.jar 106KB
servlet-api-2.5.jar 103KB
curator-framework-1.0.1.jar 97KB
metamorphosis-server-wrapper-1.4.6.2.jar 93KB
jline-0.9.94.jar 85KB
reflectasm-1.07-shaded.jar 64KB
zkclient-0.3.jar 63KB
commons-logging-1.1.1.jar 59KB
carbonite-1.5.0.jar 57KB
commons-codec-1.4.jar 57KB
commons-fileupload-1.2.1.jar 56KB
commons-exec-1.1.jar 51KB
disruptor-2.10.1.jar 50KB
asm-4.0.jar 45KB
objenesis-1.2.jar 35KB
curator-client-1.0.1.jar 24KB
slf4j-api-1.5.8.jar 23KB
ring-core-1.1.5.jar 21KB
json-simple-1.1.jar 16KB
jzmq-2.1.0.jar 13KB
slf4j-log4j12-1.5.8.jar 9KB
clj-time-0.4.1.jar 9KB
hiccup-0.3.6.jar 8KB
tools.logging-0.2.3.jar 7KB
compojure-1.1.3.jar 6KB
tools.macro-0.1.0.jar 5KB
minlog-1.2.jar 5KB
math.numeric-tower-0.0.1.jar 5KB
tools.cli-0.2.2.jar 3KB
ring-servlet-0.3.11.jar 3KB
core.incubator-0.1.0.jar 3KB
clout-1.0.1.jar 3KB
ring-jetty-adapter-0.3.11.jar 2KB
MetaSpout.java 7KB
MetaSpout.java 7KB
RecordMatcher.java 5KB
TopoMain.java 4KB
MysqlBolt.java 3KB
DBManager.java 3KB
FilterBolt.java 3KB
FileWriterTopo.java 2KB
RandomWordSpout.java 2KB
WriterBolt.java 2KB
MessageSpout.java 2KB
WriterBolt.java 2KB
WordReader.java 2KB
FieldsGroupingBolt.java 2KB
WordCounter.java 2KB
TransferBolt.java 1KB
FileWriterBolt.java 1KB
StringScheme.java 1KB
StringScheme.java 1KB
RandomWordSpout.java 1KB
MetaMessageWrapper.java 1KB
MetaMessageWrapper.java 1KB
WriterBolt.java 1KB
WordCountTopo.java 1KB
SpliterBolt.java 1KB
TopoMain.java 1KB
ReflectionUtil.java 1KB
TopoMain.java 982B
TransferBolt.java 953B
WordSpliter.java 945B
PropertyUtil.java 896B
PropertyUtil.java 795B
TopoMain.java 795B
共 119 条
- 1
- 2
资源评论
小小哭包
- 粉丝: 1900
- 资源: 3864
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- pta题库答案c语言之排序4统计工龄.zip
- pta题库答案c语言之树结构7堆中的路径.zip
- pta题库答案c语言之树结构3TreeTraversalsAgain.zip
- pta题库答案c语言之树结构2ListLeaves.zip
- pta题库答案c语言之树结构1树的同构.zip
- 基于C++实现民航飞行与地图简易管理系统可执行程序+说明+详细注释.zip
- pta题库答案c语言之复杂度1最大子列和问题.zip
- 三维装箱问题(Three-Dimensional Bin Packing Problem,3D-BPP)是一个经典的组合优化问题
- 以下是一些关于Linux线程同步的基本概念和方法.txt
- 以下是一个简化的示例,它使用pygame库来模拟烟花动画的框架.txt
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功