zoukankan      html  css  js  c++  java
  • golang中使用kafka

    golang中比较好用的kafka client有

    其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本sarama-cluster

    本文简单描述下sarama的一些简单使用

    生产者接口

    func producer_test() {
        fmt.Printf("producer_test
    ")
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        config.Producer.Return.Successes = true
        config.Producer.Return.Errors = true
        config.Version = sarama.V0_11_0_2
    
        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("producer_test create producer error :%s
    ", err.Error())
            return
        }
    
        defer producer.AsyncClose()
    
        // send message
        msg := &sarama.ProducerMessage{
            Topic: "kafka_go_test",
            Key:   sarama.StringEncoder("go_test"),
        }
    
        value := "this is message"
        for {
            fmt.Scanln(&value)
            msg.Value = sarama.ByteEncoder(value)
            fmt.Printf("input [%s]
    ", value)
    
            // send to chain
            producer.Input() <- msg
    
            select {
            case suc := <-producer.Successes():
                fmt.Printf("offset: %d,  timestamp: %s", suc.Offset, suc.Timestamp.String())
            case fail := <-producer.Errors():
                fmt.Printf("err: %s
    ", fail.Err.Error())
            }
        }
    }
    

    消费者接口

    func consumer_test() {
        fmt.Printf("consumer_test")
    
        config := sarama.NewConfig()
        config.Consumer.Return.Errors = true
        config.Version = sarama.V0_11_0_2
    
        // consumer
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("consumer_test create consumer error %s
    ", err.Error())
            return
        }
    
        defer consumer.Close()
    
        partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
        if err != nil {
            fmt.Printf("try create partition_consumer error %s
    ", err.Error())
            return
        }
        defer partition_consumer.Close()
    
        for {
            select {
            case msg := <-partition_consumer.Messages():
                fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s
    ",
                    msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
            case err := <-partition_consumer.Errors():
                fmt.Printf("err :%s
    ", err.Error())
            }
        }
    
    }
    

    元数据接口

    func metadata_test() {
        fmt.Printf("metadata test
    ")
    
        config := sarama.NewConfig()
        config.Version = sarama.V0_11_0_2
    
        client, err := sarama.NewClient([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("metadata_test try create client err :%s
    ", err.Error())
            return
        }
    
        defer client.Close()
    
        // get topic set
        topics, err := client.Topics()
        if err != nil {
            fmt.Printf("try get topics err %s
    ", err.Error())
            return
        }
    
        fmt.Printf("topics(%d):
    ", len(topics))
    
        for _, topic := range topics {
            fmt.Println(topic)
        }
    
        // get broker set
        brokers := client.Brokers()
        fmt.Printf("broker set(%d):
    ", len(brokers))
        for _, broker := range brokers {
            fmt.Printf("%s
    ", broker.Addr())
        }
    }


    作者:yandaren
    链接:https://www.jianshu.com/p/3d4655cd7054
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    git在iOS开发中的使用
    搜索联系人是去掉拼音中的空格
    xmPP(即时通讯)向远程服务器请求数据
    使用CFStringTransform进行汉字转拼音(可去掉声调)
    node的模块系统和commonJS规范的关系
    在centos7中通过使用yum安装mongoDB
    vue跨组件通信,简易状态管理的使用
    Linux(centos7) 常用命令
    前端打包后, 路由模式为history时,用express测试服务端能否正常解析路由路径
    几个文件目录树生成工具tree,treer,tree-cli,tree-node-cli的使用配置和对比
  • 原文地址:https://www.cnblogs.com/ExMan/p/14958099.html
Copyright © 2011-2022 走看看