zoukankan      html  css  js  c++  java
  • golang中使用kafka

    golang中比较好用的kafka client有 sarama 等。

    其中 sarama的使用者应该是最多的, 然后还有一个sarama的cluster版本sarama-cluster

    本文简单描述下sarama的一些简单使用

    生产者接口

    func producer_test() {
        fmt.Printf("producer_test
    ")
        config := sarama.NewConfig()
        config.Producer.RequiredAcks = sarama.WaitForAll
        config.Producer.Partitioner = sarama.NewRandomPartitioner
        config.Producer.Return.Successes = true
        config.Producer.Return.Errors = true
        config.Version = sarama.V0_11_0_2
    
        producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("producer_test create producer error :%s
    ", err.Error())
            return
        }
    
        defer producer.AsyncClose()
    
        // send message
        msg := &sarama.ProducerMessage{
            Topic: "kafka_go_test",
            Key:   sarama.StringEncoder("go_test"),
        }
    
        value := "this is message"
        for {
            fmt.Scanln(&value)
            msg.Value = sarama.ByteEncoder(value)
            fmt.Printf("input [%s]
    ", value)
    
            // send to chain
            producer.Input() <- msg
    
            select {
            case suc := <-producer.Successes():
                fmt.Printf("offset: %d,  timestamp: %s", suc.Offset, suc.Timestamp.String())
            case fail := <-producer.Errors():
                fmt.Printf("err: %s
    ", fail.Err.Error())
            }
        }
    }
    

    消费者接口

    func consumer_test() {
        fmt.Printf("consumer_test")
    
        config := sarama.NewConfig()
        config.Consumer.Return.Errors = true
        config.Version = sarama.V0_11_0_2
    
        // consumer
        consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("consumer_test create consumer error %s
    ", err.Error())
            return
        }
    
        defer consumer.Close()
    
        partition_consumer, err := consumer.ConsumePartition("kafka_go_test", 0, sarama.OffsetOldest)
        if err != nil {
            fmt.Printf("try create partition_consumer error %s
    ", err.Error())
            return
        }
        defer partition_consumer.Close()
    
        for {
            select {
            case msg := <-partition_consumer.Messages():
                fmt.Printf("msg offset: %d, partition: %d, timestamp: %s, value: %s
    ",
                    msg.Offset, msg.Partition, msg.Timestamp.String(), string(msg.Value))
            case err := <-partition_consumer.Errors():
                fmt.Printf("err :%s
    ", err.Error())
            }
        }
    
    }
    

    元数据接口

    func metadata_test() {
        fmt.Printf("metadata test
    ")
    
        config := sarama.NewConfig()
        config.Version = sarama.V0_11_0_2
    
        client, err := sarama.NewClient([]string{"localhost:9092"}, config)
        if err != nil {
            fmt.Printf("metadata_test try create client err :%s
    ", err.Error())
            return
        }
    
        defer client.Close()
    
        // get topic set
        topics, err := client.Topics()
        if err != nil {
            fmt.Printf("try get topics err %s
    ", err.Error())
            return
        }
    
        fmt.Printf("topics(%d):
    ", len(topics))
    
        for _, topic := range topics {
            fmt.Println(topic)
        }
    
        // get broker set
        brokers := client.Brokers()
        fmt.Printf("broker set(%d):
    ", len(brokers))
        for _, broker := range brokers {
            fmt.Printf("%s
    ", broker.Addr())
        }
    }


    作者:yandaren
    链接:https://www.jianshu.com/p/3d4655cd7054
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    JS缓存图片实例
    Windows Server 2008上安装Media Player
    [转] BizTalk Server 2010新功能介绍(一):概述
    Microsoft BizTalk ESB Toolkit 2.0
    Asp.NET导出Excel文件乱码解决若干方法
    [PM Tools]软件项目进度跟踪表v3.0
    关于Silverlight中多项目共享DLL文件的讨论
    Silverlight中的ListBox横向显示CheckBox
    设计模式.简单工厂
    Silverlight用户控件转移时产生的“元素已经是另一个元素的子元素”问题
  • 原文地址:https://www.cnblogs.com/ExMan/p/14958099.html
Copyright © 2011-2022 走看看