  • Go + InfluxDB + Grafana: reading and visualizing log data

    I have been learning Go recently, so I wrote a small Go program that reads a server's log file, pulls out the fields I want with a regular expression, stores them in InfluxDB as a data source, and finally displays the data in Grafana.

    Here are the detailed installation steps.

    First I picked a server and installed InfluxDB, Go, and Grafana on it.

    The server runs a fairly old system, CentOS 6.5, so the installation differs a bit from newer releases.

    Installing Grafana:

    1. Download the package:

    wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.6.3-1.x86_64.rpm --no-check-certificate

     

    2. Install the dependencies:

    yum install initscripts

    yum install fontconfig

    yum install freetype*

    yum install urw-fonts

    Install the Grafana package:

    rpm -Uvh grafana-4.6.3-1.x86_64.rpm

     

    3. After the installation, edit the configuration file /etc/grafana/grafana.ini.

    The file documents each setting; you only need to change the lines you care about.
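    For instance, the listening port and the externally visible address sit in the [server] section. A minimal sketch (the values are placeholders, not from my actual setup):

    [server]
    http_port = 3000
    domain = your-server-ip

    Remember to restart the service after changing the file.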

     

    4. Paths of the files installed by the package:

    Binary: /usr/sbin/grafana-server
    init.d script copied to: /etc/init.d/grafana-server
    Default environment file: /etc/sysconfig/grafana-server
    Configuration file copied to: /etc/grafana/grafana.ini
    systemd unit (if systemd is available): grafana-server.service
    Default log file: /var/log/grafana/grafana.log

     

    5. Managing the Grafana service:

    Start: service grafana-server start

    Stop: service grafana-server stop

    Restart: service grafana-server restart

    Enable at boot: chkconfig --add grafana-server && chkconfig grafana-server on

    6. Install the Zabbix plugin:

    grafana-cli plugins install alexanderzobnin-zabbix-app

     

    7. Install other panel plugins

    Examples:

    grafana-cli plugins install grafana-clock-panel

    grafana-cli plugins install grafana-piechart-panel

    grafana-cli plugins install raintank-worldping-app

    grafana-cli plugins install jasonlashua-prtg-datasource

    grafana-cli plugins install grafana-worldmap-panel

     

    8. Uninstall plugins

    Examples:

    grafana-cli plugins uninstall jasonlashua-prtg-datasource

    grafana-cli plugins uninstall raintank-worldping-app

     

    9. After installing or uninstalling plugins, restart the service:

    service grafana-server restart

     

    10. Verify the installation

    Check that port 3000 is listening: netstat -nuplt | grep 3000


    Check that the package is installed:
    rpm -qa | grep -i grafana

     

     

    Open http://ip:3000 in a browser and check that the login page loads.

     

    If the login page comes up, the installation is working. The default username and password are both admin.
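    You can also check from the server itself that the HTTP service answers (this just prints the status code of the login page, which should be 200):

    curl -s -o /dev/null -w "%{http_code}\n" http://127.0.0.1:3000/login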

    Installing InfluxDB:

    1. wget https://dl.influxdata.com/influxdb/releases/influxdb-1.7.8.x86_64.rpm

    2. sudo yum localinstall influxdb-1.7.8.x86_64.rpm
    3. sudo service influxdb start
     
    Connect to the database and look at the measurements. The log_test measurement holds the data that my Go program parses out of the log file with a regular expression and writes into InfluxDB.
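    Inspecting it in the influx shell looks roughly like this (a sketch; mylogdb and log_test are the database and measurement names used in the Go code further down):

    influx
    > SHOW DATABASES
    > USE mylogdb
    > SHOW MEASUREMENTS
    > SELECT * FROM log_test LIMIT 5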

    Tip: when defining the field types in the Go code, make sure they match the types already stored in the measurement; otherwise the write fails with a type error and nothing gets inserted.
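    For example, a field that was first written as a float cannot later be written as an integer. In line protocol a bare number is a float and an "i" suffix marks an integer, so the second insert below is rejected with a "field type conflict"-style error (type_demo is a throwaway measurement name used only for illustration):

    > INSERT type_demo value=2133
    > INSERT type_demo value=2133i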

    Unlike MySQL, InfluxDB has no dedicated CREATE TABLE statement; a measurement is created implicitly the first time you write to it. In the example below, the access_test measurement does not exist yet, and a single insert creates it.
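    In the influx shell that looks roughly like this (note that the database itself, unlike the measurement, does have to be created explicitly; the tag and field names mirror the ones used in the Go code):

    > CREATE DATABASE mylogdb
    > USE mylogdb
    > INSERT access_test,Method=GET,Status=200 BytesSent=2133,RequestTime=1.854
    > SELECT * FROM access_test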

    The Go code:

    1. For local testing, there is an access.log file in the same directory as the program.

    2. The program reads the log file and writes the parsed records into InfluxDB. One thing to stress about the code:

    you need to download the Go client first, from https://github.com/influxdata/influxdb-client-go,

    otherwise the program cannot connect to InfluxDB.
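    The code below actually imports the v1 client package github.com/influxdata/influxdb1-client/v2, so that is the package to fetch; with a GOPATH-style workflow that is roughly:

    go get github.com/influxdata/influxdb1-client/v2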

    The code of the log program:

    package main

    import (
        "bufio"
        "encoding/json"
        "flag"
        "fmt"
        "io"
        "log"
        "net/http"
        "net/url"
        "os"
        "regexp"
        "strconv"
        "strings"
        "time"

        client "github.com/influxdata/influxdb1-client/v2"
    )

    type Reader interface {
        Read(rc chan []byte)
    }

    type Writer interface {
        Write(wc chan *Message)
    }

    type LogProcess struct {
        rc    chan []byte
        wc    chan *Message
        read  Reader
        write Writer
    }

    type ReadFromFile struct {
        path string // path of the log file to read
    }

    type WriteToInfluxDB struct {
        influxDBDsn string // influx data source
    }

    type Message struct {
        TimeLocal                    time.Time
        BytesSent                    float64
        Path, Method, Scheme, Status string
        UpstreamTime, RequestTime    float64
    }

    // system status monitoring
    type SystemInfo struct {
        HandleLine   int     `json:"handleLine"`   // total number of log lines processed
        Tps          float64 `json:"tps"`          // system throughput
        ReadChanLen  int     `json:"readChanLen"`  // length of the read channel
        WriteChanLen int     `json:"writeChanLen"` // length of the write channel
        RunTime      string  `json:"runTime"`      // total running time
        ErrNum       int     `json:"errNum"`       // number of errors
    }

    const (
        TypeHandleLine = 0
        TypeErrNum     = 1
    )

    var TypeMonitorChan = make(chan int, 200)

    type Monitor struct {
        startTime time.Time
        data      SystemInfo
        tpsSli    []int
    }

    func (m *Monitor) start(lp *LogProcess) {

        go func() {
            for n := range TypeMonitorChan {
                switch n {
                case TypeErrNum:
                    m.data.ErrNum += 1
                case TypeHandleLine:
                    m.data.HandleLine += 1
                }
            }
        }()

        ticker := time.NewTicker(time.Second * 5)
        go func() {
            for {
                <-ticker.C
                m.tpsSli = append(m.tpsSli, m.data.HandleLine)
                if len(m.tpsSli) > 2 {
                    m.tpsSli = m.tpsSli[1:]
                }
            }
        }()

        http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
            m.data.RunTime = time.Now().Sub(m.startTime).String()
            m.data.ReadChanLen = len(lp.rc)
            m.data.WriteChanLen = len(lp.wc)

            if len(m.tpsSli) >= 2 {
                m.data.Tps = float64(m.tpsSli[1]-m.tpsSli[0]) / 5
            }

            ret, _ := json.MarshalIndent(m.data, "", " ")
            io.WriteString(writer, string(ret))
        })

        http.ListenAndServe(":9193", nil)
    }

    func (r *ReadFromFile) Read(rc chan []byte) {
        // read module
        fmt.Println("start readfromfile")

        // open the file
        fmt.Println(r.path)
        f, err := os.Open(r.path)
        fmt.Println(f)
        if err != nil {
            panic(fmt.Sprintf("open file error:%s", err.Error()))
        }

        // read the file line by line (uncomment the Seek to start from the end of the file)
        //f.Seek(0, 2)
        rd := bufio.NewReader(f)
        fmt.Println("rd", rd)

        for {
            line, err := rd.ReadBytes('\n')

            if err == io.EOF {
                time.Sleep(500 * time.Millisecond)
                continue
            } else if err != nil {
                panic(fmt.Sprintf("ReadBytes error:%s", err.Error()))
            }

            fmt.Println("line:", string(line))

            TypeMonitorChan <- TypeHandleLine
            rc <- line[:len(line)-1] // strip the trailing '\n'
            fmt.Println("read foreach success", rc)
        }
    }

    func (w *WriteToInfluxDB) Write(wc chan *Message) {
        // write module
        fmt.Println("writetoinfluxdb")

        // Create a new HTTPClient
        c, err := client.NewHTTPClient(client.HTTPConfig{
            Addr:     "http://127.0.0.1:8086",
            Username: "admin",
            Password: "",
        })

        fmt.Println("client.newhttpclient", c)
        if err != nil {
            log.Fatal(err)
        }

        fmt.Println("wc", wc)

        for v := range wc {
            // Create a new point batch
            bp, err := client.NewBatchPoints(client.BatchPointsConfig{
                Database:  "mylogdb",
                Precision: "s",
            })

            fmt.Println("bp:", bp)
            if err != nil {
                log.Fatal(err)
            }

            // Create a point and add it to the batch
            // Tags: Path, Method, Scheme, Status
            tags := map[string]string{"Path": v.Path, "Method": v.Method, "Scheme": v.Scheme, "Status": v.Status}
            // Fields: UpstreamTime, RequestTime, BytesSent
            fields := map[string]interface{}{
                "UpstreamTime": v.UpstreamTime,
                "RequestTime":  v.RequestTime,
                "BytesSent":    v.BytesSent,
            }

            pt, err := client.NewPoint("log_test", tags, fields, v.TimeLocal)
            fmt.Println("pt:", pt)
            if err != nil {
                log.Fatal(err)
            }
            bp.AddPoint(pt)

            // Write the batch
            if err := c.Write(bp); err != nil {
                log.Fatal(err)
            }

            log.Println("write success!")
            fmt.Println("write foreach success!")
        }
        fmt.Println("write success")
    }

    func (l *LogProcess) Process() {
        // parse module

        /**
        Example log line:
        172.0.0.12 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854
        */
        fmt.Println("l logprocess")

        r := regexp.MustCompile(`([\d.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+"([^"]+)"\s+(\d{3})\s+(\d+)\s+"([^"]+)"\s+"(.*?)"\s+"([\d.-]+)"\s+([\d.-]+)\s+([\d.-]+)`)

        loc, _ := time.LoadLocation("Asia/Shanghai")

        fmt.Println("for process lrc", l.rc)

        for v := range l.rc {

            ret := r.FindStringSubmatch(string(v))

            fmt.Println("ret length:", len(ret))

            if len(ret) != 14 {
                TypeMonitorChan <- TypeErrNum
                log.Println("FindStringSubmatch fail:", string(v))
                continue
            }

            fmt.Println("logprocessret", ret[5])

            message := &Message{}

            t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc)
            if err != nil {
                TypeMonitorChan <- TypeErrNum
                log.Println("ParseInLocation fail:", err.Error(), ret[4])
                continue
            }
            message.TimeLocal = t

            byteSent, _ := strconv.Atoi(ret[8])
            message.BytesSent = float64(byteSent)

            // GET /foo?query=t HTTP/1.0
            reqSli := strings.Split(ret[6], " ")
            if len(reqSli) != 3 {
                TypeMonitorChan <- TypeErrNum
                log.Println("strings.Split fail", ret[6])
                continue
            }
            message.Method = reqSli[0]

            u, err := url.Parse(reqSli[1])
            if err != nil {
                log.Println("url parse fail:", err)
                TypeMonitorChan <- TypeErrNum
                continue
            }
            message.Path = u.Path

            message.Scheme = ret[5]
            message.Status = ret[7]

            upstreamTime, _ := strconv.ParseFloat(ret[12], 64)
            requestTime, _ := strconv.ParseFloat(ret[13], 64)
            message.UpstreamTime = upstreamTime
            message.RequestTime = requestTime

            l.wc <- message
            fmt.Println("logprocess foreach success")
        }
        fmt.Println("logprocess success")
    }

    func main() {

        // variables for the log path and the influx data source
        var path, influxDsn string

        // assign path and influxDsn from command-line flags
        flag.StringVar(&path, "path", "/Users/luoyan3/go/src/go-grafana-log/access_test.log", "read file path")
        flag.StringVar(&influxDsn, "influxDsn", "http://127.0.0.1:8086@admin@@mylogdb@s", "influx data source")
        flag.Parse()
        fmt.Println("accccccess_log_path", path)

        r := &ReadFromFile{
            path: path,
        }
        fmt.Println("readfromfile:", r)

        w := &WriteToInfluxDB{
            influxDBDsn: influxDsn,
        }
        fmt.Println("writetoinfluxdb:", w)

        lp := &LogProcess{
            rc:    make(chan []byte, 200),
            wc:    make(chan *Message, 200),
            read:  r,
            write: w,
        }
        fmt.Println("logprocess:", lp.wc)

        go lp.read.Read(lp.rc)

        for i := 0; i < 2; i++ {
            fmt.Println("for one")
            go lp.Process()
        }

        fmt.Println("wccccc", lp.wc)

        for i := 0; i < 4; i++ {
            fmt.Println("for two")
            go lp.write.Write(lp.wc)
        }

        m := &Monitor{
            startTime: time.Now(),
            data:      SystemInfo{},
        }
        m.start(lp)
        fmt.Println("success")
    }
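    To try it out, build the program and point it at a log file via the flags defined in main (a sketch; the file name log.go and the paths are assumptions, adjust them to your layout):

    go build -o log_process log.go
    ./log_process -path ./access.log
    curl http://127.0.0.1:9193/monitor   # the /monitor endpoint returns the SystemInfo counters as JSON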

    This program runs end to end for me. The parsed data can then be charted in Grafana from the log_test measurement.


    To wrap up: when installing things on a Linux server you will run into plenty of errors. Don't worry; search the error message, figure out what else needs to be installed, and install the missing pieces one by one. It will come together in the end.

    Later on I plan to apply this setup to some of my own projects, using their log files to monitor each endpoint.
