zoukankan      html  css  js  c++  java
  • go kafka group

    在以前的文章kafka初探go和C#的实现里面我们用了sarama来消费kafka的消息,但是很遗憾它没有group的概念。没办法 我们只能用sarama-cluster来实现, 注意sarama版本不要太新否则有错误panic: non-positive interval for NewTicker 问题处理,建议大家可以修改go.mod文件如下:

    require (
        github.com/Shopify/sarama v1.24.1
        github.com/bsm/sarama-cluster v2.1.15+incompatible
    )

    测试代码如下:

    package main
    
    import (
        "fmt"
        "log"
        "os"
        "os/signal"
        _ "regexp"
        "time"
    
        "github.com/Shopify/sarama"
        cluster "github.com/bsm/sarama-cluster"
    )
    
    var Address = []string{"192.168.100.30:9092"}
    var Topic = "gavintest"
    
    //panic: non-positive interval for NewTicker
    // 修改go.mod
    //github.com/Shopify/sarama v1.24.1
    // github.com/bsm/sarama-cluster v2.1.15+incompatible
    //修改
    /**
      消费者
    */
    func main() {
        go syncConsumer("demo1")
        go syncConsumer("demo2")
        go syncProducer()
        select {}
    
    }
    
    func syncConsumer(groupName string) {
        config := cluster.NewConfig()
        config.Consumer.Return.Errors = true
        config.Group.Return.Notifications = true
    
        // init consumer
        //可以订阅多个主题
        topics := []string{Topic}
        consumer, err := cluster.NewConsumer(Address, groupName, topics, config)
        if err != nil {
            panic(err)
        }
        //这里需要注意的是defer 一定要在panic 之后才能关闭连接
        defer consumer.Close()
    
        // trap SIGINT to trigger a shutdown.
        signals := make(chan os.Signal, 1)
        signal.Notify(signals, os.Interrupt)
    
        // consume errors
        go func() {
            for err := range consumer.Errors() {
                log.Printf("Error: %s
    ", err.Error())
            }
        }()
    
        // consume notifications
        go func() {
            for ntf := range consumer.Notifications() {
                log.Printf("Rebalanced: %+v
    ", ntf)
            }
        }()
    
        // 循环从通道中获取message
        //msg.Topic 消息主题
        //msg.Partition  消息分区
        //msg.Offset
        //msg.Key
        //msg.Value 消息值
        for {
            select {
            case msg, ok := <-consumer.Messages():
                if ok {
                    fmt.Printf("%s receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s
    ", groupName, msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
                    consumer.MarkOffset(msg, "") // 上报offset
                }
            case err := <-consumer.Errors():
                {
                    fmt.Println(fmt.Sprintf("consumer error:%v", err))
                }
            case <-signals:
                return
            }
        }
    }
    
    //同步消息模式
    func syncProducer() {
        //指定配置
        config := sarama.NewConfig()
        // 等待服务器所有副本都保存成功后的响应
        config.Producer.RequiredAcks = sarama.WaitForAll
        // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        // 是否等待成功和失败后的响应
        config.Producer.Return.Successes = true
        config.Producer.Timeout = 5 * time.Second
        producer, err := sarama.NewSyncProducer(Address, config)
        if err != nil {
            log.Printf("sarama.NewSyncProducer err, message=%s 
    ", err)
            return
        }
        defer producer.Close()
        msg := &sarama.ProducerMessage{
            Topic: Topic,
        }
    
        var i = 0
        for {
            i++
            //将字符串转换为字节数组
            msg.Value = sarama.ByteEncoder(fmt.Sprintf("this is a message:%d", i))
            //SendMessage:该方法是生产者生产给定的消息
            //partition, offset, err := producer.SendMessage(msg)
            _, _, err := producer.SendMessage(msg)
            //生产失败的时候返回error
            if err != nil {
                fmt.Println(fmt.Sprintf("Send message Fail %v", err))
            }
            //生产成功的时候返回该消息的分区和所在的偏移量
            //fmt.Printf("send message Partition = %d, offset=%d
    ", partition, offset)
    
            time.Sleep(time.Second * 5)
        }
    
    }

    运行结果: 

     https://github.com/bsm/sarama-cluster

    windows技术爱好者
  • 相关阅读:
    shell-条件测试
    51Nod 1279 扔盘子 (思维+模拟)
    51Nod 1042 数字0-9的数量(数位DP)
    Codeforces 1138B Circus (构造方程+暴力)
    51nod 1133 不重叠的线段 (贪心,序列上的区间问题)
    51nod 1091 线段的重叠(贪心)
    EOJ Monthly 2019.2 E 中位数 (二分+中位数+dag上dp)
    牛客练习赛39 C 流星雨 (概率dp)
    牛客练习赛39 B 选点(dfs序+LIS)
    Educational Codeforces Round 57
  • 原文地址:https://www.cnblogs.com/majiang/p/14543566.html
Copyright © 2011-2022 走看看