zoukankan      html  css  js  c++  java
  • 海量日志收集项目总结(2)logtranfer

    海量日志收集项目总结(2)logtranfer

    简介

    logtranfer的作用是:把日志从kafka消息队列中读出来,发往ES,并连接kibana,做可视化展示。

    模块架构为:

    mark

    项目源码

    项目的目录结构

    mark

    各部分源码

    main.go:

    package main
    
    import (
       "github.com/wind-zhou/log_transfer/conf"
       "github.com/wind-zhou/log_transfer/es"
       "gopkg.in/ini.v1"
       "fmt"
       "github.com/wind-zhou/log_transfer/kafka"
    )
    
    //1.将日志数据取出
       //2.发往ES
    var (
       cfg = new(conf.LogTransfer)
    )
       func main(){
    
       	//0. 加载配置文件
          err := ini.MapTo(cfg, "./conf/cfg.ini")
          if err != nil {
             fmt.Println("load ini failed")
             return
          }
          fmt.Println(cfg)
    
    
          //初始化es
          err=es.Init(cfg.EsCfg.Address)
          if err != nil {
             fmt.Printf("init es failed err=%v
    ",err)
          }
    
          fmt.Println("init es success")
           //1.初始化kafka
           //1.1 链接kafka
           //1.2 每个分区读数据发往es
    
       	err=kafka.Init([]string{cfg.KafkaCfg.Address},cfg.KafkaCfg.Topic)
          if err != nil {
             fmt.Printf("init kafka failed err=%v
    ",err)
             return
          }
          select {
    
          }
       }
    

    kafka.go:

    package kafka
    
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    	"github.com/wind-zhou/log_transfer/es"
    )
    
    
    //初始化client 消费者
    //从kafka取数据发给es
    func Init(address []string,topic string)  error {
    	consumer, err := sarama.NewConsumer(address, nil)
    	if err != nil {
    		fmt.Printf("fail to start consumer, err:%v
    ", err)
    		return err
    	}
    	partitionList, err := consumer.Partitions(topic) // 根据topic取到所有的分区
    	if err != nil {
    		fmt.Printf("fail to get list of partition:err%v
    ", err)
    		return err
    	}
    	fmt.Println("分区列表:",partitionList)
    	for partition := range partitionList { // 遍历所有的分区
    		// 针对每个分区创建一个对应的分区消费者
    		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
    		if err != nil {
    			fmt.Printf("failed to start consumer for partition %d,err:%v
    ", partition, err)
    			return err
    		}
    		//defer pc.AsyncClose()
    		// 异步从每个分区消费信息
    		go func(sarama.PartitionConsumer) {
    			for msg := range pc.Messages() {
    				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v
    ", msg.Partition, msg.Offset, msg.Key, string(msg.Value))
    				//发送给es
    				ld:=es.LogData{
    					Topic: topic,
    					Data: string(msg.Value),
    				}
    				es.SendToEsChan(&ld)//把读取的分区数据发送到一个通道
    				if err != nil {
    					fmt.Printf("es.SendToEs err=%v
    ",err)
    					return
    				}
    			}
    		}(pc)
    	}
    	return  err
    }
    
    

    es.go:

    package es
    
    import (
    	"context"
    	"fmt"
    	"github.com/olivere/elastic"
    	"strings"
    
    	"time"
    )
    
    type LogData struct {
    	Topic string `json:"topic"`
    
    	Data string`json:"data"`
    }
    
    var (
    	client *elastic.Client
    	ch =make(  chan *LogData,100000)
    )
    
    //初始化es
    //准备接受Kafka的数据
    
    func Init(address string)(err error){
    	if !strings.HasPrefix(address,"http://"){
    		address="http://"+address
    	}
    	client, err = elastic.NewClient(elastic.SetURL(address))
    	if err != nil {
    		// Handle error
    		return
    	}
    
    	fmt.Println("connect to es success")
    
    	go SendToEs()
    	return
    
    }
    
    
    //SendToEs 发送数据到es
    
    func SendToEsChan(msg *LogData) {
    	ch <-msg
    
    }
    
    func SendToEs(){
    	for{
    		select {
    			case msg:= <-ch:
    
    				put1, err := client.Index().Index(msg.Topic).Type("xxx").BodyJson(msg).Do(context.Background())
    				if err != nil {
    					// Handle error
    					fmt.Println(err)
    
    				}
    				fmt.Printf("Indexed user %s to index %s, type %s
    ", put1.Id, put1.Index, put1.Type)
    		default:
    			time.Sleep(time.Second)
    		}
    	}
    
    }
    

    cfg.go:

    package conf
    
    type LogTransfer struct{
    	KafkaCfg`ini:"kafka"`
    	EsCfg`ini:"es"`
    }
    type KafkaCfg struct {
    	Address string`ini:"address"`
    	Topic string`ini:"topic"`
    }
    
    type EsCfg struct {
    	Address string`ini:"address"`
    }
    

    cfg.ini:

    [kafka]
    address=192.168.43.99:9092
    topic=web_log
    
    [es]
    address=127.0.0.1:9200
    

    项目分析

    这部分的主角其实只有两个:kafka和ES,kibana只是个工具人(因为不需要用go语言操作kibana)。

    • kafka部分需要写一个kafka的消费者,负责从消息队列中按照特定的topic取出日志信息。
    • ES部分需要编写代码,将取出的数据写入ES。

    各部分操作都需要初始化,kafka需要知道主机的地址,以及所取日志的topic;ES需要知道主机的地址,因此这些参数可以写入配置文件中,也就是cfg.ini, cfg.go里面声明一些结构体,用于之后配置文件解析。

    注:

    上面给的源码中,kafka消费者所取的日志topic已经写死到了配置文件中,也就是kafka消费者,只拉取web_log主题的日志,如果想要拉取里面所有的topic日志,可以仿照前面的logagent,加一个etcd模块,用于存储之前存入的topic。

    main.go中解析配置文件代码:

    mark

    kafka消费者:

    Kafka消费者的逻辑为:

    1. 传入kafka主机地址,连接上kafka,创建一个消费者的
    2. 传入日志的topic,获取到搞topic所有的分区
    3. 遍历所有的分区,并在每个分区创建一个goroutine,分别去读取日志
    4. 每个goroutine内,将读取的信息发往ES

    ES操作:

    ES部分的逻辑为:

    1. 传入ES主机的地址,连接ES,并产生一个ES的client
    2. 开启一个goroutine,将数据传入ES的指定的Index,Type。

    OK,这部分核心代码就结束了。

    一些细节:

    1. 消费者是按topic从kafka读取数据,因此在传入ES时,可以按topic创建Index
    2. 消费者读取的数据为字符串或字符切片。先构造成一个结构体,发送到ES时,会将其转换成 json格式。
    3. main程序最后加了个select{},用于等待,否则主程序会退出

    这里还做了一些优化,如kafka拿到数据后,先将其送入一个通道,ES端开启的goroutine是在通道中拿数据并发送。

    项目调试

    测试环境搭建

    1. 开启kafka

      1. 开启zookeeper :.inwindowszookeeper-server-start.bat .configzookeeper.properties
      2. 开启kafka : .inwindowskafka-server-start.bat .configserver.properties
    2. 开启etcd

    3. 开启es : binelasticsearch.bat

    4. 开启kaibana,连接到ES

      开启kibana步骤:

      1. 修改kibana配置文件kibana.yaml中的内容

      设置ES主机地址:

      mark

      设置中文编码:

      mark

      1. 开启kibana : binkibana.bat

      mark

      1. 访问kibana : http://127.0.0.1:5601

      mark

    5. 开启前面写的logagent

      mark

    6. 开启logtranfer

    mark

    调试

    1. 打开web_log主题日志所在的文件nginx.log , 在里面写入数据

    mark

    1. logagent端返回的结果

    mark

    1. logtranfer端 返回结果

    mark

    可见收到的web_log日志被存储到了ES的Index=web_log,Type=xxx中。

    1. 在kibana中索引

    mark

    后记

    到此,这个项目完结,第一次写实战博客,写的很烂,但作为一个开始吧!

    本次项目简单学了一些新东西:

    ES,kafka,zookeeper,kinbana等。

    另外,这个项目是看网上盗版视频缩写,非常的粗糙,logagent和logtranfer两部分并没有很好的衔接起来,两个模块没有整合到在一起,例如logtranfer只读取了logagent中web-log主题日志,没有读取其所有topic日志。

    这可以进行优化,大致的思路是,在logagent端每次读取etcd新变化后,记录下该配置,并传递给logtranfer,然后logtranfer端根据此配置去开启或关闭goroutine。

  • 相关阅读:
    Zabbix5 Frame 嵌套
    Zabbix5 对接 SAML 协议 SSO
    CentOS7 安装 Nexus
    CentOS7 安装 SonarQube
    GitLab 后台修改用户密码
    GitLab 查看版本号
    GitLab Admin Area 500 Error
    Linux 安装 PostgreSQL
    Liger ui grid 参数
    vue.js 是一个怪东西
  • 原文地址:https://www.cnblogs.com/wind-zhou/p/12926190.html
Copyright © 2011-2022 走看看