zoukankan      html  css  js  c++  java
  • sql 数据定时发送webhook golang 服务

    目的很简单,主要是基于cron表达式定时获取sql 数据库数据(sql server,mysql,pg,clickhouse)同时通过webhook 发送到外部接口

    几个需求说明

    • 应该基于配置管理,而不是代码写死的
    • 支持多数据库同时运行(减少运行的实例)
    • 支持sql 数据的处理(对于不用webhook 的数据可能不一样,我们需要处理下)
    • job 支持灵活的cron 表达式
    • 应该轻量,简单,容易使用

    设计

    • 简单说明
      参考了一个sql2slack 的服务,基于golang 编写,使用hcl 进行配置管理,同时基于js 引擎处理数据,同时为了方便数据处理
      提供了内置underscore ,对于cron 的处理基于golang 版本的cron表达式引擎 ,一些改进:基于hcl v2 版本,支持多js 引擎
      (otto以及goja 基于配置指定),调整webhook 消息的发送,支持发送任意消息,同时调整cron支持秒的处理
    • job 格式说明
      基于hcl ,Name 为label,实际上后边可以调整下,将job 扩展为webhook以及db 模式的,
     
    type Job struct {
      Name            string        `hcl:",label"`
      Driver          string        `hcl:"driver"`
      DSN             string        `hcl:"dsn"`
      Query           string        `hcl:"query"`
      Webhook         string        `hcl:"webhook"`
      Schedule        string        `hcl:"schedule"`
      MessageString   string        `hcl:"message"`
      MessageCompiled executor.JSVM `hcl:"-"`
      Conn            *sqlx.DB      `hcl:"-"`
      EngineName      string        `hcl:"jsengine"`
      JSVM            string        `hcl:"-"`
      Stmnt           *sqlx.Stmt    `hcl:"-"`
    }

    参考hcl配置

    job tst {
        webhook = "http://127.0.0.1:4195"
        driver = "mysql"
        dsn = "demo:demo@tcp(127.0.0.1:3306)/demo"
        jsengine = "otto"
        query = <<SQL
            SELECT users.* FROM users
        SQL
        schedule = "* * * * * *"
        message = <<JS
            if ( $rows.length < 1 ) {
                return
            }
            log("this is a demo")
            var msg =  "";
             _.chain($rows).pluck('name').each(function(name){
                msg += name+"--------demo--from otto----";
            })
             var info = {
                msgtype: "text",
                text: {
                    content: msg
                }
            }
            log(JSON.stringify(info))
            send(JSON.stringify(info))
        JS
    }
    • 代码结构
    ├── Dockerfile
    ├── Makefile
    ├── README.md
    ├── cmd
    │   ├── cli
    │   │   ├── Dockerfile
    │   │   ├── Makefile
    │   │   ├── README.md
    │   │   └── main.go
    │   └── server
    │       ├── Dockerfile
    │       ├── Makefile
    │       ├── README.md
    │       └── main.go
    ├── demo.hcl
    ├── demo2.hcl
    ├── docker-compose.yaml
    ├── go.mod
    ├── go.sum
    ├── pkg
    │   ├── agent
    │   ├── buildinfo
    │   │   └── version.go
    │   ├── commands
    │   │   ├── cli.go
    │   │   └── server.go
    │   ├── executor
    │   │   └── jsengine.go
    │   ├── jobs
    │   │   └── job.go
    │   ├── npm
    │   │   └── bindata.go
    │   ├── storage
    │   └── webhooks
    ├── underscore-min.js
    └── webhook.yaml
    • 代码说明
      核心是 jsengine.go以及job.go,jsengine.go 包含了js 引擎的处理,job.go 主要是对于hcl 配置的解析以及cron 的处理
      job.go
      为了方便使用js engine 暴露了log $rows 以及send 发送,可以扩展,同时解析job
     
    package jobs
    import (
        "encoding/json"
        "errors"
        "fmt"
        "log"
        "path/filepath"
        "github.com/dop251/goja"
        "github.com/go-resty/resty/v2"
        "github.com/hashicorp/hcl/v2/hclsimple"
        "github.com/jmoiron/sqlx"
        "github.com/robertkrimen/otto"
        "github.com/robfig/cron/v3"
        "github.com/rongfengliang/sql-server-exporter/pkg/executor"
    )
    // Job is one type for sql data fetch
    type Job struct {
        Name            string        `hcl:",label"`
        Driver          string        `hcl:"driver"`
        DSN             string        `hcl:"dsn"`
        Query           string        `hcl:"query"`
        Webhook         string        `hcl:"webhook"`
        Schedule        string        `hcl:"schedule"`
        MessageString   string        `hcl:"message"`
        MessageCompiled executor.JSVM `hcl:"-"`
        Conn            *sqlx.DB      `hcl:"-"`
        EngineName      string        `hcl:"jsengine"`
        JSVM            string        `hcl:"-"`
        Stmnt           *sqlx.Stmt    `hcl:"-"`
    }
    // ParseJobs  parseJobs
    func ParseJobs(jobsdir string) (map[string]*Job, *cron.Cron, error) {
        var cronhub *cron.Cron = cron.New(cron.WithChain(
            cron.SkipIfStillRunning(cron.DefaultLogger),
            cron.Recover(cron.DefaultLogger),
        ), cron.WithParser(cron.NewParser(
            cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
        )))
        files, err := filepath.Glob(filepath.Join(jobsdir, "*.hcl"))
        if err != nil {
            return nil, nil, err
        }
        result := map[string]*Job{}
        for _, filename := range files {
            var fileJobs struct {
                Jobs []*Job `hcl:"job,block"`
            }
            if err != nil {
                return nil, nil, err
            }
            err := hclsimple.DecodeFile(filename, nil, &fileJobs)
            if err != nil {
                return nil, nil, errors.New("#hcl: " + err.Error())
            }
            for _, job := range fileJobs.Jobs {
                job.MessageCompiled, err = NewJSVM(job.EngineName, job.Name, fmt.Sprintf("(function(){%s})()", job.MessageString))
                if err != nil {
                    return nil, nil, errors.New("#javascript: " + err.Error())
                }
                job.Conn, err = sqlx.Connect(job.Driver, job.DSN)
                if err != nil {
                    return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())
                }
                job.Stmnt, err = job.Conn.Preparex(job.Query)
                if err != nil {
                    return nil, nil, errors.New("#sql:" + job.Name + ": " + err.Error())
                }
                if job.Webhook == "" {
                    return nil, nil, errors.New("#webhook:" + job.Name + ": webhook is required")
                }
                if err := (func(job *Job) error {
                    _, err := cronhub.AddFunc(job.Schedule, func() {
                        if err := job.Exec(); err != nil {
                            panic(err)
                        }
                    })
                    return err
                })(job); err != nil {
                    return nil, nil, errors.New("#cron:" + job.Name + ":" + err.Error())
                }
                result[job.Name] = job
            }
        }
        return result, cronhub, nil
    }
    // NewJSVM  NewJSVM
    func NewJSVM(engine string, name, src string) (executor.JSVM, error) {
        var jsjvm executor.JSVM
        switch engine {
        case "goja":
            jsjvm = executor.NewGojaExecutor(src, goja.New())
        case "otto":
            vm := otto.New()
            script, err := vm.Compile(name, src)
            if err != nil {
                return nil, err
            }
            jsjvm = executor.NewOttoExecutor(src, vm, script)
        default:
            return nil, errors.New("not supported js engine")
        }
        return jsjvm, nil
    }
    // Exec job
    func (j *Job) Exec() error {
        rows, err := j.Stmnt.Queryx()
        if err != nil {
            return err
        }
        defer rows.Close()
        var res []map[string]interface{}
        for rows.Next() {
            o := map[string]interface{}{}
            if err := rows.MapScan(o); err != nil {
                return err
            }
            for k, v := range o {
                if nil == v {
                    continue
                }
                switch v.(type) {
                case []uint8:
                    v = []byte(v.([]uint8))
                default:
                    v, _ = json.Marshal(v)
                }
                var d interface{}
                if nil == json.Unmarshal(v.([]byte), &d) {
                    o[k] = d
                } else {
                    o[k] = string(v.([]byte))
                }
            }
            res = append(res, o)
        }
        msg := ""
        ctx := map[string]interface{}{
            "$rows": res,
            "log":   log.Println,
            "send": func(in ...interface{}) {
                msg += fmt.Sprint(in...) + "
    "
            },
        }
        if err := j.MessageCompiled.Execute(ctx); err != nil {
            return err
        }
        _, err = resty.New().R().SetDoNotParseResponse(true).SetHeader("content-type", "application/json").SetBody(msg).Post(j.Webhook)
        return err
    }
     

    jsengine.go
    js 引擎的处理使用了JSVM 接口,同时实现了otto 以及goja 的扩展,都包含了underscore 库

     
    package executor
    import (
        "github.com/dop251/goja"
        "github.com/dop251/goja_nodejs/require"
        "github.com/robertkrimen/otto"
        "github.com/rongfengliang/sql-server-exporter/pkg/npm"
    )
    // JSVM  js Engine define
    type JSVM interface {
        // Execute job command
        Execute(map[string]interface{}) error
    }
    // GojaExecutor goja js executor engine
    type GojaExecutor struct {
        Src string
        VM  *goja.Runtime
    }
    // Execute goja execute command
    func (goja *GojaExecutor) Execute(context map[string]interface{}) error {
        for k, v := range context {
            goja.VM.Set(k, v)
        }
        _, err := goja.VM.RunString(goja.Src)
        return err
    }
    // NewGojaExecutor  GojaExecutor
    func NewGojaExecutor(src string, vm *goja.Runtime) JSVM {
        registry := require.NewRegistryWithLoader(func(path string) ([]byte, error) {
            return npm.Asset(path)
        })
        m, _ := registry.Enable(vm).Require("underscore-min.js")
        vm.Set("_", m)
        return &GojaExecutor{
            Src: src,
            VM:  vm,
        }
    }
    // OttoExecutor Otto js executor engine
    type OttoExecutor struct {
        Src    string
        VM     *otto.Otto
        Script *otto.Script
    }
    // Execute goja execute command
    func (otto *OttoExecutor) Execute(context map[string]interface{}) error {
        for k, v := range context {
            if err := otto.VM.Set(k, v); err != nil {
                return err
            }
        }
        _, err := otto.VM.Run(otto.Script)
        return err
    }
    // Execute js exec script method with vm
    func Execute(jsvm JSVM, context map[string]interface{}) error {
        return jsvm.Execute(context)
    }
    // NewOttoExecutor  OttoExecutor
    func NewOttoExecutor(src string, vm *otto.Otto, script *otto.Script) JSVM {
        return &OttoExecutor{
            Src:    src,
            VM:     vm,
            Script: script,
        }
    }
     

    server.go
    主要是server 端启动的,包含参数的解析以及加载依赖的job 基于urfave/cli/ 提供cli 的处理

     
    package commands
    import (
        "fmt"
        "log"
        "os"
        "github.com/rongfengliang/sql-server-exporter/pkg/buildinfo"
        "github.com/rongfengliang/sql-server-exporter/pkg/jobs"
        "github.com/urfave/cli/v2"
    )
    // Server server
    type Server struct {
    }
    // NewServer return one  Server  Instance
    func NewServer() *Server {
        return &Server{}
    }
    // Run run
    func (s *Server) Run() {
        // TODos
        // load jobs create scheduler info
        app := cli.NewApp()
        app.Usage = "basic sql server data fetch service"
        app.Flags = []cli.Flag{
            &cli.StringFlag{
                Name:  "jobs-dir",
                Usage: "set job dirs",
                Value: ".",
            },
        }
        app.Commands = []*cli.Command{{
            Name:    "version",
            Aliases: []string{"v"},
            Usage:   "print application version",
            Action: func(c *cli.Context) error {
                fmt.Println(buildinfo.Version)
                return nil
            },
        }, {
            Name:  "start",
            Usage: "start service",
            Action: func(c *cli.Context) error {
                fmt.Println(c.String("jobs-dir"))
                jobdir := c.String("jobs-dir")
                if jobdir != "" {
                    loadJobs, cronhub, err := jobs.ParseJobs(jobdir)
                    if err != nil {
                        log.Fatal(err.Error())
                    }
                    for _, v := range loadJobs {
                        log.Println(v.EngineName)
                    }
                    cronhub.Run()
                }
                return nil
            },
        }}
        err := app.Run(os.Args)
        if err != nil {
            log.Fatal(err)
        }
    } 

    server 启动入口
    主要是提供了加载sql driver 以及调用server.go 的解析


     
    package main
    import (
        _ "github.com/ClickHouse/clickhouse-go"
        _ "github.com/denisenkom/go-mssqldb"
        _ "github.com/go-sql-driver/mysql"
        _ "github.com/lib/pq"
        _ "github.com/robertkrimen/otto/underscore"
        "github.com/rongfengliang/sql-server-exporter/pkg/commands"
    )
    func main() {
        // create Server instance
        s := commands.NewServer()
        s.Run()
    }

    测试

    • 构建
      以及make,可以参考源码
     
    make
    • 运行环境准备
      docker-compose.yaml
     
    version: "3"
    services:
      webhook:
          image: jeffail/benthos
          volumes:
          - "./webhook.yaml:/benthos.yaml"
          ports:
          - "4195:4195"
      mysql:
        image: mysql:5.7.16
        ports:
          - 3306:3306
        command: --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
        environment:
          MYSQL_ROOT_PASSWORD: demo
          MYSQL_DATABASE: demo
          MYSQL_USER: demo
          MYSQL_PASSWORD: demo
          TZ: Asia/Shanghai

    webhook.yaml

    input:
      type: broker
      broker:
        inputs:
          - type: http_server
            http_server:
              path: /
            processors:
              - type: text
                text:
                  operator: prepend
                  value: "get message: "
    output:
      type: stdout
     
     
    • 数据准备
    CREATE TABLE `users` (
      `name` varchar(100) ,
      `status` varchar(100)
    ) ENGINE=InnoDB
    INSERT INTO demo.users
    (name, status)
    VALUES('dalong', '0');
    INSERT INTO demo.users
    (name, status)
    VALUES('demo', '1');
    INSERT INTO demo.users
    (name, status)
    VALUES('rong', '1');
    • 运行效果
    ./bin/exporter-server start

    说明

    以上是一个简单的介绍,详细的可以参考github 代码

    参考资料

    https://github.com/rongfengliang/sql-data-exporter

  • 相关阅读:
    asp.net 正则表达式
    字符串分隔
    用定时器实现逐渐放大层的功能
    js获取剪贴板内容
    使用无线网卡共享上网
    使用事件探查器跟踪sqlserver进程
    document.all.WebBrowser.ExecWB
    (转)JAVA与.NET DES加密解密
    web打印的实现
    关于div的定位
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/13237049.html
Copyright © 2011-2022 走看看