zoukankan      html  css  js  c++  java
  • [Golang] kafka集群搭建和golang版生产者和消费者

    一、kafka集群搭建  

      至于kafka是什么我都不多做介绍了,网上写的已经非常详尽了。

     (没安装java环境的需要先安装 yum -y install java-1.8.0-openjdk*)

    1. 下载zookeeper  https://zookeeper.apache.org/releases.html

    2. 下载kafka http://kafka.apache.org/downloads

    3. 启动zookeeper集群(我的示例是3台机器,后面的kafka也一样,这里就以1台代指3台,当然你也可以只开1台)

      1)配置zookeeper。复制一份 zookeeper-3.4.13/conf/zoo_sample.cfg 并改名成 zoo.cfg。修改以下几个参数,改成适合自己机器的。

      dataDir=/home/test/zookeeper/data
      dataLogDir=/home/test/zookeeper/log
      server.1=10.22.1.1:2888:3888
      server.2=10.22.1.2:2888:3888
      server.3=10.22.1.3:2888:3888

      2) 创建myid文件,确定机器编号。分别在3台机器的/home/test/zookeeper/data目录执行命令 echo 1 > myid(注意ip为10.22.1.2把1改成2,见上面的配置)

      3) 启动zookeeper集群。分别进入目录zookeeper-3.4.13/bin 执行 sh zkServer.sh start

    4. 启动kafka集群

      1) 配置kafka。进入kafka_2.11-2.2.0/config。复制3份,分别为server1.properties,server2.properties,server3.properties。修改以下几项(注意对应的机器id)

    log.dirs和zookeeper.connect 是一样的。broker.id和listeners分别填对应的id和ip
    broker.id=1
    listeners=PLAINTEXT://10.22.1.1:9092
    log.dirs=/home/test/kafka/log
    zookeeper.connect=10.22.1.1:2181,10.22.1.2:2181,10.22.1.3:2181

      2) 启动kafka集群。分别进入kafka_2.11-2.2.0/bin目录,分别执行sh kafka-server-start.sh ../config/server1.properties (第2台用server2.properties配置文件)

     

    二、Golang生产者和消费者

      目前比较流行的golang版的kafka客户端库有两个:

      1. https://github.com/Shopify/sarama

      2. https://github.com/confluentinc/confluent-kafka-go

      至于谁好谁坏自己去分辨,我用的是第1个,star比较多的。

    1. kafka生产者代码

      这里有2点要说明:

      1)  config.Producer.Partitioner = sarama.NewRandomPartitioner,我分partition用的是随机,如果你想稳定分partition的话可以自定义,还有轮询和hash方式

      2) 我的topic是走的外部配置,可以根据自己的需求修改

    // Package kafka_producer kafka 生产者的包装
    package kafka_producer
    
    import (
        "strings"
        "sync"
        "time"

        "github.com/Shopify/sarama"
        l4g "github.com/alecthomas/log4go"
    )
    
    // Config holds the Kafka producer settings, loaded from an XML file.
    type Config struct {
        Topic      string `xml:"topic"`       // destination topic used by Log
        Broker     string `xml:"broker"`      // comma-separated broker address list
        Frequency  int    `xml:"frequency"`   // flush interval in milliseconds
        MaxMessage int    `xml:"max_message"` // capacity of the internal message queue
    }
    
    // Producer wraps a sarama.AsyncProducer with a buffered internal queue
    // that is drained by a background goroutine (started via Run).
    type Producer struct {
        producer sarama.AsyncProducer // underlying async Kafka producer

        topic     string                       // destination topic for Log
        msgQ      chan *sarama.ProducerMessage // buffered queue, drained by Run
        wg        sync.WaitGroup               // tracks the Run goroutine for Close
        closeChan chan struct{}                // closed by Close to stop Run
    }
    
    // NewProducer builds a Producer from cfg: it connects an async sarama
    // producer to the comma-separated brokers in cfg.Broker and allocates an
    // internal queue holding up to cfg.MaxMessage pending messages.
    func NewProducer(cfg *Config) (*Producer, error) {

        conf := sarama.NewConfig()
        // Fire-and-forget: do not wait for any broker acknowledgement.
        conf.Producer.RequiredAcks = sarama.NoResponse
        // Compress message batches with snappy.
        conf.Producer.Compression = sarama.CompressionSnappy
        // Flush batches every cfg.Frequency milliseconds.
        conf.Producer.Flush.Frequency = time.Duration(cfg.Frequency) * time.Millisecond
        // Spread messages across partitions at random.
        conf.Producer.Partitioner = sarama.NewRandomPartitioner

        asyncProducer, err := sarama.NewAsyncProducer(strings.Split(cfg.Broker, ","), conf)
        if err != nil {
            return nil, err
        }

        return &Producer{
            producer:  asyncProducer,
            topic:     cfg.Topic,
            msgQ:      make(chan *sarama.ProducerMessage, cfg.MaxMessage),
            closeChan: make(chan struct{}),
        }, nil
    }
    
    // Run starts the background goroutine that forwards queued messages to
    // the async producer and logs delivery errors. It returns immediately;
    // the goroutine stops once Close closes closeChan, draining whatever is
    // still buffered in msgQ before exiting so those messages are not
    // silently dropped.
    func (p *Producer) Run() {

        p.wg.Add(1)
        go func() {
            defer p.wg.Done()

        LOOP:
            for {
                select {
                case m := <-p.msgQ:
                    p.producer.Input() <- m
                case err := <-p.producer.Errors():
                    if nil != err && nil != err.Msg {
                        l4g.Error("[producer] err=[%s] topic=[%s] key=[%s] val=[%s]", err.Error(), err.Msg.Topic, err.Msg.Key, err.Msg.Value)
                    }
                case <-p.closeChan:
                    break LOOP
                }
            }

            // Shutdown path: flush messages still queued at close time.
            // (Previously this drain loop ran in the caller immediately
            // after spawning the goroutine, where it raced with it and left
            // queued messages to be dropped on Close.)
            for hasTask := true; hasTask; {
                select {
                case m := <-p.msgQ:
                    p.producer.Input() <- m
                default:
                    hasTask = false
                }
            }
        }()
    }
    
    // Close signals the Run goroutine to stop, waits for it to drain and
    // exit, then shuts down the underlying async producer.
    func (p *Producer) Close() error {
        close(p.closeChan)
        l4g.Warn("[producer] is quiting")

        // Block until the forwarding goroutine has finished.
        p.wg.Wait()
        l4g.Warn("[producer] quit over")
        return p.producer.Close()
    }
    
    // Log enqueues a key/value message for the configured topic. The send is
    // non-blocking: when the internal queue is full, the message is dropped
    // and an error is logged instead of stalling the caller.
    func (p *Producer) Log(key string, val string) {
        m := &sarama.ProducerMessage{
            Topic: p.topic,
            Key:   sarama.StringEncoder(key),
            Value: sarama.StringEncoder(val),
        }

        select {
        case p.msgQ <- m:
        default:
            l4g.Error("[producer] err=[msgQ is full] key=[%s] val=[%s]", m.Key, m.Value)
        }
    }

    2. kafka消费者

      几点说明:

      1) kafka一定要选用支持集群的版本

      2) 里面带了创建topic,删除topic,打印topic的工具

      3) replication是外面配置的

      4) 开多个consumer需要在创建topic时设置多个partition。官方的示例当开多个consumer的时候会崩溃,我这个版本不会,我给官方提交了一个PR,还不知道有没有采用

    // Package main Kafka消费者
    package main
    
    import (
        "context"
        "encoding/xml"
        "flag"
        "fmt"
        "io/ioutil"
        "log"
        "os"
        "os/signal"
        "runtime"
        "strings"
        "syscall"
        "time"
    
        "github.com/Shopify/sarama"
        "github.com/alecthomas/log4go"
    )
    
    // ConsumerConfig holds the consumer settings loaded from XML.
    type ConsumerConfig struct {
        Topic       []string `xml:"topic"`       // topics to consume (and to create with -init)
        Broker      string   `xml:"broker"`      // comma-separated broker address list
        Partition   int32    `xml:"partition"`   // partition count used when creating/extending topics
        Replication int16    `xml:"replication"` // replication factor used when creating topics
        Group       string   `xml:"group"`       // consumer group id
        Version     string   `xml:"version"`     // kafka version string; must be one that supports consumer groups
    }
    
    // Command-line state, registered as flags in init.
    var (
        configFile = "" // path to the XML config file (-config)
        initTopic  = false // -init: create/extend the configured topics
        listTopic  = false // -list: print the topic list
        delTopic   = ""    // -del: name of a topic to delete
        cfg        = &Config{} // parsed configuration
    )
    
    // Config is the root element of the XML configuration file.
    type Config struct {
        Consumer ConsumerConfig `xml:"consumer"`
    }
    
    // init registers the command-line flags; they take effect once
    // flag.Parse is called.
    func init() {
        flag.StringVar(&configFile, "config", "../config/consumer.xml", "config file ")
        flag.StringVar(&delTopic, "del", delTopic, "delete topic")
        flag.BoolVar(&initTopic, "init", initTopic, "create topic")
        flag.BoolVar(&listTopic, "list", listTopic, "list topic")
    }
    
    // main loads the XML config, optionally runs the topic admin tooling,
    // and otherwise joins the consumer group until SIGINT/SIGTERM.
    func main() {

        // The original never called flag.Parse, so -config/-init/-list/-del
        // silently had no effect.
        flag.Parse()

        runtime.GOMAXPROCS(runtime.NumCPU())

        defer func() {
            time.Sleep(time.Second) // give log4go a moment to flush
            log4go.Warn("[main] consumer quit over!")
            log4go.Global.Close()
        }()

        // Load the configuration; fail fast instead of silently continuing
        // with a zero-valued config as before.
        contents, err := ioutil.ReadFile(configFile)
        if err != nil {
            panic(err)
        }
        if err = xml.Unmarshal(contents, cfg); err != nil {
            panic(err)
        }

        // sarama's own logger.
        sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", "consumer"), log.LstdFlags)

        // Pin the kafka protocol version; it must support consumer groups.
        version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)
        if err != nil {
            panic(err)
        }
        config := sarama.NewConfig()
        config.Version = version
        config.Consumer.Offsets.Initial = sarama.OffsetOldest

        // Admin tooling (-init / -list / -del): do the work and exit.
        if tool(cfg, config) {
            return
        }

        // kafka consumer group client.
        ctx, cancel := context.WithCancel(context.Background())
        client, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)
        if err != nil {
            panic(err)
        }

        consumer := Consumer{}
        go func() {
            for {
                // Stop retrying once the context is cancelled; the original
                // looped forever against an already-closed client.
                if ctx.Err() != nil {
                    return
                }
                if err := client.Consume(ctx, cfg.Consumer.Topic, &consumer); err != nil {
                    log4go.Error("[main] client.Consume error=[%s]", err.Error())
                    // retry after 5 seconds
                    time.Sleep(time.Second * 5)
                }
            }
        }()

        // Block until SIGINT/SIGTERM, then shut down cleanly.
        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

        <-sigterm

        cancel()
        if err = client.Close(); err != nil {
            panic(err)
        }

        log4go.Info("[main] consumer is quiting")
    }
    
    func tool(cfg *Config, config *sarama.Config) bool {
        if initTopic || listTopic || len(delTopic) > 0 {
            ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)
            if nil != err {
                panic(err)
            }
    
            if len(delTopic) > 0 { // 删除Topic
                if err := ca.DeleteTopic(delTopic); nil != err {
                    panic(err)
                }
                log4go.Info("delete ok topic=[%s]
    ", delTopic)
            } else if initTopic { // 初始化Topic
                if detail, err := ca.ListTopics(); nil != err {
                    panic(err)
                } else {
                    for _, v := range cfg.Consumer.Topic {
                        if d, ok := detail[v]; ok {
                            if cfg.Consumer.Partition > d.NumPartitions {
                                if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {
                                    panic(err)
                                }
                                log4go.Info("alter topic ok", v, cfg.Consumer.Partition)
                            }
    
                        } else {
                            if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {
                                panic(err)
                            }
                            log4go.Info("create topic ok", v)
                        }
                    }
                }
            }
    
            // 显示Topic列表
            if detail, err := ca.ListTopics(); nil != err {
                log4go.Info("ListTopics error", err)
            } else {
                for k := range detail {
                    log4go.Info("[%s] %+v", k, detail[k])
                }
            }
    
            if err := ca.Close(); nil != err {
                panic(err)
            }
    
            return true
        }
        return false
    }
    
    // Consumer implements sarama.ConsumerGroupHandler (Setup, Cleanup,
    // ConsumeClaim) and is passed to ConsumerGroup.Consume in main.
    type Consumer struct {
    }
    
    // Setup runs before the session's partition claims are consumed; no
    // per-session initialisation is required here.
    func (c *Consumer) Setup(s sarama.ConsumerGroupSession) error {
        return nil
    }
    
    // Cleanup runs after all ConsumeClaim goroutines of a session have
    // exited; nothing needs releasing here.
    func (c *Consumer) Cleanup(s sarama.ConsumerGroupSession) error {
        return nil
    }
    
    // ConsumeClaim drains one claimed partition: each message's key/value
    // pair is logged and the message is marked as consumed. The loop ends
    // when the claim's message channel is closed (e.g. on rebalance).
    func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            log4go.Info("%s-%s", string(msg.Key), string(msg.Value))
            session.MarkMessage(msg, "")
        }

        return nil
    }
  • 相关阅读:
    【网络】【操作系统】select、poll、epoll
    【JMM】java内存模型及volatile关键字底层
    【数据库】连接查询(from 内连接 外连接)
    【数据库】SQL牛客练习关键点复习
    【SpringMVC】文件/图片 的下载与上传
    【SpringMVC】拦截器实现与网页跳转步骤
    什么是hashMap,初始长度,高并发死锁,java8 hashMap做的性能提升
    自己写一个HashMap
    String去除重复字符两个方法
    Solr与Elasticsearch比较
  • 原文地址:https://www.cnblogs.com/mrblue/p/10770651.html
Copyright © 2011-2022 走看看