package com.heibaiying.consumers;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.Scanner;
/**
* Kafka消费者和消费者组
*/
public class ConsumerExit {
public static void main(String[] args) {
String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
props.put("group.id", group);
props.put("enable.auto.commit", false);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));
/*调用wakeup优雅的退出*/
final Thread mainThread = Thread.currentThread();
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (sc.hasNext()) {
if ("exit".equals(sc.next())) {
consumer.wakeup();
try {
/*等待主线程完成提交偏移量、关闭消费者等操作*/
mainThread.join();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
record.topic(), record.partition(), record.key(), record.value(), record.offset());
}
}
} catch (WakeupException e) {
//对于wakeup()调用引起的WakeupException异常可以不必处理
} finally {
consumer.close();
System.out.println("consumer关闭");
}
}
}
Kafka生产者消费者使用案例
需积分: 0 16 浏览量
更新于2023-10-13
1
收藏 12KB RAR 举报
在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度。此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是 Kafka 实现横向伸缩的主要手段。
shangjg3
- 粉丝: 3053
- 资源: 144
最新资源
- DirectiveError解决办法.md
- 肝脏及其肿瘤分割的 CT 数据集,已经切片成jpg数据,约2w张数据和mask
- 基于OpenCV和C的文档扫描仪++
- 2024年全球芯片设计行业市场发展现状和前景预测报告
- frida拦截微信小程序云托管API
- 手写流程图检测31-YOLO(v5至v8)、COCO、CreateML、Darknet、Paligemma、TFRecord数据集合集.rar
- Python编程一级基础练习(含答案)
- awewq1132323
- 2024年全球螺栓行业市场发展现状和前景预测报告
- 基于python flask实现某瓣数据可视化数据分析平台
- 手势检测7-YOLO(v5至v11)、COCO、CreateML、Paligemma、TFRecord、VOC数据集合集.rar
- 2024年全球电磁兼容材料行业市场发展现状和前景预测报告
- 中式汉堡市场调研报告:2023年市场规模约为1890亿元
- 2021年中国便民缴费产业报告.zip
- CentOS bridge 工具包 bridge-utils-1.6-1.33.x86-64.rpm
- 数据库应用技术考试方案-A卷-图书馆管理系统的数据库操作-可实现-有问题联系博主