zoukankan      html  css  js  c++  java
  • Go 关于 kafka 的生产者、消费者实例

    zookeeper + kafka

    首先要在 apche 官网下载 kafka 的程序包(linux版本),然后放到服务器上解压,得到以下目录

    bin 目录下包含了服务的启动脚本

    启动 zookeeper

    ./bin/zookeeper-server-start.sh config/zookeeper.properties
    

      

    启动 kafka server

    ./bin/kafka-server-start.sh config/server.properties
    

    创建一个主题

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv1(主题名)
    

      

    启动生产者

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cctv1
    

    启动消费者

    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1
    

    如果要消费前面的数据,在启动时添加 --from-beginning 参数

    ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 --from-beginning
    

    查看 kafka 进程命令:

    jps
    

    producer

    安装 Sarama 包

    go get github.com/Shopify/sarama

    demo:创建生产者,往单节点的 kafka 上发送数据

    package main
    import (
       "fmt"
       "github.com/Shopify/sarama"
       "time"
    )
    
    //消息写入kafka
    func main() {
       //初始化配置
       config := sarama.NewConfig()
       config.Producer.RequiredAcks = sarama.WaitForAll
       config.Producer.Partitioner = sarama.NewRandomPartitioner
       config.Producer.Return.Successes = true
       //生产者
       client, err := sarama.NewSyncProducer([]string{"10.10.4.35:9092"}, config)
       if err != nil {
          fmt.Println("producer close,err:", err)
          return
       }
       defer client.Close()
    
       for i:=0; i<5; i++ {
          //创建消息
          msg := &sarama.ProducerMessage{}
          msg.Topic = "cctv1"
          msg.Value = sarama.StringEncoder("this is a good test,hello kai")
          //发送消息
          pid, offset, err := client.SendMessage(msg)
          if err != nil {
             fmt.Println("send message failed,", err)
             return
          }
          fmt.Printf("pid:%v offset:%v
    ", pid, offset)
          time.Sleep(time.Second)
       }
    }
    

    运行结果:(上述代码执行一下,就会往 kafka 中 cctv1 的主题发布 5 条消息)

    然后通过 kafka 自带的消费者终端查看发送的数据

    ./bin/kafka-console-consumer.sh --bootstrap-server 10.10.4.35:9092 --topic cctv1 --from-beginning

    consumer

    demo:创建消费者,从单节点的 kafka 中消费数据

    package main
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    func main() {
        consumer, err := sarama.NewConsumer([]string{"10.10.4.35:9092"}, nil)
        if err != nil {
            fmt.Println("consumer connect err:", err)
            return
        }
        defer consumer.Close()
    
        //获取 kafka 主题
        partitions, err := consumer.Partitions("cctv1")
        if err != nil {
            fmt.Println("get partitions failed, err:", err)
            return
        }
    
        for _, p := range partitions {
        	//sarama.OffsetNewest:从当前的偏移量开始消费,sarama.OffsetOldest:从最老的偏移量开始消费
            partitionConsumer, err := consumer.ConsumePartition("cctv1", p, sarama.OffsetNewest)
            if err != nil {
                fmt.Println("partitionConsumer err:", err)
                continue
            }
            wg.Add(1)
            go func(){
                for m := range partitionConsumer.Messages() {
                    fmt.Printf("key: %s, text: %s, offset: %d
    ", string(m.Key), string(m.Value), m.Offset)
                }
                wg.Done()
            }()
        }
        wg.Wait()
    }
    

    consumer 版本2

    package kafkaPlugin
    import (
        "fmt"
        "github.com/Shopify/sarama"
    )
    
    type KafkaConsumer struct {
        Node string
        Topic string
        //Message string
        MessageQueue chan string
    }
    
    func (this KafkaConsumer) Consume(){
        consumer, err := sarama.NewConsumer([]string{this.Node}, nil)
        if err != nil {
            fmt.Printf("kafka connnet failed, error[%v]", err.Error())
            return
        }
        defer consumer.Close()
    
        partitions, err := consumer.Partitions(this.Topic)
        if err != nil {
            fmt.Printf("get topic failed, error[%v]", err.Error())
            return
        }
        for _, p := range partitions {
            partitionConsumer, err := consumer.ConsumePartition(this.Topic, p, sarama.OffsetNewest)
            if err != nil {
                fmt.Printf("get partition consumer failed, error[%v]", err.Error())
                continue
            }
    
            for message := range partitionConsumer.Messages() {
                fmt.Printf("message:[%v], key:[%v], offset:[%v]
    ", string(message.Value), string(message.Key), string(message.Offset))
                this.MessageQueue <- string(message.Value)
            }
        }
    }
    
    func main(){
        var kafkaConsumer = KafkaConsumer{
            Node: "10.10.4.35:9092",
            Topic: "cctv1",
        }
        kafkaConsumer.Consume()
    }
    View Code

    上述代码执行起来,就会开启 kafka 消费者持续监听,然后通过 kafka 自带的生产者终端发送 2 条测试数据,消费结果如下:

    windows 环境,运行编译错误:exec: "gcc": executable file not found in %PATH%

    问题处理请参考下面的链接:

    https://blog.csdn.net/myBarbara/article/details/95358694

    https://www.cnblogs.com/ggg-327931457/p/9694516.html

    ending ~

    每天都要遇到更好的自己.
  • 相关阅读:
    自我学习而已——javascript——变量,作用域和内存问题
    自我学习而已——javascript——数据类型部分
    面向对象三大特性之封装
    面向对象三大特性之继承与多态
    初识面向对象
    python常用模块(re模块)
    递归函数与算法
    Python之匿名函数
    python之内置函数
    各种推导式
  • 原文地址:https://www.cnblogs.com/kaichenkai/p/11234450.html
Copyright © 2011-2022 走看看