package com.yting.cloud.kafka.consumer;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.yting.cloud.kafka.entity.User;
import com.yting.cloud.kafka.util.BeanUtils;
/**
* Kafka官网给的案例 SimpleConsumer,我在Eclipse本地连接服务器测试,所以修改了一些代码
*
* @Author 王扬庭
* @Time 2014-07-18
*
*/
public class UserSimpleConsumer {
/**
 * Entry point for the sample: reads up to 100 {@code User} messages from
 * partition 1 of topic {@code test-user-001}, seeding the leader lookup
 * with a fixed list of broker hosts, all on port 9092.
 */
public static void main(String[] args) {
    final long maxReads = 100;
    final String topic = "test-user-001";
    // int partition = 0; //
    final int partition = 1; //
    // Seed brokers queried for topic metadata during leader discovery.
    List<String> seeds = new ArrayList<String>();
    Collections.addAll(seeds, "rs229", "rs227", "rs226", "rs198", "rs197");
    final int port = Integer.parseInt("9092");
    UserSimpleConsumer example = new UserSimpleConsumer();
    try {
        example.run(maxReads, topic, partition, seeds, port);
    } catch (Exception e) {
        System.out.println("Oops:" + e);
        e.printStackTrace();
    }
}
// Replica broker hosts recorded by the most recent findLeader() call;
// consulted by findNewLeader() when the current partition leader fails.
private List<String> m_replicaBrokers = new ArrayList<String>();

/** Creates a consumer example instance. */
public UserSimpleConsumer() {
    // Fix: the field initializer above already allocates m_replicaBrokers;
    // the original constructor redundantly reassigned a second ArrayList.
}
/**
 * Reads up to {@code a_maxReads} messages from the given topic/partition
 * with the Kafka SimpleConsumer API: locates the partition leader via the
 * seed brokers, fetches starting at the earliest offset, deserializes each
 * payload into a {@link User} and prints it. On fetch errors it retries up
 * to 5 times, failing over to a new leader when needed.
 *
 * @param a_maxReads    maximum number of messages to read before returning
 * @param a_topic       topic to consume from
 * @param a_partition   partition id to consume from
 * @param a_seedBrokers broker hosts queried for topic metadata
 * @param a_port        broker port (assumed identical for all brokers)
 * @throws Exception if no new leader can be found after a broker failure
 */
public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
    // Find the metadata about the topic and partition we are interested in.
    PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
            a_partition);
    if (metadata == null) {
        System.out
                .println("Can't find metadata for Topic and Partition. Exiting");
        return;
    }
    if (metadata.leader() == null) {
        System.out
                .println("Can't find Leader for Topic and Partition. Exiting");
        return;
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;
    SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    try {
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                // Re-create the consumer after a leader failover closed it.
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset.
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                // Any other error: assume the leader changed and fail over.
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                        a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    // Fetches can replay already-seen offsets; skip them.
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();
                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                // ===这里就是反序列化 — deserialize the raw bytes into a User ===
                User user = (User) BeanUtils.bytes2Object(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + user);
                // ============================================================
                numRead++;
                a_maxReads--;
            }
            if (numRead == 0) {
                // Nothing fetched this round: back off briefly before polling again.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Fix: the original swallowed the interrupt; restore the
                    // flag and stop consuming so callers can see it.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    } finally {
        // Fix: always release the consumer, even when fetch() or
        // findNewLeader() throws (original leaked it on any exception).
        if (consumer != null)
            consumer.close();
    }
}
/**
 * Asks the broker for the offset of the given topic/partition at a point
 * in time ({@code kafka.api.OffsetRequest.EarliestTime()} or
 * {@code LatestTime()}). Returns 0 when the broker reports an error.
 */
public static long getLastOffset(SimpleConsumer consumer, String topic,
        int partition, long whichTime, String clientName) {
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // Request exactly one offset for the single partition of interest.
    requestInfo.put(new TopicAndPartition(topic, partition),
            new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (!response.hasError()) {
        // Only one offset was requested, so the first entry is the answer.
        return response.offsets(topic, partition)[0];
    }
    System.out
            .println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition));
    return 0;
}
/**
 * Tries up to three times (sleeping 1s between attempts) to discover the
 * new leader for the partition after {@code a_oldLeader} failed, using the
 * replica brokers recorded by the last {@code findLeader()} call.
 *
 * @return host name of the new leader broker
 * @throws Exception if no new leader is found after all retries
 */
private String findNewLeader(String a_oldLeader, String a_topic,
        int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
                a_topic, a_partition);
        if (metadata == null || metadata.leader() == null) {
            // No metadata or no leader elected yet: wait and retry.
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                && i == 0) {
            // First time through, if the leader hasn't changed give
            // ZooKeeper a second to recover. Second time, assume the broker
            // did recover before failover, or it was a non-Broker issue.
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
                // Fix: restore the interrupt flag instead of swallowing it,
                // so the caller's thread still observes the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
    System.out
            .println("Unable to find new leader after Broker failure. Exiting");
    throw new Exception(
            "Unable to find new leader after Broker failure. Exiting");
}
/**
 * Queries each seed broker in turn for topic metadata and returns the
 * PartitionMetadata of the requested partition, or null if no seed broker
 * could supply it. Side effect: on success, refreshes m_replicaBrokers
 * with the partition's replica hosts for later failover lookups.
 * NOTE(review): this method is truncated in the available source — the
 * replica-recording loop at the end is cut off mid-statement; confirm the
 * tail against the original file.
 */
private PartitionMetadata findLeader(List<String> a_seedBrokers,
int a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
// Labeled loop so the nested partition scan can exit all levels at once.
loop: for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
// A bad seed broker is not fatal: log and try the next one.
System.out.println("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", "
+ a_partition + "] Reason: " + e);
} finally {
// Always release the short-lived lookup consumer.
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
// Remember the replica hosts so findNewLeader() can fail over later.
m_replicaBrokers.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host()
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
妳那伊抹微笑_Kafka之自定义Encoder实现Class级别的数据传送以及解析的Eclipse工程_201407181.rar 该文档与教程http://blog.csdn.net/u012185296/article/details/37924353 配套使用的,主要讲的是Kafka之自定义Encoder实现Class级别的数据传送以及解析 、、、 你也可以到博客地址http://blog.csdn.net/u012185296中去学习相关的云技术,Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive+Mahout+Spark ...云计算技术 .....................
资源推荐
资源详情
资源评论
收起资源包目录
_Kafka之自定义Encoder实现Class级别的数据传送以及解析的Eclipse工程_201407181.rar (34个子文件)
cloud-kafka-yting-201407181
bin
com
yting
cloud
kafka
consumer
UserSimpleConsumer.class 10KB
unit
test
TestBeanUtils.class 1KB
producer
UserProducer.class 3KB
partition
HashSimplePartitioner.class 1012B
entity
User.class 2KB
util
BeanUtils.class 2KB
encoder
UserEncoder.class 1KB
.settings
org.eclipse.jdt.core.prefs 629B
src
com
yting
cloud
kafka
consumer
UserSimpleConsumer.java 8KB
unit
test
TestBeanUtils.java 467B
producer
UserProducer.java 2KB
partition
HashSimplePartitioner.java 594B
entity
User.java 1KB
util
BeanUtils.java 1KB
encoder
UserEncoder.java 551B
.project 403B
.classpath 2KB
lib
snappy-java-1.0.5.jar 1.19MB
jopt-simple-3.2.jar 52KB
kafka_2.9.2-0.8.1.1.jar.asc 821B
metrics-core-2.2.0.jar 80KB
kafka_2.9.2-0.8.1.1-scaladoc.jar.asc 821B
slf4j-api-1.7.2.jar 25KB
kafka_2.9.2-0.8.1.1-sources.jar.asc 821B
zookeeper-3.3.4.jar 590KB
kafka_2.9.2-0.8.1.1-sources.jar 4KB
kafka_2.9.2-0.8.1.1-scaladoc.jar 1.58MB
storm-core-0.9.2-incubating.jar 5.06MB
zkclient-0.3.jar 63KB
log4j-1.2.15.jar 383KB
kafka_2.9.2-0.8.1.1.jar 3.23MB
kafka_2.9.2-0.8.1.1-javadoc.jar.asc 821B
scala-library-2.9.2.jar 8.45MB
kafka_2.9.2-0.8.1.1-javadoc.jar 37KB
共 34 条
- 1
资源评论
- qq3303017662016-06-13看了。挺好的。
- iamcndi2016-01-18看了代码kafka配置成功
- 凨行者2015-11-23当时为了解kafka而下载的 好评
那伊抹微笑
- 粉丝: 240
- 资源: 15
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功