没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
试读
15页
kafkapython教程_Kafka快速⼊门(⼗⼆)——Python客户端 Kafka快速⼊门(⼗⼆)——Python客户端 ⼀、confluent-kafka 1、confluent-kafka简介 confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka 0.8以上版本。本⽂基于confluent-kafka 1.3.0编写。 GitHub地址: 2、confluent-kafka特性 (1)可靠。confluent-kafka是对⼴泛应⽤于各种⽣产环境的librdkafka的封装,使⽤Java客户端相同的测试集进⾏测试,由Confluent进⾏ ⽀持。 (2)性能。性能是⼀个关键的设计考虑因素,对于较⼤的消息,最⼤吞吐量与Java客户机相当(Python解释器的开销影响较⼩),延迟与Java 客户端相当。 (3)未来⽀持。Coufluent由Kafka创始⼈创建,致⼒于构建以Apache Kafka为核⼼的流处理平台。确保核⼼Apache Kafka和Coufluent 平台组件保持同步是当务之急。 3、confluent-kafk
资源推荐
资源详情
资源评论
kafkapython教程_Kafka快速⼊门(⼗⼆)——Python客户端
Kafka快速⼊门(⼗⼆)——Python客户端
⼀、confluent-kafka
1、confluent-kafka简介
confluent-kafka是Python模块,是对librdkafka的轻量级封装,⽀持Kafka 0.8以上版本。本⽂基于confluent-kafka 1.3.0编写。
GitHub地址:
2、confluent-kafka特性
(1)可靠。confluent-kafka是对⼴泛应⽤于各种⽣产环境的librdkafka的封装,使⽤Java客户端相同的测试集进⾏测试,由Confluent进⾏
⽀持。
(2)性能。性能是⼀个关键的设计考虑因素,对于较⼤的消息,最⼤吞吐量与Java客户机相当(Python解释器的开销影响较⼩),延迟与Java
客户端相当。
(3)未来⽀持。Coufluent由Kafka创始⼈创建,致⼒于构建以Apache Kafka为核⼼的流处理平台。确保核⼼Apache Kafka和Coufluent
平台组件保持同步是当务之急。
3、confluent-kafka安装
创建confluent源:
进⼊/etc/yum.repos.d⽬录创建confluent.repo⽂件:
[Confluent.dist]
name=Confluent repository (dist)
gpgcheck=1
enabled=1
[Confluent]
name=Confluent repository
gpgcheck=1
enabled=1
安装:
sudo yum clean all && sudo yum install confluent-community-2.12
sudo yum install librdkafka-devel python-devel
pip install confluent-kafka
安装AvroProducer、AvroConsumer:
pip install "confluent-kafka[avro]"
⼆、coufluent-kafka客户端API
1、confluent_kafka.Consumer
Consumer(config)
使⽤指定的配置dict创建Consumer实例。
Consumer.assign(partitions)
由指定TopicPartition列表设置Consumer的分区分配策略,启动消费。如果对关闭的Consumer调⽤本函数会抛出RuntimeError。
Consumer.assignment()
返回当前分区分配策略,返回list(TopicPartition)
Consumer.close()
关闭和终⽌Consumer实例,关闭Consumer实例会执⾏以下操作:停⽌消费;提交位移(如果enable.auto.commit设置为False会抛出异
常)、离开Consumer Group。
Consumer.commit([message=None][, offsets=None][, asynchronous=True])
提交⼀条消息或位移列表,message和offsets是互斥参数,如果没有指定参数,会使⽤当前分区分配策略的offsets。
message:提交消息的位移加1
offsets:要提交的TopicPartition列表
asynchronous:是否异步提交。异步提交会⽴即返回None。如果设置为False,会阻塞直到提交成功或失败,如果提交成功,会返回提交
的offsets。注意:提交成功,需要对返回的TopicPartition列表的每个TopicPartition的err字段进⾏检查,TopicPartition可能会提交失
败。
Consumer.committed(partitions[, timeout=None])
获取已提交的分区的offsets。
partitions:TopicPartition列表
timeout:请求超时,单位秒。
返回TopicPartition列表或错误集
Consumer.consume([num_messages=1][, timeout=-1])
消费消息,调⽤回调函数,返回消息列表,如果超时,返回空。
应⽤程序必须检查返回Message的error⽅法,正常Message的error返回None。
num_messages:返回的最⼤消息数量,默认为1
timeout:阻塞等待消息、事件、回调函数的最⼤时间
Connsumer.get_watermark_offsets(partition[, timeout=None][, cached=False])
获取分区的低⽔位和⾼⽔位
partition:TopicPartition对象
Timeout:请求超时,
Cached:是否替换正在查询的Broker使⽤的缓存信息。
成功返回低⽔位和⾼⽔位的元组,超时返回None。
Consumer.list_topics([topic=None][, timeout=-1])
请求集群的元数据信息。
topic:字符串类,如果指定,只请求本Topic的信息,否则返回集群的所有Topic。
timeout:超时前的最⼤响应时间,-1表⽰永不超时。
返回ClusterMetadata类型
Consumer.offsets_for_times(partitions[, timeout=None])
对指定的分区列表根据时间戳查询offsets。
返回每个分区的offsets⼤于等于指定分区列表的时间戳的位移。
partitions:TopicPartition列表
timeout:请求超时时间。
Consumer.pause(partitions)
暂停指定分区列表的分区的消费
Consumer.poll([timeout=None])
消费消息,调⽤回调函数,返回事件。
应⽤程序必须检查返回的Message对象的error()⽅法,如果是正常消息,返回None。
返回Message对象回None。
Consumer.position(partitions)
获取指定分区列表分区的位移
partitions:分区列表
返回带位移的TopicPartition列表,当前位移是最新消费消息的位移加1。
Consumer.resume(partitions)
恢复指定分区列表的分区的消费
partitions:要恢复的TopicPartitio列表
Consumer.seek(partition)
定位分区的消费位移到offset。offset可以是绝对值,也可以是逻辑位移OFFSET_BEGINNING。本函数只⽤于活跃消费分区更新消费位
移,要设置分区的起始位移可以使⽤assign函数。
Consumer.store_offsets([message=None][, offsets=None])
存储⼀条消息的位移或位移列表。
message和offsets是互斥参数。
被存储的位移会根据auto.commit.interval.m参数值被提交,使⽤本函数时enable.auto.offset.store参数必须被设置为False。
message:存储message的位移加1。
offsets:要存储位移的TopicPartition列表
Consumer.subscribe(topics[, on_assign=None][, on_revoke=None])
设置要订阅的Topic列表,会替代此前订阅的Topic。
订阅的Topic名称⽀持正则表达式,使⽤”^”作为Topic名称前缀。
topics:Topic名称列表
on_assign:完成分区再分配的回调函数
on_revoke:再平衡操作的
on_assign(consumer, partitions)
on_revoke(consumer, partitions)
剩余14页未读,继续阅读
资源评论
是空空呀
- 粉丝: 167
- 资源: 3万+
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功