package com.lin.demo.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import com.lin.demo.producer.KafkaProducer;
/**
 * Demo consumer built on Kafka's ZooKeeper-based high-level consumer API
 * ({@code kafka.javaapi.consumer.ConsumerConnector}).
 *
 * <p>It opens a single stream on {@code KafkaProducer.TOPIC} and prints the value of every
 * message it receives to standard output.
 */
public class KafkaConsumer {

    /** Connector to the Kafka cluster; created once in the constructor. */
    private final ConsumerConnector consumer;

    /**
     * Builds the consumer configuration and creates the connector.
     *
     * <p>All settings are hard-coded for a local, single-node demo setup.
     */
    private KafkaConsumer() {
        Properties props = new Properties();
        // ZooKeeper connection string
        props.put("zookeeper.connect", "127.0.0.1:2181");
        // Consumer group this consumer belongs to
        props.put("group.id", "lingroup");
        // ZooKeeper session timeout and sync time
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        // Rebalance retry policy
        props.put("rebalance.max.retries", "5");
        props.put("rebalance.backoff.ms", "1200");
        // Offset auto-commit interval
        props.put("auto.commit.interval.ms", "1000");
        // NOTE(review): "smallest" is expected to start from the earliest available offset
        // when the group has no committed offset -- confirm against the Kafka consumer docs.
        props.put("auto.offset.reset", "smallest");
        // No "serializer.class" here: that is a producer setting. Message decoding on the
        // consumer side is done by the StringDecoder instances passed in consume().
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Subscribes to {@code KafkaProducer.TOPIC} with one stream and prints each message value.
     *
     * <p>The loop runs for as long as the stream iterator reports more messages. The connector
     * is shut down when the loop ends or when any step fails, so that the resources it holds
     * are released.
     */
    void consume() {
        try {
            // Request exactly one stream (i.e. one consuming thread) for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(KafkaProducer.TOPIC, Integer.valueOf(1));

            // Keys and values are both decoded as strings.
            StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
            StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

            Map<String, List<KafkaStream<String, String>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
            KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0);
            ConsumerIterator<String, String> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" + it.next().message() + "<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
            }
        } finally {
            // Always release the connector, including on failure.
            consumer.shutdown();
        }
    }

    /**
     * Entry point: creates the demo consumer and starts consuming.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }
}
没有合适的资源?快使用搜索试试~ 我知道了~
Kafka实例
共79个文件
jar:56个
asc:5个
properties:2个
需积分: 50 52 下载量 72 浏览量
2017-06-27
14:12:31
上传
评论
收藏 24.06MB ZIP 举报
温馨提示
kafka
资源推荐
资源详情
资源评论
收起资源包目录
Kafka实例.zip (79个子文件)
Kafka-Demo
.project 2KB
.mymetadata 304B
src
main
resources
log4j.properties 292B
java
com
lin
demo
consumer
KafkaConsumer.java 2KB
producer
KafkaProducer.java 1KB
WebRoot
WEB-INF
classes
com
lin
demo
consumer
KafkaConsumer.class 3KB
producer
KafkaProducer.class 2KB
log4j.properties 292B
lib
jersey-container-servlet-2.22.1.jar 16KB
jackson-core-2.5.4.jar 225KB
jetty-security-9.2.12.v20150709.jar 94KB
javassist-3.18.1-GA.jar 697KB
hk2-utils-2.4.0-b31.jar 99KB
kafka_2.11-0.9.0.0.jar 4.78MB
jetty-server-9.2.12.v20150709.jar 409KB
aopalliance-repackaged-2.4.0-b31.jar 14KB
javax.inject-2.4.0-b31.jar 6KB
kafka_2.11-0.9.0.0-test.jar.asc 821B
hk2-api-2.4.0-b31.jar 169KB
connect-file-0.9.0.0.jar 9KB
jersey-client-2.22.1.jar 160KB
jackson-jaxrs-base-2.5.4.jar 28KB
slf4j-log4j12-1.7.6.jar 9KB
zkclient-0.7.jar 72KB
metrics-core-2.2.0.jar 80KB
lz4-1.2.0.jar 162KB
jetty-io-9.2.12.v20150709.jar 106KB
validation-api-1.1.0.Final.jar 62KB
jersey-container-servlet-core-2.22.1.jar 63KB
connect-runtime-0.9.0.0.jar 175KB
snappy-java-1.1.1.7.jar 580KB
kafka-tools-0.9.0.0.jar 40KB
scala-library-2.11.7.jar 5.48MB
log4j-1.2.17.jar 478KB
connect-api-0.9.0.0.jar 45KB
jackson-module-jaxb-annotations-2.5.4.jar 32KB
javax.servlet-api-3.1.0.jar 94KB
slf4j-api-1.7.6.jar 28KB
javax.inject-1.jar 2KB
argparse4j-0.5.0.jar 82KB
kafka-clients-0.9.0.0.jar 620KB
jersey-server-2.22.1.jar 929KB
jersey-common-2.22.1.jar 680KB
jetty-util-9.2.12.v20150709.jar 349KB
jersey-guava-2.22.1.jar 949KB
zookeeper-3.4.6.jar 774KB
kafka-log4j-appender-0.9.0.0.jar 8KB
jackson-databind-2.5.4.jar 1.09MB
jetty-http-9.2.12.v20150709.jar 103KB
connect-json-0.9.0.0.jar 34KB
javax.annotation-api-1.2.jar 26KB
javax.ws.rs-api-2.0.1.jar 113KB
kafka_2.11-0.9.0.0-scaladoc.jar 2.76MB
scala-parser-combinators_2.11-1.0.4.jar 414KB
kafka_2.11-0.9.0.0-javadoc.jar 47KB
jersey-media-jaxb-2.22.1.jar 71KB
kafka_2.11-0.9.0.0.jar.asc 821B
kafka_2.11-0.9.0.0-javadoc.jar.asc 821B
kafka_2.11-0.9.0.0-scaladoc.jar.asc 821B
jackson-jaxrs-json-provider-2.5.4.jar 16KB
kafka_2.11-0.9.0.0-sources.jar 607KB
jetty-servlet-9.2.12.v20150709.jar 113KB
jackson-annotations-2.5.0.jar 39KB
jopt-simple-3.2.jar 52KB
osgi-resource-locator-1.0.1.jar 20KB
scala-xml_2.11-1.0.4.jar 633KB
kafka_2.11-0.9.0.0-test.jar 1.98MB
hk2-locator-2.4.0-b31.jar 175KB
kafka_2.11-0.9.0.0-sources.jar.asc 821B
web.xml 404B
index.jsp 834B
META-INF
MANIFEST.MF 36B
.myeclipse
.settings
org.eclipse.wst.jsdt.ui.superType.container 49B
org.eclipse.wst.common.project.facet.core.xml 252B
org.eclipse.jdt.core.prefs 364B
org.eclipse.wst.jsdt.ui.superType.name 6B
org.eclipse.wst.common.component 568B
.jsdtscope 500B
.classpath 596B
共 79 条
- 1
资源评论
a65568641
- 粉丝: 3
- 资源: 4
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功