package amqp
import (
"context"
"errors"
"fmt"
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
"strings"
"time"
)
/*
原则:
Consumer 加入给定Topic列表的 消费者集群, 并通过ConsumerHandler启动, 并阻塞ConsumerGroupSession;
会话的生命周期如下:
1. Consumer加入 Consumer Group, 并分配给他们"合理份额"的分区, 又名:claims;
2. 在开始处理之前, 会调用Handler的setup()钩子来通知用户claims, 并允许对状态进行任何必要的准备或更改;
3. 对于每个分配的claims, Handler的ConsumeClaim()将在一个单独的goroutine中调用, 该goroutine要求它是线程安全的; 必须小心的保护任何状态, 防止并发读/写;
4. 会话将持续, 直到其中一个consumerClaim()退出; 这可以是在"取消父上下文"时,也可以是"在启动服务器端rebalance周期"时;
5. 一旦所有consumerClaim()循环退出, 就会调用处理程序的Cleanup()钩子, 以允许用户在rebalance之前执行任何最终任务;
6. 最后,在claims released之前,最后一次提交标记的offset;
PS: 一旦触发rebalance,会话必须在Config中完成; 这意味着ConsumerClaim()必须尽快退出, 以便有时间进行Cleanup()和最终的offset commit;
如果超过超时, Kafka会将使用者从组中删除, 这将导致偏移提交失败;
*/
// ConsumerGroup 定义消费者组类
type ConsumerGroup struct {
logger *logrus.Entry
consumer sarama.ConsumerGroup
conf *sarama.Config
options consumerOption
}
type Data struct {
FaninTime time.Time
Topic string
Key []byte
Value []byte
}
// TODO 疯狂打印问题:如果Value是一个超级长串.
func (d *Data) String() string {
return fmt.Sprintf("{ Topic: %s, Key: %s, Value: %s, FaninTime:%s }",
d.Topic, d.Key, d.Value, d.FaninTime.Format("20060102 15:04:05.0000"))
}
// consumerOption 定义创建消费者所需要的参数
// 并实现了 `ConsumerGroupHandler`
type consumerOption struct {
logger *logrus.Entry
callback func(context.Context, *Data) error
groupID string
topics []string
restart bool
}
func (co consumerOption) Setup(sess sarama.ConsumerGroupSession) error {
co.logger.Info("consumer setup", fmt.Sprintln("{topic: partition}:", sess.Claims()))
return nil
}
func (co consumerOption) Cleanup(sess sarama.ConsumerGroupSession) error {
co.logger.Info("consumer exiting")
sess.Commit()
return nil
}
func (co consumerOption) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
defer func() {
if r := recover(); r != nil {
co.logger.Errorf("ConsumeClaim panic recovered: %v", r)
}
}()
for {
select {
case msg, ok := <-claim.Messages():
if !ok || msg == nil {
co.logger.WithField("data:", msg).Errorln("消费到空数据")
continue
}
// Mark: @zcf Callback崩了会影响到消费任务
// 所以, Callback一定要用好Context
data := &Data{
Key: msg.Key,
Value: msg.Value,
Topic: msg.Topic,
FaninTime: msg.Timestamp,
}
err := co.callback(sess.Context(), data)
retry := 0
for err != nil && retry < 5 {
co.logger.WithField("Topic:", claim.Topic()).WithError(err).Errorln("callback handle")
if err = co.callback(sess.Context(), data); err == nil {
break
}
<-time.After(time.Second)
retry++
}
co.logger.Infof("consumer[%s]: Topic: %s, Partition: %v, Offset:%v, Accession: %s ago",
co.groupID, claim.Topic(), claim.Partition(), msg.Offset, time.Since(msg.Timestamp).String())
sess.MarkMessage(msg, "")
case <-sess.Context().Done():
co.logger.WithField("Topic:", claim.Topic()).WithField("Partition:", claim.Partition()).WithField("NextOffset:", claim.HighWaterMarkOffset()).Info("Consumer context closing")
return nil
}
}
}
// Fetch 定义消费者拉取缓存的配置对象
type Fetch struct {
Min, Default, Max int32
}
type HandleFunc func(context.Context, *Data) error
// ConsumerConfig 定义创建ConsumerGroup所需的配置
type ConsumerConfig struct {
logger *logrus.Entry
handle HandleFunc
addrs []string
groupID string
topics []string
restart bool
}
type ConsumerConfigOption func(*ConsumerConfig)
func ConsumerConfigWithLogger(lg *logrus.Entry) ConsumerConfigOption {
return func(c *ConsumerConfig) {
c.logger = lg
}
}
func ConsumerConfigWithRestart(restart bool) ConsumerConfigOption {
return func(c *ConsumerConfig) {
c.restart = restart
}
}
// NewConsumerConfig 获取创建ConsumerGroup所需的默认配置
func NewConsumerConfig(addrs, topics []string, groupID string, handle HandleFunc, ops ...ConsumerConfigOption) *ConsumerConfig {
c := &ConsumerConfig{
logger: logrus.NewEntry(logrus.New()),
handle: handle,
addrs: addrs,
groupID: groupID,
topics: topics,
}
for _, op := range ops {
op(c)
}
return c
}
func ConsumerWithFetch(f Fetch) OptionFunc {
return func(c *sarama.Config) error {
c.Consumer.Fetch = f
return nil
}
}
func ConsumerBeginAt(at int64) OptionFunc {
return func(c *sarama.Config) error {
c.Consumer.Offsets.Initial = at
return nil
}
}
// NewConsumerGroup 创建一个消费者实例
// 默认从最旧的开始消费
func NewConsumerGroup(cf *ConsumerConfig, ops ...OptionFunc) (*ConsumerGroup, error) {
conf := sarama.NewConfig()
conf.Version = defaultVersion
conf.Consumer.Return.Errors = true
conf.Consumer.Offsets.Initial = sarama.OffsetOldest
conf.Consumer.Fetch = Fetch{
Min: 1 << 10, // 1KB
Default: 1 << 20, // 1MB
Max: 10 << 20, // 10 MB
}
// conf.Consumer.Offsets.AutoCommit.Enable = true // 自动提交offset(默认开启)
// conf.Consumer.Offsets.AutoCommit.Interval = 10 * time.Second // 默认 1s
for _, op := range ops {
if err := op(conf); err != nil {
return nil, err
}
}
cg, err := sarama.NewConsumerGroup(cf.addrs, cf.groupID, conf)
if err != nil {
return nil, err
}
cf.logger = cf.logger.WithField("GroupID:", cf.groupID)
consumer := &ConsumerGroup{
conf: conf,
logger: cf.logger,
consumer: cg,
options: consumerOption{
logger: cf.logger,
callback: cf.handle,
groupID: cf.groupID,
topics: cf.topics,
restart: cf.restart,
},
}
return consumer, nil
}
// Run 执行消费动作
func (cg *ConsumerGroup) Run(ctx context.Context) {
defer cg.close()
for {
select {
case err := <-cg.consumer.Errors():
cg.logger.WithError(err).Errorln("Error channel")
cg.handleConsumeError(err)
case <-ctx.Done():
err := ctx.Err()
if err == nil || errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// 正常退出
return
}
cg.logger.WithError(err).Errorln("上下文异常退出")
default:
if err := cg.consumer.Consume(ctx, cg.options.topics, cg.options); err != nil {
cg.logger.WithError(err).Errorln("Consume Error channel")
cg.handleConsumeError(err)
}
}
}
}
// 处理消费失败的错误
func (cg *ConsumerGroup) handleConsumeError(err error) {
if isCanRestartError(err) {
cg.logger.WithError(err).Warnln("Kafka消费错误,尝试重新连接")
if cg.options.restart {
cg.restartConsumer()
}
} else {
cg.logger.WithError(err).Errorln("其他消费错误,不进行重试")
}
}
// 重启消费者
func (cg *ConsumerGroup) restartConsumer() {
//cg.close()
delayDuration := 5 * time.Second
cg.logger.Infof("等待 %s 后重新连接", delayDuration)
time.Sleep(delayDuration)
newConsumer, err := sarama.NewConsumerGroup(cg.options.topics, cg.options.groupID, cg.conf)
if err != nil {
cg.logger.WithError(err).Errorln("Failed to create new ConsumerGroup")
return
}
cg.consumer = newConsumer
}
func (cg *ConsumerGroup) close() {
if cg.consumer != nil {
if err := cg.consumer.Close(); err != nil {
cg.logger.WithError(err).Errorln("Consumer stoped error")
return
}
}
cg.logger.Info("Consumer stoped")
}
func isCanRestartError(err error) bool {
if errors.Is(err, sarama.ErrOffsetOutOfRange) {
re
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
go-kafka.zip (7个子文件)
kafka.go 4KB
consumer.go 8KB
consumer_test.go 3KB
auth_options.go 1KB
producer_test.go 3KB
producer.go 6KB
kafka_test.go 5KB
共 7 条
- 1
资源评论
hjx_dou
- 粉丝: 332
- 资源: 12
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功