zoukankan      html  css  js  c++  java
  • golang 版本 gearman 试用

    g2 是 gearman 的 golang 实现,包含 server(支持 leveldb 存储以及 metrics)、client 和 worker 的代码。
    使用起来很方便,部署也比较简单,结合 docker 可以实现快速部署。
    示例项目使用 go mod 进行包管理。

    docker-compose文件

     
    # Compose stack for the gearman demo: one server (app) plus a client and a
    # worker, each built from a local directory.
    version: "3"
    services:
      # gearman server, run from a published image
      app:
        image: appscode/gearmand:0.5.2
        # Listens on 0.0.0.0:4730 and uses /my-dir as its storage directory
        command: run --v=3 --storage-dir=/my-dir --addr="0.0.0.0:4730"
        volumes:
        # Bind-mount ./db from the host onto the server's storage directory
        - "./db:/my-dir"
        ports:
        - "4730:4730"
        # NOTE(review): nothing in this file configures a listener on 3000 —
        # presumably the server's web/metrics port; confirm
        - "3000:3000"
      # Test client, built from ./client
      client:
        build:
         context: ./client
      # Test worker, built from ./worker
      worker:
        build:
         context: ./worker
     
     

    client 测试代码

    package main
    import (
      "log"
      "os"
      "sync"
      "github.com/appscode/g2/client"
      rt "github.com/appscode/g2/pkg/runtime"
    )
    func main() {
      // Set the autoinc id generator
      // You can write your own id generator
      // by implementing IdGenerator interface.
      // client.IdGen = client.NewAutoIncId()
      c, err := client.New(rt.Network, "app:4730")
      if err != nil {
        log.Fatalln(err)
      }
      defer c.Close()
      c.ErrorHandler = func(e error) {
        log.Println(e)
        os.Exit(1)
      }
      echo := []byte("Hellox00 world")
      echomsg, err := c.Echo(echo)
      if err != nil {
        log.Fatalln(err)
      }
      log.Println(string(echomsg))
      jobHandler := func(resp *client.Response) {
        switch resp.DataType {
        case rt.PT_WorkException:
          fallthrough
        case rt.PT_WorkFail:
          fallthrough
        case rt.PT_WorkComplete:
          if data, err := resp.Result(); err == nil {
            log.Printf("RESULT: %v
    ", data)
          } else {
            log.Printf("RESULT: %s
    ", err)
          }
        case rt.PT_WorkWarning:
          fallthrough
        case rt.PT_WorkData:
          if data, err := resp.Update(); err == nil {
            log.Printf("UPDATE: %v
    ", data)
          } else {
            log.Printf("UPDATE: %v, %s
    ", data, err)
          }
        case rt.PT_WorkStatus:
          if data, err := resp.Status(); err == nil {
            log.Printf("STATUS: %v
    ", data)
          } else {
            log.Printf("STATUS: %s
    ", err)
          }
        default:
          log.Printf("UNKNOWN: %v", resp.Data)
        }
      }
      handle, err := c.Do("ToUpper", echo, rt.JobNormal, jobHandler)
      if err != nil {
        log.Fatalln(err)
      }
      status, err := c.Status(handle)
      if err != nil {
        log.Fatalln(err)
      }
      log.Printf("%v", *status)
      _, err = c.Do("Foobar", echo, rt.JobNormal, jobHandler)
      if err != nil {
        log.Fatalln(err)
      }
      log.Println("Press Ctrl-C to exit ...")
      var mutex sync.Mutex
      mutex.Lock()
      mutex.Lock()
    }
     
     

    worker 测试代码

    package main
    import (
      "log"
      "net"
      "os"
      "strings"
      "time"
      "github.com/appscode/g2/worker"
      "github.com/mikespook/golib/signal"
    )
    func ToUpper(job worker.Job) ([]byte, error) {
      log.Printf("ToUpper: Data=[%s]
    ", job.Data())
      data := []byte(strings.ToUpper(string(job.Data())))
      return data, nil
    }
    func ToUpperDelay10(job worker.Job) ([]byte, error) {
      log.Printf("ToUpper: Data=[%s]
    ", job.Data())
      time.Sleep(10 * time.Second)
      data := []byte(strings.ToUpper(string(job.Data())))
      return data, nil
    }
    func Foobar(job worker.Job) ([]byte, error) {
      log.Printf("Foobar: Data=[%s]
    ", job.Data())
      for i := 0; i < 10; i++ {
        job.SendWarning([]byte{byte(i)})
        job.SendData([]byte{byte(i)})
        job.UpdateStatus(i+1, 100)
      }
      return job.Data(), nil
    }
    func main() {
      log.Println("Starting ...")
      defer log.Println("Shutdown complete!")
      w := worker.New(worker.Unlimited)
      defer w.Close()
      w.ErrorHandler = func(e error) {
        log.Println(e)
        if opErr, ok := e.(*net.OpError); ok {
          if !opErr.Temporary() {
            proc, err := os.FindProcess(os.Getpid())
            if err != nil {
              log.Println(err)
            }
            if err := proc.Signal(os.Interrupt); err != nil {
              log.Println(err)
            }
          }
        }
      }
      w.JobHandler = func(job worker.Job) error {
        log.Printf("Data=%s
    ", job.Data())
        return nil
      }
      w.AddServer("tcp4", "app:4730")
      w.AddFunc("Foobar", Foobar, worker.Unlimited)
      w.AddFunc("ToUpper", ToUpper, worker.Unlimited)
      w.AddFunc("ToUpperTimeOut5", ToUpperDelay10, 5)
      w.AddFunc("ToUpperTimeOut20", ToUpperDelay10, 20)
      w.AddFunc("SysInfo", worker.SysInfo, worker.Unlimited)
      w.AddFunc("MemInfo", worker.MemInfo, worker.Unlimited)
      if err := w.Ready(); err != nil {
        log.Fatal(err)
        return
      }
      go w.Work()
      signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
      signal.Wait()
    }
     
     

    运行&&测试

    • 构建&&运行
    docker-compose build && docker-compose up -d
    • 查看日志
      日志中包含了各个请求的处理记录
     
    app_1 | I0117 13:43:22.960922 1 server.go:824] total cron job: 0 #repeated job: 0 #onetime job: 0
    app_1 | I0117 13:43:22.960955 1 server.go:834] total job: 0 #background: 0 #running: 0
    app_1 | I0117 13:53:22.960868 1 server.go:824] total cron job: 0 #repeated job: 0 #onetime job: 0
    app_1 | I0117 13:53:22.960895 1 server.go:834] total job: 0 #background: 0 #running: 0
    app_1 | I0117 14:03:22.960897 1 server.go:824] total cron job: 0 #repeated job: 0 #onetime job: 0
    app_1 | I0117 14:03:22.960928 1 server.go:834] total job: 0 #background: 0 #running: 0
    app_1 | I0117 14:13:22.960884 1 server.go:824] total cron job: 0 #repeated job: 0 #onetime job: 0
    app_1 | I0117 14:13:22.960922 1 server.go:834] total job: 0 #background: 0 #running: 0
    client_1 | 2019/01/17 13:34:17 {H:-866e208c2d8c:-1-1547732002-3 false false 0 100}
    client_1 | 2019/01/17 13:34:17 Press Ctrl-C to exit ...
    worker_1 | 2019/01/17 13:33:22 Starting ...
    worker_1 | 2019/01/17 13:33:23 ToUpper: Data=[Hello world]
    worker_1 | 2019/01/17 13:33:23 Foobar: Data=[Hello world]
    worker_1 | 2019/01/17 13:34:17 ToUpper: Data=[Hello world]
    worker_1 | 2019/01/17 13:34:17 Foobar: Data=[Hello world]
     
     

    说明

    gearman 是标准的job 处理框架,已经有了好多语言的实现

    参考资料

    https://github.com/appscode/g2 
    https://github.com/rongfengliang/gearmangolang-docker 
    http://gearman.org/

  • 相关阅读:
    [leetcode]34.Find First and Last Position of Element in Sorted Array找区间
    [leetcode]278. First Bad Version首个坏版本
    [leetcode]367. Valid Perfect Square验证完全平方数
    [leetcode]45. Jump Game II青蛙跳(跳到终点最小步数)
    [leetcode]55. Jump Game青蛙跳(能否跳到终点)
    [leetcode]26. Remove Duplicates from Sorted Array有序数组去重(单个元素只出现一次)
    [leetcode]27. Remove Element删除元素
    [leetcode]20. Valid Parentheses有效括号序列
    [leetcode]15. 3Sum三数之和
    C#中的局部类型
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/10285306.html
Copyright © 2011-2022 走看看