zoukankan      html  css  js  c++  java
  • golang中使用kafka客户端sarama消费时需要注意的一个点

    1. kafka消费者的Consume()方法会阻塞;
    2. 当Consume()方法返回err时,不确定继续消费有没有问题;保险起见,退出进程,然后重新初始化。
    3. 当Consume()方法返回nil是,是可以继续消费的,亲测有效。

    需要注意的点写在了注释里:

    //StartKafkaConsumer 启动kafka消费者
    func StartKafkaConsumer(ctx context.Context) {
        //defer utils.ForPanic()   //当消费者出现问题的时候,通过panic退出进程。然后重新启动初始化
              //因此代码里不要加panic处理的机制
        config := sarama.NewConfig()
        config.Version = sarama.V2_1_0_0 // specify appropriate version
        config.Consumer.Return.Errors = true
        cfg := ReaderConfig.Config()
        group, err := sarama.NewConsumerGroup(
            []string{cfg.GetString("kafka.addr")},
            cfg.GetString("kafka.group_id"), config)
        if err != nil {
            my_logger.Errorf("sarama.NewConsumerGroup error, err=%s", err.Error())
            panic(err)
            //return
        }
        defer func() { _ = group.Close() }()
    
        // Track errors
        go func() {
            defer utils.ForPanic()
            for err := range group.Errors() {
                if err != nil {
                    my_logger.Errorf("consumer error:%s", err.Error())
                }
            }
        }()
        topics := []string{cfg.GetString("kafka.topics")}
        queueSize := cfg.GetInt64("kafka.queue_size")
        if queueSize <= 0 {
            my_logger.Errorf("queueSize <= 0")
            panic("queue_size error")
        }
        log.Printf("queue size:%d
    ", queueSize)
        handler := ConsumerGroupHandler{
            Pipe: make(chan []byte, queueSize),
        }
        coroutineCount := cfg.GetInt64("kafka.co_count")
        if coroutineCount <= 0 {
            my_logger.Errorf("coroutineCount <= 0")
            panic("coroutineCount error")
        }
        for i := 0; i < int(coroutineCount); i++ {
            go handler.Do()
        }
        log.Println("start success!")
        for {
            //关键代码
            //正常情况下:Consume()方法会一直阻塞
            //我测试发现,约30分钟左右,Consume()会返回,但没有error
            //无error的情况下,可以重复调用Consume()方法
            //当有error产生的时候,不确定Consume()是否能够继续完善的执行。
            //因此保险的办法是抛出panic,让进程重启。
            err = group.Consume(context.Background(), topics, &handler)
            if err != nil {
                my_logger.Errorf("group.Consume error: err=%s", err.Error())
                panic(err)
            } else {
                my_logger.Info("group.Consume exit")
            }
        }
    }
    
  • 相关阅读:
    <SpringMvc>入门二 常用注解
    <SpringMvc>入门一 HelloWorld
    <MyBatis>入门六 动态sql
    <MyBatis>入门五 查询的返回值处理
    <MyBatis>入门四 传入的参数处理
    <MyBatis>入门三 sqlMapper文件
    <MyBatis>入门二 全局配置文件
    <MyBatis>入门一 HelloWorld
    类和类的继承(6)
    python 的重载
  • 原文地址:https://www.cnblogs.com/ahfuzhang/p/14685521.html
Copyright © 2011-2022 走看看