package com.qyg.test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* java实现Kafka消费者的示例
*
* @author qiuyg
*
*/
public class KafkaClusterConsTest {
private static final String TOPIC = "partest";
//消费线程数
private static final int THREAD_AMOUNT = 5;
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "134.32.51.74:2181");
props.put("group.id", "group98");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(TOPIC, THREAD_AMOUNT);
// 可以读取多个topic
ConsumerConnector consumer = Consumer
.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer
.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> msgStreamList = msgStreams.get(TOPIC);
// 使用ExecutorService来调度线程
ExecutorService executor = Executors.newFixedThreadPool(THREAD_AMOUNT);
for (int i = 0; i < msgStreamList.size(); i++) {
KafkaStream<byte[], byte[]> kafkaStream = msgStreamList.get(i);
executor.submit(new MsgListenClusterTest(kafkaStream, i));
}
}
}
/**
* 具体处理message的线程
*
* @author qiuyg
*
*/
class MsgListenClusterTest implements Runnable {
private KafkaStream<byte[], byte[]> kafkaStream = null;
private int num = 0;
public MsgListenClusterTest(KafkaStream<byte[], byte[]> kafkaStream, int num) {
super();
this.kafkaStream = kafkaStream;
this.num = num;
}
public void run() {
try {
ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
while (true) {
if (iterator.hasNext()) {
try {
MessageAndMetadata<byte[], byte[]> msg = iterator.next();
String message = new String(msg.message());
System.out.println(Thread.currentThread().getId() + " consumer data: " + "\tpartition: " + msg.partition() + "\tmessage: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
kafka 发送和接收消息
共26个文件
jar:16个
class:4个
java:3个
5星 · 超过95%的资源 需积分: 49 47 下载量 173 浏览量
2018-03-01
22:57:54
上传
评论
收藏 19.03MB RAR 举报
温馨提示
消息中间件kafka 进行消息的分发和接收示例 应用中包含分通道发送和多线程接收
资源推荐
资源详情
资源评论
收起资源包目录
kafka.rar (26个子文件)
.project 381B
bin
com
qyg
test
MsgListenClusterTest.class 2KB
SimplePartitioner.class 969B
KafkaClusterConsTest.class 3KB
KafkaClusterProTest.class 2KB
.settings
org.eclipse.core.resources.prefs 207B
src
com
qyg
test
KafkaClusterConsTest.java 3KB
SimplePartitioner.java 825B
KafkaClusterProTest.java 1KB
.classpath 1KB
lib
scala-parser-combinators_2.11-1.0.4.jar 414KB
kafka-log4j-appender-0.11.0.0.jar 13KB
kafka_2.11-0.11.0.0-test.jar 4.46MB
scala-library-2.11.11.jar 5.48MB
slf4j-log4j12-1.7.25.jar 12KB
log4j-1.2.17.jar 478KB
metrics-core-2.2.0.jar 80KB
zookeeper-3.4.10.jar 851KB
kafka_2.11-0.11.0.0.jar 6.57MB
zkclient-0.10.jar 73KB
validation-api-1.1.0.Final.jar 62KB
slf4j-api-1.7.25.jar 40KB
kafka-tools-0.11.0.0.jar 76KB
kafka-streams-0.11.0.0.jar 643KB
snappy-java-1.1.2.6.jar 1.01MB
kafka-clients-0.11.0.0.jar 1.35MB
共 26 条
- 1
资源评论
- MXJ0080092019-10-12谢谢,适合新手,学习用用
cac2020
- 粉丝: 2
- 资源: 22
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功