[Golang] Consuming Kafka logs and writing them to ElasticSearch

    0x0 Requirement

      Consume log messages from Kafka and write them into ElasticSearch so they can be queried.

    0x1 Dependencies

    Kafka client for Go: https://github.com/Shopify/sarama

    ElasticSearch client for Go: https://github.com/olivere/elastic (this is the client the code below actually imports, rather than the official https://github.com/elastic/go-elasticsearch)
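
    Both are ordinary Go packages. In a GOPATH-style setup, which the bare "consumer" and "elastic_worker" import paths in main.go below assume, they can be fetched with go get github.com/Shopify/sarama and go get github.com/olivere/elastic.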

    0x2 Implementation

    The implementation is split into three parts.

    1. The Kafka consumer

    package consumer

    import (
        "context"
        "encoding/json"
        "fmt"
        "log"
        "os"
        "time"

        "github.com/Shopify/sarama"
    )

    // LogJson is the expected JSON layout of a log message.
    type LogJson struct {
        Tag     string    `json:"tag"`
        Level   string    `json:"level"`
        File    string    `json:"file"`
        Time    time.Time `json:"@timestamp"`
        Message string    `json:"message"`
    }
    
    type taskProcessor interface {
        AddTask(key string, val []byte)
    }
    
    // MyConsumer is a cancellable consumer that hands each message to a task processor.
    type MyConsumer struct {
        processor taskProcessor
        ctx       context.Context
    }
    
    // NewMyConsumer is the constructor.
    func NewMyConsumer(p taskProcessor, ctx context.Context) *MyConsumer {
        c := &MyConsumer{
            processor: p,
            ctx:       ctx,
        }
    
        return c
    }
    
    // Setup is run at the beginning of a new consumer group session, before ConsumeClaim.
    func (consumer *MyConsumer) Setup(s sarama.ConsumerGroupSession) error {
        log.Printf("[main] consumer.Setup memberID=[%s]", s.MemberID())
        return nil
    }
    
    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
    func (consumer *MyConsumer) Cleanup(s sarama.ConsumerGroupSession) error {
        log.Printf("[main] consumer.Cleanup memberID=[%s]", s.MemberID())
        return nil
    }
    
    // ConsumeClaim reads log messages from the claim and hands them to the processor.
    func (consumer *MyConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    
        for {
            select {
            case message, ok := <-claim.Messages():
                if !ok {
                    return nil
                }
                js := &LogJson{}
                if err := json.Unmarshal(message.Value, js); nil != err {
                    fmt.Fprintf(os.Stderr, "[MyConsumer] ConsumeClaim json.Unmarshal err=[%s] topic=[%s] key=[%s] val=[%s]\n", err.Error(), message.Topic, message.Key, string(message.Value))
                } else {
                    index := fmt.Sprintf("%s-%s", message.Topic, js.Time.Format("2006.01.02"))
                    consumer.processor.AddTask(index, message.Value)
                    session.MarkMessage(message, "")
                }
    
            case <-consumer.ctx.Done():
                return nil
            }
        }
    }
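
    To make the message format and the daily index naming concrete, here is a minimal, self-contained sketch: it parses a hypothetical log line the way ConsumeClaim does and derives the index name the worker will write to. The topic name and field values are made up for illustration.

    package main

    import (
        "encoding/json"
        "fmt"
        "time"
    )

    // Same shape as LogJson in the consumer package above.
    type LogJson struct {
        Tag     string    `json:"tag"`
        Level   string    `json:"level"`
        File    string    `json:"file"`
        Time    time.Time `json:"@timestamp"`
        Message string    `json:"message"`
    }

    func main() {
        // Hypothetical message body received on topic "top1".
        raw := []byte(`{"tag":"app1","level":"info","file":"main.go:42","@timestamp":"2019-07-26T12:00:00Z","message":"hello"}`)

        js := &LogJson{}
        if err := json.Unmarshal(raw, js); err != nil {
            panic(err)
        }

        // Same formula as ConsumeClaim: "<topic>-YYYY.MM.DD".
        index := fmt.Sprintf("%s-%s", "top1", js.Time.Format("2006.01.02"))
        fmt.Println(index) // prints: top1-2019.07.26
    }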

    2. The worker that bulk-writes to ElasticSearch

    package elastic_worker
    
    import (
        "context"
        "encoding/json"
        "fmt"
        "log"
        "runtime"
        "sync"
        "time"
    
        "github.com/olivere/elastic"
    )
    
    // Config holds the worker settings.
    type Config struct {
        MaxMessage int `xml:"max_msg"`          // max queue length (buffered channel size)
        WorkerNum  int `xml:"worker_number"`    // number of worker goroutines
        BatchSize  int `xml:"batch_size"`       // max documents per bulk batch
        TickTime   int `xml:"tick_millisecond"` // flush interval in milliseconds
    }
    
    type task struct {
        key string
        val []byte
    }
    
    // Worker is the message processor: it batches tasks and bulk-indexes them.
    type Worker struct {
        msgQ   chan *task
        client *elastic.Client
        wg     sync.WaitGroup
        config *Config
    }
    
    // NewWorker is the constructor.
    func NewWorker(client *elastic.Client, cfg *Config) *Worker {
        w := &Worker{
            client: client,
            config: cfg,
            msgQ:   make(chan *task, cfg.MaxMessage),
        }
    
        return w
    }
    
    // Run starts the worker goroutines.
    func (w *Worker) Run(ctx context.Context) {
    
        // number of worker goroutines
        thread := w.config.WorkerNum
        if thread <= 0 {
            thread = runtime.NumCPU()
        }

        // flush interval; default to 100ms if unset
        tickTime := time.Duration(w.config.TickTime) * time.Millisecond
        if tickTime <= 0 {
            tickTime = time.Duration(100) * time.Millisecond
        }

        // start the workers; the Sleep below staggers their tickers
        for i := 0; i < thread; i++ {
            w.wg.Add(1)
            time.Sleep(tickTime / time.Duration(thread))
            go func(idx int) {
    
                // build one bulk service per worker; it can be reused for successive batches
                service := w.client.Bulk()
                service.Refresh("wait_for")
                defer service.Reset()
    
                log.Printf("[elastic_worker] worker[%d] start", idx)
                defer w.wg.Done()
    
                // ticker
                ticker := time.NewTicker(tickTime)
                defer ticker.Stop()
    
            LOOP:
                for {
                    select {
                    case <-ctx.Done():
                        log.Printf("[elastic_worker] worker[%d] is quiting", idx)
                        // drain everything left in the queue before exiting
                        for {
                            if num := w.process(service); num > 0 {
                                log.Printf("[elastic_worker] worker[%d] process batch [%d] when quiting", idx, num)
                            } else {
                                break LOOP
                            }
                            time.Sleep(tickTime)
                        }
    
                    case <-ticker.C:
                        if num := w.process(service); num > 0 {
                            log.Printf("[elastic_worker] worker[%d] process batch [%d] ", idx, num)
                        }
                    }
                }
    
                log.Printf("[elastic_worker] worker[%d] stop", idx)
            }(i)
        }
    }
    
    // AddTask enqueues a task; it is safe to call from multiple goroutines.
    func (w *Worker) AddTask(key string, val []byte) {
        t := &task{
            key: key,
            val: val,
        }
        w.msgQ <- t
    }
    
    // process drains up to BatchSize tasks from the queue and sends them as one bulk request.
    func (w *Worker) process(service *elastic.BulkService) int {
        //service.Reset()
    
        // at most w.config.BatchSize documents per batch
    LOOP:
        for i := 0; i < w.config.BatchSize; i++ {
            // add any pending task to this batch; stop as soon as the queue is empty
            select {
            case m := <-w.msgQ:
                req := elastic.NewBulkIndexRequest().Index(m.key).Type("doc").Doc(json.RawMessage(m.val))
                service.Add(req)
            default:
                break LOOP
            }
        }
    
        total := service.NumberOfActions()
        if total > 0 {
            if resp, err := service.Do(context.Background()); nil != err {
                panic(err)
            } else {
                if resp.Errors {
                    for _, v := range resp.Failed() {
                        fmt.Println("service.Do failed", v)
                    }
                    panic("resp.Errors")
                }
            }
        }
    
        return total
    }
    
    // Close waits for the workers to finish; cancel the context passed to Run first so the remaining tasks in msgQ get flushed.
    func (w *Worker) Close() {
        w.wg.Wait()
        if n := len(w.msgQ); n > 0 {
            log.Printf("[elastic_worker] worker Close remain msg[%d]", n)
        }
    }
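
    The worker can also be exercised on its own, without Kafka. Below is a minimal sketch, assuming an ElasticSearch 6.x node is reachable at http://127.0.0.1:9200 (the worker sets the mapping type to "doc"); the index name and document are hypothetical.

    package main

    import (
        "context"
        "time"

        "elastic_worker"

        "github.com/olivere/elastic"
    )

    func main() {
        // Connect to a local ElasticSearch node (assumption: it is running on 127.0.0.1:9200).
        client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
        if err != nil {
            panic(err)
        }
        defer client.Stop()

        cfg := &elastic_worker.Config{
            MaxMessage: 128, // queue length
            WorkerNum:  1,   // single goroutine, as the notes at the end suggest
            BatchSize:  16,  // small batch for the demo
            TickTime:   100, // flush every 100ms
        }
        worker := elastic_worker.NewWorker(client, cfg)

        ctx, cancel := context.WithCancel(context.Background())
        worker.Run(ctx)

        // Enqueue one document for a hypothetical daily index.
        worker.AddTask("top1-2019.07.26", []byte(`{"tag":"demo","level":"info","message":"hello"}`))

        // Give the ticker a chance to flush, then shut down: cancel first, then Close.
        time.Sleep(500 * time.Millisecond)
        cancel()
        worker.Close()
    }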

    3. main.go

    package main
    
    import (
        "context"
        "encoding/xml"
        "flag"
        "fmt"
        "io/ioutil"
        "log"
        "os"
        "os/signal"
        "runtime"
        "strings"
        "syscall"
        "time"
    
        "consumer"
        "elastic_worker"
        
        "github.com/Shopify/sarama"
        "github.com/olivere/elastic"
    )
    
    // ConsumerConfig is the Kafka consumer configuration.
    type ConsumerConfig struct {
        Topic       []string `xml:"topic"`
        Broker      string   `xml:"broker"`
        Partition   int32    `xml:"partition"`
        Replication int16    `xml:"replication"`
        Group       string   `xml:"group"`
        Version     string   `xml:"version"`
    }
    
    // Config is the top-level configuration.
    type Config struct {
        Consumer   ConsumerConfig        `xml:"consumer"`
        ElasticURL string                `xml:"elastic_url"`
        Filters    []string              `xml:"filter"`
        Worker     elastic_worker.Config `xml:"elastic_worker"`
    }
    
    var (
        configFile = "" // path to the config file
        initTopic  = false
        listTopic  = false
        delTopic   = ""
        cfg        = &Config{}
    )
    
    func init() {
        flag.StringVar(&configFile, "config", "cfg.xml", "config file ")
        flag.BoolVar(&initTopic, "init", initTopic, "create topic")
        flag.BoolVar(&listTopic, "list", listTopic, "list topic")
        flag.StringVar(&delTopic, "del", delTopic, "delete topic")
    }
    
    var (
        elasticClient *elastic.Client
    )
    
    func main() {
        runtime.GOMAXPROCS(runtime.NumCPU())
    
        defer time.Sleep(time.Second)
    
        // get the host name (used as the Kafka ClientID and in the sarama log prefix)
        hostName, err := os.Hostname()
        if nil != err {
            hostName = "[beats]"
        }
    
        // load the configuration
        if contents, err := ioutil.ReadFile(configFile); err != nil {
            panic(err)
        } else {
            if err = xml.Unmarshal(contents, cfg); err != nil {
                panic(err)
            }
        }
    
        // sarama's logger
        sarama.Logger = log.New(os.Stdout, fmt.Sprintf("[%s]", hostName), log.LstdFlags)
    
        // set the Kafka version; it must be one the broker cluster supports
        version, err := sarama.ParseKafkaVersion(cfg.Consumer.Version)
        if err != nil {
            panic(err)
        }
        config := sarama.NewConfig()
        config.Version = version
        config.Consumer.Offsets.Initial = sarama.OffsetOldest
        config.ClientID = hostName
    
        // admin tooling: if a tool flag (-init/-list/-del) was given, run it and exit;
        // otherwise force topic initialisation so the configured topics exist before consuming
        if tool(cfg, config) {
            return
        } else {
            initTopic = true
            tool(cfg, config)
        }
    
        // create the elastic client
        urls := strings.Split(cfg.ElasticURL, ",")
        if cli, err := elastic.NewClient(elastic.SetURL(urls...)); err != nil {
            panic(err)
        } else {
            elasticClient = cli
            // ping check
            if ret, _, err := elasticClient.Ping(urls[0]).Do(context.Background()); nil != err {
    
                panic(err)
            } else {
                log.Printf("elasticClient.Ping %+v", ret)
            }
    
            defer elasticClient.Stop()
        }
    
        // ctx
        ctx, cancel := context.WithCancel(context.Background())
    
        // Worker
        worker := elastic_worker.NewWorker(elasticClient, &cfg.Worker)
        worker.Run(ctx)
        defer worker.Close()
    
        // kafka consumer client
        kafkaClient, err := sarama.NewConsumerGroup(strings.Split(cfg.Consumer.Broker, ","), cfg.Consumer.Group, config)
        if err != nil {
            panic(err)
        }
    
        consumer := consumer.NewMyConsumer(worker, ctx)
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    err := kafkaClient.Consume(ctx, cfg.Consumer.Topic, consumer)
                    if err != nil {
                        log.Printf("[main] client.Consume error=[%s]", err.Error())
                        time.Sleep(time.Second)
                    }
                }
            }
        }()
    
        // os signal
        sigterm := make(chan os.Signal, 1)
        signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    
        //time.Sleep(time.Second * 4)
        sig := <-sigterm
        log.Printf("[main] os sig=[%v]", sig)
    
        cancel()
        log.Printf("[main] cancel")
        if err := kafkaClient.Close(); nil != err {
            log.Printf("[main] kafkaClient close error=[%s]", err.Error())
        }
    
        log.Printf("[main] beats quit")
    }
    
    func tool(cfg *Config, config *sarama.Config) bool {
        if initTopic || listTopic || len(delTopic) > 0 {
            ca, err := sarama.NewClusterAdmin(strings.Split(cfg.Consumer.Broker, ","), config)
            if nil != err {
                panic(err)
            }
    
        if len(delTopic) > 0 { // delete the given topic
                if err := ca.DeleteTopic(delTopic); nil != err {
                    panic(err)
                }
                log.Printf("delete ok topic=[%s]
    ", delTopic)
        } else if initTopic { // create or expand the configured topics
                if detail, err := ca.ListTopics(); nil != err {
                    panic(err)
                } else {
                    for _, v := range cfg.Consumer.Topic {
                        if d, ok := detail[v]; ok {
                            if cfg.Consumer.Partition > d.NumPartitions {
                                if err := ca.CreatePartitions(v, cfg.Consumer.Partition, nil, false); nil != err {
                                    panic(err)
                                }
                                log.Println("alter topic ok", v, cfg.Consumer.Partition)
                            }
    
                        } else {
                            if err := ca.CreateTopic(v, &sarama.TopicDetail{NumPartitions: cfg.Consumer.Partition, ReplicationFactor: cfg.Consumer.Replication}, false); nil != err {
                                panic(err)
                            }
                            log.Println("create topic ok", v)
                        }
                    }
                }
            }
    
        // print the topic list
            if detail, err := ca.ListTopics(); nil != err {
                log.Println("ListTopics error", err)
            } else {
                for k := range detail {
                    log.Printf("[%s] %+v", k, detail[k])
                }
            }
    
            if err := ca.Close(); nil != err {
                panic(err)
            }
    
            return true
        }
        return false
    }
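
    For reference, the flags defined in init() above drive this tooling: -config picks the XML file (cfg.xml by default), -list prints the existing topics, -init creates or expands the configured topics, and -del deletes the named topic. When any of the tool flags is set, main() runs tool() and exits without starting the consumer.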

    0x3 Configuration file

    <?xml version="1.0" encoding="utf-8"?>
    <config>
    
      <consumer>
        <!-- Kafka cluster -->
        <broker>127.0.0.1:9092</broker>
    
        <!-- topics; more than one can be configured -->
        <topic>top1</topic>
        <topic>top2</topic>
        
    
        <!-- Kafka consumer group -->
        <group>test-group</group>
    
        <!-- Kafka version -->
        <version>2.2.0</version>
    
        <!-- number of partitions; do not run more consumers than this -->
        <partition>16</partition>
    
        <!-- replication factor -->
        <replication>2</replication>
      </consumer>
    
      <elastic_url>http://127.0.0.1:9200</elastic_url>
    
      <elastic_worker>
        <!-- max queue length; keeping it small limits how many messages are lost if the process crashes -->
        <max_msg>2048</max_msg>
    
        <!-- number of worker goroutines -->
        <worker_number>1</worker_number>
    
        <!-- max documents per batch -->
        <batch_size>1024</batch_size>
    
        <!-- flush interval (milliseconds) -->
        <tick_millisecond>5000</tick_millisecond>
      </elastic_worker>
    
    
    </config>

    0x4 Notes

    1. If your ElasticSearch cluster has enough capacity, raise <worker_number>1</worker_number> in the config file to run the worker with multiple goroutines; otherwise a single goroutine usually performs better.

    2. Adjusting <batch_size>1024</batch_size>, the maximum number of documents per batch, can also improve write throughput (see the example snippet after these notes).

    3. If you see EsRejectedExecutionException, ElasticSearch can no longer keep up with the write load; upgrade the cluster or reduce the write rate.
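
    For example, on a cluster with more headroom the worker section might be tuned along these lines; the numbers are purely illustrative and should be validated against your own hardware and log volume:

    <elastic_worker>
      <!-- illustrative values only -->
      <max_msg>8192</max_msg>
      <worker_number>4</worker_number>
      <batch_size>2048</batch_size>
      <tick_millisecond>1000</tick_millisecond>
    </elastic_worker>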

  Original post: https://www.cnblogs.com/mrblue/p/11251498.html