package jiankunking.kafkajstorm.kafka;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.KafkaException;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.UnresolvedAddressException;
import java.util.*;
/**
*
* @author feilaoda
*
*/
public class KafkaConsumer {
private static Logger LOG = Logger.getLogger(KafkaConsumer.class);
public static final int NO_OFFSET = -1;
private int status;
private SimpleConsumer consumer = null;
private KafkaSpoutConfig config;
private LinkedList<Host> brokerList;
private int brokerIndex;
private Broker leaderBroker;
public KafkaConsumer(KafkaSpoutConfig config) {
this.config = config;
this.brokerList = new LinkedList<Host>(config.brokers);
this.brokerIndex = 0;
}
public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOException {
String topic = config.topic;
FetchRequest req = new FetchRequestBuilder().clientId(config.clientId).addFetch(topic, partition, offset, config.fetchMaxBytes)
.maxWait(config.fetchWaitMaxMs).build();
FetchResponse fetchResponse = null;
SimpleConsumer simpleConsumer = null;
try {
simpleConsumer = findLeaderConsumer(partition);
if (simpleConsumer == null) {
// LOG.error(message);
return null;
}
fetchResponse = simpleConsumer.fetch(req);
} catch (Exception e) {
if (e instanceof ConnectException || e instanceof SocketTimeoutException || e instanceof IOException
|| e instanceof UnresolvedAddressException) {
LOG.warn("Network error when fetching messages:", e);
if (simpleConsumer != null) {
String host = simpleConsumer.host();
int port = simpleConsumer.port();
simpleConsumer = null;
throw new KafkaException("Network error when fetching messages: " + host + ":" + port + " , " + e.getMessage(), e);
}
} else {
throw new RuntimeException(e);
}
}
if (fetchResponse.hasError()) {
short code = fetchResponse.errorCode(topic, partition);
if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
long startOffset = getOffset(topic, partition, config.startOffsetTime);
offset = startOffset;
}
if(leaderBroker != null) {
LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition["
+ partition + "] error:" + code);
}else {
}
return null;
} else {
ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, partition);
return msgs;
}
}
private SimpleConsumer findLeaderConsumer(int partition) {
try {
if (consumer != null) {
return consumer;
}
PartitionMetadata metadata = findLeader(partition);
if (metadata == null) {
leaderBroker = null;
consumer = null;
return null;
}
leaderBroker = metadata.leader();
consumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
config.clientId);
return consumer;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
return null;
}
protected PartitionMetadata findLeader(int partition) {
PartitionMetadata returnMetaData = null;
int errors = 0;
int size = brokerList.size();
Host brokerHost = brokerList.get(brokerIndex);
try {
if (consumer == null) {
consumer = new SimpleConsumer(brokerHost.getHost(), brokerHost.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
config.clientId);
}
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
consumer = null;
}
int i = brokerIndex;
loop: while (i < size && errors < size + 1) {
Host host = brokerList.get(i);
i = (i + 1) % size;
brokerIndex = i; // next index
try {
if (consumer == null) {
consumer = new SimpleConsumer(host.getHost(), host.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes,
config.clientId);
}
List<String> topics = Collections.singletonList(config.topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
TopicMetadataResponse resp = null;
try {
resp = consumer.send(req);
} catch (Exception e) {
errors += 1;
LOG.error("findLeader error, broker:" + host.toString() + ", will change to next broker index:" + (i + 1) % size);
if (consumer != null) {
consumer.close();
consumer = null;
}
continue;
}
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
LOG.error("Error communicating with Broker:" + host.toString() + ", find Leader for partition:" + partition);
} finally {
if (consumer != null) {
consumer.close();
consumer = null;
}
}
}
return returnMetaData;
}
public long getOffset(String topic, int partition, long startOffsetTime) {
SimpleConsumer simpleConsumer = findLeaderConsumer(partition);
if (simpleConsumer == null) {
LOG.error("Error consumer is null get offset from partition:" + partition);
return -1;
}
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId());
long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition);
if (offsets.length > 0) {
return offsets[0];
} else {
return NO_OFFSET;
}
}
public void close() {
if (consumer != null) {
consumer.close();
}
}
public SimpleConsumer getConsumer() {
return consumer;
}
public void setConsumer(SimpleConsumer consumer) {
this.consumer = consumer;
}
public int getStatus() {
return status;
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
jstorm集成kafka插件demo.zip (68个子文件)
kafka-jstorm
pom.xml 5KB
target
generated-sources
annotations
classes
jiankunking
kafkajstorm
bolts
CustomBolt.class 2KB
kafka
KafkaSpout.class 5KB
KafkaMessageId.class 816B
KafkaSpoutConfig.class 5KB
Host.class 1KB
PartitionConsumer.class 9KB
PartitionConsumer$EmitState.class 1KB
KafkaConsumer.class 9KB
PartitionCoordinator.class 3KB
ZkState.class 6KB
topologies
CustomCounterTopology.class 3KB
util
ByteUtil.class 571B
StringUtil.class 522B
PropertiesUtil.class 2KB
logback.xml 2KB
application.properties 246B
src
test
java
main
resources
logback.xml 2KB
application.properties 252B
java
jiankunking
kafkajstorm
bolts
CustomBolt.java 1KB
kafka
KafkaSpout.java 3KB
Host.java 1KB
KafkaMessageId.java 499B
KafkaConsumer.java 8KB
KafkaSpoutConfig.java 4KB
ZkState.java 3KB
PartitionCoordinator.java 2KB
PartitionConsumer.java 7KB
topologies
CustomCounterTopology.java 2KB
util
ByteUtil.java 328B
StringUtil.java 236B
PropertiesUtil.java 2KB
kafka-jstorm.iml 3KB
.idea
libraries
Maven__org_apache_zookeeper_zookeeper_3_4_5.xml 550B
Maven__javax_servlet_servlet_api_2_5.xml 522B
Maven__jline_jline_0_9_94.xml 469B
Maven__ch_qos_logback_logback_classic_1_0_13.xml 575B
Maven__org_apache_httpcomponents_httpcore_4_3_2.xml 563B
Maven__ch_qos_logback_logback_core_1_0_13.xml 554B
Maven__org_jboss_netty_netty_3_2_2_Final.xml 544B
Maven__org_apache_httpcomponents_httpclient_4_3_3.xml 577B
Maven__org_slf4j_slf4j_api_1_7_5.xml 506B
Maven__org_clojure_clojure_1_6_0.xml 500B
Maven__org_apache_curator_curator_framework_2_5_0.xml 598B
Maven__com_yammer_metrics_metrics_core_2_2_0.xml 563B
Maven__org_apache_curator_curator_client_2_5_0.xml 577B
Maven__commons_logging_commons_logging_1_1_3.xml 572B
Maven__org_apache_kafka_kafka_2_9_2_0_8_1.xml 548B
Maven__com_alibaba_jstorm_jstorm_core_2_2_1.xml 556B
Maven__com_googlecode_json_simple_json_simple_1_1.xml 574B
Maven__com_101tec_zkclient_0_3.xml 489B
Maven__commons_codec_commons_codec_1_6.xml 536B
Maven__io_netty_netty_3_9_0_Final.xml 516B
Maven__net_sf_jopt_simple_jopt_simple_3_2.xml 542B
Maven__org_scala_lang_scala_library_2_9_2.xml 554B
Maven__org_xerial_snappy_snappy_java_1_0_5.xml 552B
Maven__org_slf4j_log4j_over_slf4j_1_7_10.xml 562B
Maven__com_yammer_metrics_metrics_annotation_2_2_0.xml 605B
Maven__junit_junit_3_8_1.xml 462B
Maven__com_google_guava_guava_16_0_1.xml 513B
Maven__org_rocksdb_rocksdbjni_4_3_1.xml 521B
workspace.xml 59KB
encodings.xml 172B
misc.xml 1KB
kotlinc.xml 232B
modules.xml 264B
.name 11B
compiler.xml 636B
共 68 条
- 1
衣舞晨风
- 粉丝: 3759
- 资源: 117
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
前往页