zoukankan      html  css  js  c++  java
  • Go语言学习之12 etcd、contex、kafka消费实例、logagent

    本节内容:
        1. etcd介绍与使用
        2. ElastcSearch介绍与使用

    1. etcd介绍与使用
        概念:高可用的分布式key-value存储,可以使用配置共享和服务发现
        类似项目:zookeeper和consul
        开发语言:Go
        接口:提供restful的http接口,使用简单
        实现算法:基于raft算法的强一致性、高可用的服务存储目录

    2. etcd的应用场景
        a. 服务发现和服务注册
        b. 配置中心
        c. 分布式存储
        d. master选举

    3. etcd搭建
        a. 下载etcd release版本:https://github.com/etcd-io/etcd/releases 版本
        b. 解压后,进入到etcd的根目录,直接执行./etcd 可以启动etcd
        c. 使用etcdctl工具更改配置

    4. context使用介绍
        a. 如何控制goroutine
        b. 如何保存上下文数据

     (1)使用context处理超时
        ctx, cancel = context.With.Timeout(context.Background(), 2*time.Second)

        示例是通过设置ctx超时时间为2s,如果2s类无法接收到baidu的请求返回,则超时异常。

     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6     "io/ioutil"
     7     "net/http"
     8     "time"
     9 )
    10 type Result struct {
    11     r   *http.Response
    12     err error
    13 }
    14 
    15 func process() {
    16     ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    17     defer cancel()
    18     tr := &http.Transport{}
    19     client := &http.Client{Transport: tr}
    20     c := make(chan Result, 1)
    21     req, err := http.NewRequest("GET", "http://www.baidu.com", nil)
    22     if err != nil {
    23         fmt.Println("http request failed, err:", err)
    24         return
    25     }
    26     go func() {
    27         resp, err := client.Do(req)
    28         pack := Result{r: resp, err: err}
    29         c <- pack
    30     }()
    31     select {
    32     case <-ctx.Done():
    33         tr.CancelRequest(req)
    34         res := <-c
    35         fmt.Println("Timeout! err:", res.err)
    36     case res := <-c:
    37         defer res.r.Body.Close()
    38         out, _ := ioutil.ReadAll(res.r.Body)
    39         fmt.Printf("Server Response: %s", out)
    40     }
    41     return
    42 }
    43 func main() {
    44     process()
    45 }
    ctx_timeout

    (2) 使用context保存上下文
        利用context来保存上下文值:

     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6 )
     7 
     8 func process(ctx context.Context) {
     9     ret,ok := ctx.Value("trace_id").(int)
    10     if !ok {
    11         ret = 21342423
    12     }
    13 
    14     fmt.Printf("ret:%d
    ", ret)
    15 
    16     s , _ := ctx.Value("session").(string)
    17     fmt.Printf("session:%s
    ", s)
    18 }
    19 
    20 func main() {
    21     ctx := context.WithValue(context.Background(), "trace_id", 13483434)
    22     ctx = context.WithValue(ctx, "session", "sdlkfjkaslfsalfsafjalskfj")
    23     process(ctx)
    24 }
    ctx_value

        同时还有context ctx_cancel 和 ctx_deadline

     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6     "time"
     7 )
     8 
     9 func gen(ctx context.Context) <-chan int {
    10     dst := make(chan int)
    11     n := 1
    12     go func() {
    13         for {
    14             select {
    15             case <-ctx.Done():  //当43行的test函数执行结束之后,执行defer cancel(),则会触发该行
    16                 fmt.Println("i exited")
    17                 return // returning not to leak the goroutine
    18             case dst <- n:
    19                 n++
    20             }
    21         }
    22     }()
    23     return dst
    24 }
    25 
    26 func test() {
    27     // gen generates integers in a separate goroutine and
    28     // sends them to the returned channel.
    29     // The callers of gen need to cancel the context once
    30     // they are done consuming generated integers not to leak
    31     // the internal goroutine started by gen.
    32     ctx, cancel := context.WithCancel(context.Background())
    33     defer cancel() // cancel when we are finished consuming integers
    34     intChan := gen(ctx)
    35     for n := range intChan {
    36         fmt.Println(n)
    37         if n == 5 {
    38             break
    39         }
    40     }
    41 }
    42 func main() {
    43     test()
    44     time.Sleep(time.Hour)
    45 }
    ctx_cancel
     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6     "time"
     7 )
     8 
     9 func main() {
    10     d := time.Now().Add(50 * time.Millisecond)
    11     ctx, cancel := context.WithDeadline(context.Background(), d)
    12 
    13     // Even though ctx will be expired, it is good practice to call its
    14     // cancelation function in any case. Failure to do so may keep the
    15     // context and its parent alive longer than necessary.
    16     defer cancel()
    17 
    18     select {
    19     case <-time.After(1 * time.Second):
    20         fmt.Println("overslept")
    21     case <-ctx.Done():
    22         fmt.Println(ctx.Err())  //context deadline exceeded
    23     }
    24 
    25 }
    ctx_deadline

    5. etcd介绍与使用
        etcd使用示例 (由于虚拟机出现问题,后面的程序全在Windows上面操作):

    (1)客户端连接 etcd server端

     1 package main
     2 
     3 import (
     4     "fmt"
     5     //etcd_client "github.com/coreos/etcd/clientv3"
     6     etcd_client "go.etcd.io/etcd/clientv3"
     7     "time"
     8 )
     9 
    10 func main() {
    11 
    12     cli, err := etcd_client.New(etcd_client.Config{
    13         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
    14         DialTimeout: 5 * time.Second,
    15     })
    16     if err != nil {
    17         fmt.Println("connect failed, err:", err)
    18         return
    19     }
    20 
    21     fmt.Println("connect succ")
    22     defer cli.Close()
    23 }
    etcd_conn

    (2)put 和 get

     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6     "go.etcd.io/etcd/clientv3"
     7     "time"
     8 )
     9 
    10 func main() {
    11 
    12     cli, err := clientv3.New(clientv3.Config{
    13         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
    14         DialTimeout: 5 * time.Second,
    15     })
    16     if err != nil {
    17         fmt.Println("connect failed, err:", err)
    18         return
    19     }
    20 
    21     fmt.Println("connect succ")
    22     defer cli.Close()
    23     //设置1秒超时,访问etcd有超时控制
    24     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    25     //操作etcd
    26     _, err = cli.Put(ctx, "/logagent/conf/", "192.168.0.1")
    27     //操作完毕,取消etcd
    28     cancel()
    29     if err != nil {
    30         fmt.Println("put failed, err:", err)
    31         return
    32     }
    33     //取值,设置超时为1秒
    34     ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
    35     resp, err := cli.Get(ctx, "/logagent/conf/")
    36     cancel()
    37     if err != nil {
    38         fmt.Println("get failed, err:", err)
    39         return
    40     }
    41     for _, ev := range resp.Kvs {
    42         fmt.Printf("%s : %s
    ", ev.Key, ev.Value)
    43     }
    44 }
    put_get

    (3)watch(观测key值发生变化)

     1 package main
     2 
     3 import (
     4     "context"
     5     "fmt"
     6     "time"
     7 
     8     etcd_client "go.etcd.io/etcd/clientv3"
     9 )
    10 
    11 func main() {
    12 
    13     cli, err := etcd_client.New(etcd_client.Config{
    14         Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
    15         DialTimeout: 5 * time.Second,
    16     })
    17     if err != nil {
    18         fmt.Println("connect failed, err:", err)
    19         return
    20     }
    21     defer cli.Close()
    22 
    23     fmt.Println("connect succ")
    24     
    25     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    26     _, err = cli.Put(ctx, "/logagent/conf/", "99999")
    27     if err != nil {
    28         fmt.Println("put failed, err:", err)
    29         return
    30     }
    31     cancel()
    32     
    33     fmt.Println("put succ")
    34 
    35     for {
    36         rch := cli.Watch(context.Background(), "/logagent/conf/")
    37         for wresp := range rch {
    38             for _, ev := range wresp.Events {
    39                 fmt.Printf("%s %q : %q
    ", ev.Type, ev.Kv.Key, ev.Kv.Value)
    40             }
    41         }
    42     }
    43 }
    watch

        运行上面的watch程序监控key(/logagent/conf/)操作的变化,然后再运行(2)的程序,结果如下:

        kafka消费示例代码:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "strings"
     6     "sync"
     7 
     8     "github.com/Shopify/sarama"
     9 )
    10 
    11 var (
    12     wg sync.WaitGroup
    13 )
    14 
    15 func main() {
    16 
    17     consumer, err := sarama.NewConsumer(strings.Split("192.168:30.136:9092", ","), nil)
    18     if err != nil {
    19         fmt.Println("Failed to start consumer: %s", err)
    20         return
    21     }
    22     partitionList, err := consumer.Partitions("nginx_log")
    23     if err != nil {
    24         fmt.Println("Failed to get the list of partitions: ", err)
    25         return
    26     }
    27 
    28     fmt.Println(partitionList)
    29 
    30     for partition := range partitionList {
    31         pc, err := consumer.ConsumePartition("nginx_log", int32(partition), sarama.OffsetNewest)
    32         if err != nil {
    33             fmt.Printf("Failed to start consumer for partition %d: %s
    ", partition, err)
    34             return
    35         }
    36         defer pc.AsyncClose()
    37         
    38         go func(pc sarama.PartitionConsumer) {
    39             wg.Add(1)
    40             for msg := range pc.Messages() {
    41                 fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
    42                 fmt.Println()
    43             }
    44             wg.Done()
    45         }(pc)
    46     }
    47     //time.Sleep(time.Hour)
    48     wg.Wait()
    49     consumer.Close()
    50 }
    kafka消费示例代码

    6. sync.WaitGroup介绍
    1)等待一组groutine结束
    2)使用Add方法设置等待的数量加1
    3)使用Delete方法设置等待的数量减1
    4)当等待的数量等于0,Wait函数返回

    sync.WaitGroup实例:

     1 package main
     2 
     3 import (
     4     "fmt"
     5     "sync"
     6     "time"
     7 )
     8 
     9 func main() {
    10     wg := sync.WaitGroup{}
    11     for i := 0; i < 10; i++ {
    12         wg.Add(1)
    13         go calc(&wg, i)
    14     }
    15 
    16     wg.Wait() //阻塞,等待所有groutine结束
    17     fmt.Println("all goroutine finish")
    18 }
    19 
    20 func calc(w *sync.WaitGroup, i int) {
    21     //注意: wg.Add(1) 放到这会有问题,也就是main函数结束比wg.Add(1)要快
    22     fmt.Println("calc:", i)
    23     time.Sleep(time.Second)
    24     w.Done()
    25 }
    waitGroup示例

    7. ElastcSearch安装及go操作es

    (1)安装 es
       1)下载ES,下载地址:https://www.elastic.co/products/elasticsearch,我下载的是 elasticsearch-6.7.1.zip。
       2)修改在解压后根目录下的 /config/elasticsearch.yml 配置:

        放开注释并将 youIP换成你自己机器的 ip

    cluster.name: my-application
    node.name: node-1
    network.host: youIP
    http.port: 9200

       3)修改 /config/jvm.options 文件,当然如果机器性能好也可以不用修改:

    -Xms512m
    -Xmx512m

       4)进入根目录,启动es,.inelasticsearch.bat

    (2)go 操作 es 示例

        安装第三方插件:

    go get gopkg.in/olivere/elastic.v2

        示例:注意将程序里面的 url = "http://yourIP:9200/",yourIP替换为你安装es机器的 ip:

     1 package main
     2 
     3 import (
     4     "fmt"
     5 
     6     elastic "gopkg.in/olivere/elastic.v2"
     7 )
     8 
     9 type Tweet struct {
    10     User    string
    11     Message string
    12 }
    13 
    14 var (
    15     url = "http://yourIP:9200/"
    16 )
    17 
    18 func main() {
    19     client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(url))
    20     if err != nil {
    21         fmt.Println("connect es error", err)
    22         return
    23     }
    24 
    25     fmt.Println("conn es succ")
    26 
    27     tweet := Tweet{User: "olivere", Message: "Take Five"}
    28     _, err = client.Index().
    29         Index("twitter").
    30         Type("tweet").
    31         Id("1").
    32         BodyJson(tweet).
    33         Do()
    34     if err != nil {
    35         // Handle error
    36         panic(err)
    37         return
    38     }
    39 
    40     fmt.Println("insert succ")
    41 }
    es示例

        链式存储:

     1 package main
     2 
     3 import "fmt"
     4 
     5 type Stu struct {
     6     Name string
     7     Age  int
     8 }
     9 
    10 func (p *Stu) SetName(name string) *Stu {
    11     p.Name = name
    12     return p
    13 }
    14 
    15 func (p *Stu) SetAge(age int) *Stu {
    16     p.Age = age
    17     return p
    18 }
    19 
    20 func (p *Stu) Print() {
    21     fmt.Printf("age:%d name:%s
    ", p.Age, p.Name)
    22 }
    23 
    24 func main() {
    25     stu := &Stu{}
    26     stu.SetAge(12).SetName("stu01").Print()
    27     //stu.SetName("stu01")
    28     //stu.Print()
    29 }
    链式存储示例
  • 相关阅读:
    C#中的const和readonly之间的不同(转)
    文字在状态栏上从右往左显示,而且是循环的
    文字在状态栏上从左往右一个一个地显示
    猛然发现,已经第100篇随笔了
    怎样使按钮响应回车键
    编程之我见(二 类库)初露牛角
    编程之我见(一 语言)小试牛刀
    开始→运行→输入的命令集锦(转)收藏
    显示走动的数字时间和显示星期,年,月,日
    在两个页面之间互相写其控件内的值
  • 原文地址:https://www.cnblogs.com/xuejiale/p/10660857.html
Copyright © 2011-2022 走看看