没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
Apache Kafka系列文章
1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试
2、java调用kafka api
3、kafka重要概念介紹及示例
4、kafka分区、副本介绍及示例
5、kafka监控工具Kafka-Eagle介绍及使用
@TOC
本文介绍了kafka相关重要的概念及使用示例。
本文前提是kafka环境可用。
本文分为五部分,即概念、幂等与事务、分区的leader和follower、消息可靠机制和限速机制。
一、概念
1、Kafka重要概念
1)、broker
一个Kafka的集群通常由多个broker组成,这样才能实现负载均衡、以及容错
broker是无状态(Sateless)的,它们是通过ZooKeeper来维护集群状态
一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不影响性能
2)、zookeeper
ZK用来管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入、或者Kafka集群中出现故障的
broker。
3)、 producer(生产者)
生产者负责将数据推送给broker的topic
4)、consumer(消费者)
消费者负责从broker的topic中拉取数据,并自己进行处理
5)、consumer group(消费者组)
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以
有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消
费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一
个消费组内的一个consumer来消费。
consumer group是kafka提供的可扩展且具有容错性的消费者机制
一个消费者组可以包含多个消费者
一个消费者组有一个唯一的ID(group Id)
组内的消费者一起消费主题的所有分区数据
主题是一个逻辑概念,用于生产者发布数据,消费者拉取数据
Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限
制
在主题中的消息是有结构的,一般一个主题包含某一类消息
一旦生产者发送消息到主题中,这些消息就不能被更新(更改)
9)、偏移量(offset)
offset记录着下一条将要发送给Consumer的消息的序号
默认Kafka将offset存储在ZooKeeper中
在一个分区中,消息是有顺序的方式存储着,每个在分区的消费都是有一个递增的id。这个就是偏
移量offset
偏移量在分区中才是有意义的。在分区之间,offset是没有任何意义的
2、消费者组验证
Kafka支持有多个消费者同时消费一个主题中的数据。
1)、创建topic
由于每个分区数据只能是一个消费者进行消费,故该topic至少需要2个分区才能验证消费者组。
2)、生产者代码
kafka-topics.sh --create --bootstrap-server server1:9092 --topic t_consumerGroup
--partitions 2 --replication-factor 1
// 生产40条数据
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class KafkaProducerTest {
// 只有主题分区超过一个以上的,才会由不同的消费者进行消费,一个分区的数据只能由组内的一个消
费者进行消费,本示例的topic有2个分区
public final static String TOPIC_NAME = "t_consumerGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "server1:9092");
props.put("acks", "all");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String,
String>(props);
boolean flag = true;
int count = 0;
while (flag) {
count++;
for (int i = 0; i < 10; ++i) {
try {
// 获取返回值Future,该对象封装了返回值
Future<RecordMetadata> future = producer.send(new
ProducerRecord<String, String>(TOPIC_NAME, i + "", i + ""));
// 调用一个Future.get()方法等待响应
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
剩余26页未读,继续阅读
资源评论
一瓢一瓢的饮alanchanchn
- 粉丝: 3006
- 资源: 69
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功