kafka 生产者生产者
kafka 生产者生产者
创建一个包含目标主题和内容的 ProducerRecord 对象,可指定键或分区,发送前要把对象序列化成字符数组。
数据被传给分区器,如果指定了分区就直接把指定的分区返回。如果没有指定分区,分区器就根据 ProducerRecord 对象的键
选择一个分区。紧接着这条记录就被添加到一个记录批次里,这个批次里所有的消息会被发送到相同的主题和分区上。
如果消息成功写入 kafka,就返回一个 PrcordMetaData 对象,包含了主题和分区信息,以及记录在分区的偏移量。如果写入
失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后还是失败就会返回错误信息。
Properties
kafka 生产者有3个必选的属性
bootstrap.servers——broker 的地址清单:清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其它
broker 的信息。不过至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能连接到集群上。
key.serializer:一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生成者会使用这个类把键对象序列
化成字节数组。kafka 客户端默认 提供 ByteArraySerializer、StringSerializer、IntegerSerializer。
value.serializer:序列化值对象的类。
kafka 得到其它配置:
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。
acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应。
acks=1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应,否则会收到一个错误响应。如果一个
没有收到消息的节点成为新首领,消息也会丢失。
acks=all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
buffer.memory:设置生产者内存缓冲区的大小。如果应用程序发送消息的速度超过发送到服务器的速度,send()方法要么阻
塞,要么抛出异常,这取决于如何设置block.bufffer.full 参数(在0.9.0.0版本里被替换成了max.block.ms,表示在抛出异常之
前可以阻塞一段时间)。
compression.type:指定了消息被发送给 broker 之前使用哪一种压缩算法压缩(默认不会被压缩)。该参数的值有 snappy、
gzip、lz4,。
retires:决定了生产者可以重发消息的次数,如果达到了这个次数,生产者就会放弃重发并返回错误。默认情况下,生产者在
每次重试之间等待100ms,不过可以通过retry.backoff.ms 参数改变这个时间间隔。
batch.size:指定了一个批次可以使用的内存大小,按照字节计算。
linger.ms:指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到的时
评论0
最新资源