zoukankan      html  css  js  c++  java
  • LogAgent —— etcd+kafka+zookeeper+go实现实时读取日志发送到kafka,并实现热加载配置读取的日志路径

    工具包目录结构:

    .
    ├── conf
    │   ├── logAgent.ini
    │   └── logAgentConfig.go
    ├── etcd
    │   └── etcd.go
    ├── kafka
    │   └── kafka.go
    ├── main.go
    └── taillog
        ├── taillog.go
        └── taillog_mgr.go

    logAgent.ini

     1 [kafka]
     2 address=127.0.0.1:9092
     3 chan_max_size=100000
     4 
     5 [etcd]
     6 address=127.0.0.1:2379
     7 timeout=5
     8 collect_log_key=xxx
     9 
    10 [taillog]
    11 filename="./my.log"

    logAgentConfig.go

     1 /**
     2  * @Author: Mr.Cheng
     3  * @Description:
     4  * @File: logAgentConfig
     5  * @Version: 1.0.0
     6  * @Date: 2021/12/9 下午8:44
     7  */
     8 
     9 package logAgentConfig
    10 
    // Configuration types filled from logAgent.ini; the `ini` tags name the
    // section (on AppConf) or the key (on the section structs) each field maps to.
    type (
        // AppConf is the root configuration object. It embeds one struct per
        // INI section.
        AppConf struct {
            KafkaConf `ini:"kafka"`
            EtcdConf  `ini:"etcd"`
            // TaillogConf `ini:"taillog"`
        }

        // KafkaConf holds the [kafka] section.
        KafkaConf struct {
            Address string `ini:"address"`       // Kafka address
            Size    int    `ini:"chan_max_size"` // value of the chan_max_size key
        }

        // EtcdConf holds the [etcd] section.
        EtcdConf struct {
            Address string `ini:"address"`         // etcd address
            Timeout int    `ini:"timeout"`         // value of the timeout key
            Key     string `ini:"collect_log_key"` // value of the collect_log_key key
        }

        // TaillogConf holds the [taillog] section. It is currently unused: the
        // corresponding field in AppConf is commented out.
        TaillogConf struct {
            FileName string `ini:"filename"`
        }
    )

    etcd.go

     1 /**
     2  * @Author: Mr.Cheng
     3  * @Description:
     4  * @File: etcd
     5  * @Version: 1.0.0
     6  * @Date: 2021/12/9 下午9:12
     7  */
     8 package etcd
     9 
    10 import (
    11     "context"
    12     "encoding/json"
    13     "fmt"
    14     "go.etcd.io/etcd/clientv3"
    15     "time"
    16 )
    17 
    18 var (
    19     client *clientv3.Client
    20 )
    21 
    22 type LogEntry struct {
    23     Path  string `json:"path"`  // 日志存放的路径
    24     Topic string `json:"topic"` // 日志要发往kafka的topic
    25 }
    26 
    27 func Init(address string, interval int) (err error) {
    28     client, err = clientv3.New(clientv3.Config{
    29         Endpoints:   []string{address},
    30         DialTimeout: time.Duration(interval) * time.Second,
    31     })
    32     if err != nil {
    33         fmt.Printf("connect to etcd failed, err:%v\n", err)
    34         return err
    35     }
    36     return
    37 }
    38 
    39 // 从Etcd中根据Key获取配置项
    40 func GetConf(key string) (LogEntryConf []*LogEntry, err error) {
    41     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    42     resp, err := client.Get(ctx, key)
    43     cancel()
    44     if err != nil {
    45         fmt.Printf("get from etcd failed, err:%v\n", err)
    46         return nil, err
    47     }
    48     for _, ev := range resp.Kvs {
    49         //fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    50         err = json.Unmarshal(ev.Value, &LogEntryConf)
    51         if err != nil {
    52             fmt.Printf("unmarshal etcd value failed, err:%v\n", err)
    53             return nil, err
    54         }
    55     }
    56     return LogEntryConf, nil
    57 }
    58 
    59 // etcd watch
    60 func WatchConf(key string, newConfChan chan<- []*LogEntry) {
    61     ch := client.Watch(context.Background(), key)
    62     for wresp := range ch {
    63         for _, evt := range wresp.Events {
    64             fmt.Printf("Type:%v key:%v value:%v\n", evt.Type, string(evt.Kv.Key), string(evt.Kv.Value))
    65             var newConf []*LogEntry
    66             // 如果是删除操作,json.Unmarshal会报错,需手动添加一个空的newConf
    67             if evt.Type != clientv3.EventTypeDelete {
    68                 err := json.Unmarshal(evt.Kv.Value, &newConf)
    69                 if err != nil {
    70                     fmt.Printf("unmarshal new conf failed, err:%v\n", err)
    71                     continue
    72                 }
    73             }
    74             newConfChan <- newConf
    75         }
    76     }
    77 }

    kafka.go

     1 /**
     2  * @Author: Mr.Cheng
     3  * @Description:往kafka写入日志
     4  * @File: kafka
     5  * @Version: 1.0.0
     6  * @Date: 2021/12/9 下午2:19
     7  */
     8 
     9 package kafka
    10 
    11 import (
    12     "fmt"
    13     "github.com/Shopify/sarama"
    14     "time"
    15 )
    16 
    17 type logData struct {
    18     Topic string
    19     Data  string
    20 }
    21 
    22 var (
    23     client      sarama.SyncProducer // 全局连接kafka的生产者
    24     logDataChan chan *logData
    25 )
    26 
    27 // 初始化连接
    28 func Init(address []string, size int) (err error) {
    29     config := sarama.NewConfig()
    30     config.Producer.RequiredAcks = sarama.WaitForAll          // 发送模式(需leader和follow都确认)
    31     config.Producer.Partitioner = sarama.NewRandomPartitioner // 选择分区的方式(轮询)
    32     config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel中返回
    33 
    34     // 连接kafka
    35     client, err = sarama.NewSyncProducer(address, config)
    36     if err != nil {
    37         fmt.Printf("client kafka failed, err:%v\n", err)
    38         return err
    39     }
    40 
    41     // 初始化logDataChan
    42     logDataChan = make(chan *logData, size)
    43 
    44     // 从logDataChan中取数据发往kafaka
    45     go sendToKafka()
    46     return nil
    47 }
    48 
    49 func SendToChan(Topic, Data string) {
    50     data := &logData{
    51         Topic: Topic,
    52         Data:  Data,
    53     }
    54     select {
    55     case logDataChan <- data:
    56     default:
    57         time.Sleep(time.Millisecond * 100)
    58     }
    59 }
    60 
    61 func sendToKafka() {
    62     // 循环从通道logDataChan取值并发送给kafka
    63     for {
    64         select {
    65         case data := <-logDataChan:
    66             msg := &sarama.ProducerMessage{}
    67             msg.Topic = data.Topic
    68             msg.Value = sarama.StringEncoder(data.Data)
    69             pid, offset, err := client.SendMessage(msg)
    70             if err != nil {
    71                 fmt.Printf("send msg failed, err:%v\n", err)
    72             }
    73             fmt.Printf("send msg success, pid:%v offect:%v\n", pid, offset)
    74         default:
    75             time.Sleep(time.Millisecond * 50)
    76         }
    77     }
    78 }

    taillog.go

     1 /**
     2  * @Author: Mr.Cheng
     3  * @Description:收集日志模块
     4  * @File: taillog
     5  * @Version: 1.0.0
     6  * @Date: 2021/12/8 下午9:54
     7  */
     8 
     9 package taillog
    10 
    11 import (
    12     "context"
    13     "day21/02.log_agent/kafka"
    14     "fmt"
    15     "github.com/hpcloud/tail"
    16     "time"
    17 )
    18 
    19 type TailTask struct {
    20     Path     string
    21     Topic    string
    22     Instance *tail.Tail
    23     // 为了停止任务,存下context
    24     ctx    context.Context
    25     cancel context.CancelFunc
    26 }
    27 
    28 func NewTailTask(Path, Topic string) (tailtask *TailTask, err error) {
    29     config := tail.Config{
    30         Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // 从文件那个地方开始读
    31         ReOpen:    true,                                 // 重新打开
    32         MustExist: false,                                // 文件不存在不报错
    33         Poll:      true,
    34         Follow:    true, // 是否跟随
    35     }
    36     ctx, cancel := context.WithCancel(context.Background())
    37     tailObj, err := tail.TailFile(Path, config)
    38     if err != nil {
    39         fmt.Printf("tail file failed, err:%v\n", err)
    40         return nil, err
    41     }
    42     tailtask = &TailTask{Path: Path, Topic: Topic, Instance: tailObj, ctx: ctx, cancel: cancel}
    43     // 开启读取日志并发送给kafka
    44     go tailtask.ReadFromTail()
    45     return tailtask, nil
    46 }
    47 
    48 func (tailtask *TailTask) ReadFromTail() {
    49     for {
    50         select {
    51         case <-tailtask.ctx.Done():
    52             return
    53         case line, ok := <-tailtask.Instance.Lines:
    54             if !ok {
    55                 fmt.Printf("tail fail close reopen, filename:%s\n", tailtask.Path)
    56                 time.Sleep(time.Second)
    57                 continue
    58             }
    59             kafka.SendToChan(tailtask.Topic, line.Text)
    60         default:
    61             time.Sleep(time.Second)
    62         }
    63     }
    64 }

    taillog_mgr.go

      1 /**
      2  * @Author: Mr.Cheng
      3  * @Description:
      4  * @File: taillogMgr
      5  * @Version: 1.0.0
      6  * @Date: 2021/12/14 下午3:47
      7  */
      8 
      9 package taillog
     10 
     11 import (
     12     "day21/02.log_agent/etcd"
     13     "fmt"
     14     "time"
     15 )
     16 
     17 type TailMgr struct {
     18     logEntry    []*etcd.LogEntry
     19     tskMap      map[string]*TailTask
     20     newConfChan chan []*etcd.LogEntry
     21 }
     22 
     23 var tskMgr *TailMgr
     24 
     25 // 循环每一个日志收集项,创建tailObj,并发往kafka
     26 func Init(logEntryConf []*etcd.LogEntry) {
     27     tskMgr = &TailMgr{
     28         logEntry:    logEntryConf,
     29         tskMap:      make(map[string]*TailTask, 16),
     30         newConfChan: make(chan []*etcd.LogEntry),
     31     }
     32 
     33     for _, LogEntry := range logEntryConf {
     34         // fmt.Printf("Path:%v Topic:%v\n", LogEntry.Path, LogEntry.Topic)
     35         tailtask, err := NewTailTask(LogEntry.Path, LogEntry.Topic)
     36         if err != nil {
     37             continue
     38         }
     39         // 在tskMap中存储一下,以便发生配置变更时做增删改操作
     40         key := fmt.Sprintf("%s_%s", tailtask.Path, tailtask.Topic)
     41         tskMgr.tskMap[key] = tailtask
     42     }
     43 
     44     go tskMgr.run()
     45 }
     46 
     47 // 监听newConfChan是否有数据,有数据则表示etcd配置有变化,需做相应的处理
     48 func (t *TailMgr) run() {
     49     for {
     50         select {
     51         case newConf := <- t.newConfChan:
     52             fmt.Printf("配置发生变更,Conf:%v\n", newConf)
     53             // 找出新增项
     54             for _, logEntry := range newConf {
     55                 key := fmt.Sprintf("%s_%s", logEntry.Path, logEntry.Topic)
     56                 _, ok := t.tskMap[key]
     57                 if ok {
     58                     // 表示该配置项原先存在
     59                     continue
     60                 } else {
     61                     // 属于新增配置
     62                     fmt.Printf("新增项,path:%s topic:%s\n", logEntry.Path, logEntry.Topic)
     63                     tailtask, err := NewTailTask(logEntry.Path, logEntry.Topic)
     64                     if err != nil {
     65                         continue
     66                     }
     67                     // TailMgr的logEntry和tskMap增加对应项
     68                     t.logEntry = append(t.logEntry, logEntry)
     69                     t.tskMap[key] = tailtask
     70                     go tailtask.ReadFromTail()
     71                 }
     72             }
     73             // 找出删除项
     74             for index, c1 := range t.logEntry {
     75                 isDelete := true
     76                 for _, c2 := range newConf {
     77                     if c1.Path == c2.Path && c1.Topic == c2.Topic {
     78                         isDelete = false
     79                         break
     80                     }
     81                 }
     82                 if isDelete{
     83                     // 表示属于删除项,从tskMap拿出tailtask对象,执行对象的cancel函数,并将该对象从tskMap中删除
     84                     fmt.Printf("删除项,path:%s topic:%s\n", c1.Path, c1.Topic)
     85                     key := fmt.Sprintf("%s_%s", c1.Path, c1.Topic)
     86                     t.tskMap[key].cancel()
     87                     // TailMgr的logEntry和tskMap删除对应项
     88                     delete(t.tskMap, key)
     89                     t.logEntry = append(t.logEntry[:index], t.logEntry[index + 1:]...)
     90                 }
     91             }
     92         default:
     93             time.Sleep(time.Second)
     94         }
     95     }
     96 }
     97 
     98 // 向外暴露newConfChan
     99 func NewConfChan() chan<- []*etcd.LogEntry{
    100     return tskMgr.newConfChan
    101 }

    main.go

     1 /**
     2  * @Author: Mr.Cheng
     3  * @Description:
     4  * @File: main
     5  * @Version: 1.0.0
     6  * @Date: 2021/12/9 下午8:43
     7  */
     8 
     9 package main
    10 
    11 import (
    12     logAgentConfig "day21/02.log_agent/conf"
    13     "day21/02.log_agent/etcd"
    14     "day21/02.log_agent/kafka"
    15     "day21/02.log_agent/taillog"
    16     "fmt"
    17     "gopkg.in/ini.v1"
    18     "sync"
    19 )
    20 
    21 var (
    22     cfg = new(logAgentConfig.AppConf)
    23     wg  sync.WaitGroup
    24 )
    25 
    26 func main() {
    27     // 加载配置文件
    28     err := ini.MapTo(cfg, "./conf/logAgent.ini")
    29     if err != nil {
    30         fmt.Printf("load ini failed, err:%v\n", err)
    31         return
    32     }
    33 
    34     // 初始化kafka连接
    35     err = kafka.Init([]string{cfg.KafkaConf.Address}, cfg.KafkaConf.Size)
    36     if err != nil {
    37         return
    38     }
    39     fmt.Println("init kafka success")
    40 
    41     // 初始化etcd
    42     err = etcd.Init(cfg.EtcdConf.Address, cfg.EtcdConf.Timeout)
    43     if err != nil {
    44         return
    45     }
    46     fmt.Println("init etcd success")
    47 
    48     // 从etcd中获取日志收集项的配置信息
    49     logEntryConf, err := etcd.GetConf(cfg.EtcdConf.Key)
    50     if err != nil {
    51         return
    52     }
    53     fmt.Printf("get conf from etcd success, conf:%v\n", logEntryConf)
    54 
    55     // 收集日志发往kafka
    56     // 循环每一个日志收集项,创建tailObj,并发往kafka
    57     taillog.Init(logEntryConf)
    58 
    59     // 监视etcd中配置的变动,如有变动,给新的配置信息给taillog
    60     wg.Add(1)
    61     go etcd.WatchConf(cfg.EtcdConf.Key, taillog.NewConfChan())
    62     wg.Wait()
    63 }
  • 相关阅读:
    CentOS 7.x 二进制方式安装mysql5.7.24
    Java集合框架学习总结
    deque 归纳
    KMP学习笔记
    luogu 3393 逃离僵尸岛
    luogu 1280 尼克的任务
    HDU4871 Shortest-path tree(点分治)
    BZOJ4399 魔法少女LJJ
    BZOJ4771 七彩树
    (BUILDER)建造者与(FACTORY)工厂模式 的比较
  • 原文地址:https://www.cnblogs.com/zzmx0/p/15706113.html
Copyright © 2011-2022 走看看