zoukankan      html  css  js  c++  java
  • etcd v3 服务注册与发现 Go代码

    本文整理一下思路,编写示例(golang),以便加深etcd的理解

    大致如下,监听程序为master,服务为service

    1 service 启动时向etcd注册自己的信息,即注册到services/  这个目录 

    2 service 可能异常退出,需要维护一个TTL(V3 使用 lease实现),类似于心跳,挂掉了,master可以监听到

    3 master监听 services/ 目录下的所有服务,根据不同action(V3有put/delete),进行处理

    service注册 

    提供 key(service name), value(serviceInfo)进行registered

    Start 启动后,执行 keepAlive() 维护心跳;收到停止信号或心跳通道关闭时执行 revoke()

    同时监听 stop chan, 相当于unregistered

    package discovery
    
    import (
        "github.com/coreos/etcd/clientv3"
        "context"
        "time"
        "log"
        "encoding/json"
        "errors"
    )
    
    // ServiceInfo is the payload a service publishes about itself. It is
    // JSON-encoded and stored as the value of the service's key in etcd.
    type ServiceInfo struct {
        // IP is the address string the service registers.
        IP string
    }
    
    // Service is a single registration of a named service in etcd.
    type Service struct {
        // Name is appended to "services/" to form the etcd key.
        Name string
        // Info is the value stored (as JSON) under that key.
        Info ServiceInfo

        // stop carries the shutdown signal from Stop to the Start loop.
        stop chan error
        // leaseid is the lease attached to the key; set by keepAlive.
        leaseid clientv3.LeaseID
        // client is the etcd connection used for every request.
        client *clientv3.Client
    }
    
    // NewService connects to the etcd cluster at endpoints (2-second dial
    // timeout) and returns a Service ready to be started. Nothing is written
    // to etcd until Start is called.
    //
    // A connection failure is returned to the caller rather than terminating
    // the process, so the caller decides how to react.
    func NewService(name string, info ServiceInfo, endpoints []string) (*Service, error) {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   endpoints,
            DialTimeout: 2 * time.Second,
        })
        if err != nil {
            return nil, err
        }

        return &Service{
            Name:   name,
            Info:   info,
            stop:   make(chan error),
            client: cli,
        }, nil
    }
    
    // Start registers the service in etcd and then blocks, consuming lease
    // keep-alive replies until one of the following happens:
    //
    //   - a value arrives on the stop channel (see Stop): the lease is revoked
    //     and that value is returned;
    //   - the etcd client's context is done: "server closed" is returned;
    //   - the keep-alive channel is closed: the lease is revoked and nil is
    //     returned.
    //
    // A registration failure is returned to the caller instead of exiting the
    // process.
    func (s *Service) Start() error {
        ch, err := s.keepAlive()
        if err != nil {
            return err
        }

        for {
            select {
            case stopErr := <-s.stop:
                s.revoke()
                return stopErr
            case <-s.client.Ctx().Done():
                return errors.New("server closed")
            case ka, ok := <-ch:
                if !ok {
                    log.Println("keep alive channel closed")
                    s.revoke()
                    return nil
                }
                log.Printf("Recv reply from service: %s, ttl:%d", s.Name, ka.TTL)
            }
        }
    }
    
    // Stop asks a running Start loop to revoke the lease and return nil.
    // The stop channel is unbuffered, so this call blocks until the Start
    // loop receives the signal.
    func (s *Service) Stop() {
        var noErr error
        s.stop <- noErr
    }
    
    // keepAlive writes the service's JSON-encoded Info under
    // "services/<Name>", bound to a freshly granted 5-second lease, records
    // the lease ID on s, and returns the channel on which the client delivers
    // keep-alive responses for that lease.
    //
    // Any failure (encoding, lease grant, put) is returned to the caller.
    func (s *Service) keepAlive() (<-chan *clientv3.LeaseKeepAliveResponse, error) {
        key := "services/" + s.Name
        value, err := json.Marshal(&s.Info)
        if err != nil {
            return nil, err
        }

        // minimum lease TTL is 5-second
        resp, err := s.client.Grant(context.TODO(), 5)
        if err != nil {
            return nil, err
        }

        // Attaching the lease makes etcd delete the key when the lease
        // expires or is revoked.
        _, err = s.client.Put(context.TODO(), key, string(value), clientv3.WithLease(resp.ID))
        if err != nil {
            return nil, err
        }
        s.leaseid = resp.ID

        return s.client.KeepAlive(context.TODO(), resp.ID)
    }
    
    func (s *Service) revoke() error {
    
        _, err := s.client.Revoke(context.TODO(), s.leaseid)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("servide:%s stop
    ", s.Name)
        return err
    }

    监听程序Master

    提供监听路径path,启动master,当put时加入 map,  delete时 从map去掉

    package discovery
    
    import (
        "github.com/coreos/etcd/clientv3"
        "context"
        "log"
        "time"
        "encoding/json"
        "fmt"
    )
    
    // Master watches an etcd key prefix and mirrors the keys found there
    // into an in-memory node table.
    type Master struct {
        // Path is the key prefix passed to the etcd watch.
        Path string
        // Nodes maps an etcd key to the node registered under it.
        // NOTE(review): WatchNodes mutates this map from the goroutine started
        // by NewMaster and no lock is visible here, so concurrent readers race
        // with it.
        Nodes map[string]*Node
        // Client is the etcd connection used for watching.
        Client *clientv3.Client
    }
    
    // Node is one registered service as seen by the Master.
    type Node struct {
        // State is set to true by AddNode when the node is recorded.
        State bool
        // Key is the etcd key the node was registered under.
        Key string
        // Info is the decoded value stored under Key.
        Info ServiceInfo
    }
    
    
    // NewMaster connects to the etcd cluster at endpoints (1-second dial
    // timeout), creates a Master for watchPath with an empty node table, and
    // starts WatchNodes in a background goroutine.
    //
    // A connection failure is returned to the caller rather than terminating
    // the process.
    func NewMaster(endpoints []string, watchPath string) (*Master, error) {
        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   endpoints,
            DialTimeout: time.Second,
        })
        if err != nil {
            return nil, err
        }

        master := &Master{
            Path:   watchPath,
            Nodes:  make(map[string]*Node),
            Client: cli,
        }

        go master.WatchNodes()
        return master, nil
    }
    
    // AddNode stores a copy of info in the node table under key, marking the
    // node's State as true. An existing entry for key is replaced.
    func (m *Master) AddNode(key string, info *ServiceInfo) {
        m.Nodes[key] = &Node{
            State: true,
            Key:   key,
            Info:  *info,
        }
    }
    
    
    // GetServiceInfo decodes the JSON value carried by a watch event into a
    // ServiceInfo. A decode error is logged, and whatever was decoded
    // (possibly the zero value) is still returned; the result is never nil.
    func GetServiceInfo(ev *clientv3.Event) *ServiceInfo {
        var decoded ServiceInfo
        if err := json.Unmarshal([]byte(ev.Kv.Value), &decoded); err != nil {
            log.Println(err)
        }
        return &decoded
    }
    
    func (m *Master) WatchNodes()  {
        rch := m.Client.Watch(context.Background(), m.Path, clientv3.WithPrefix())
        for wresp := range rch {
            for _, ev := range wresp.Events {
                switch ev.Type {
                    case clientv3.EventTypePut:
                        fmt.Printf("[%s] %q : %q
    ", ev.Type, ev.Kv.Key, ev.Kv.Value)
                        info := GetServiceInfo(ev)    
                        m.AddNode(string(ev.Kv.Key),info)
                    case clientv3.EventTypeDelete:
                        fmt.Printf("[%s] %q : %q
    ", ev.Type, ev.Kv.Key, ev.Kv.Value)
                        delete(m.Nodes, string(ev.Kv.Key))
                }
            }
        }
    }

    service使用示例,20s后断开

    package main
    
    import (
    	"fmt"
    	dis "discovery"
    	"log"
    	"time"
    )
    
    func main() {
    
    	serviceName := "s-test"
    	serviceInfo := dis.ServiceInfo{IP:"192.168.1.26"}
    
    	s, err := dis.NewService(serviceName, serviceInfo,[]string {
    		"http://192.168.1.17:2379",
     		"http://192.168.1.17:2479",
     		"http://192.168.1.17:2579",
    	})
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	fmt.Printf("name:%s, ip:%s
    ", s.Name, s.Info.IP)
    
    
    	go func() {
    		time.Sleep(time.Second*20)
    		s.Stop()
    	}()
    	
    	s.Start()
    }
    

     master使用示例

    package main
    
    import (
    	"log"
    	"time"
    	"fmt"
    	dis "discovery"
    )
    
    func main() {
    
    	m, err := dis.NewMaster([]string{
    		"http://192.168.1.17:2379",
    		"http://192.168.1.17:2479",
    		"http://192.168.1.17:2579",
    	}, "services/")
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	for {
    		for k, v := range  m.Nodes {
    			fmt.Printf("node:%s, ip=%s
    ", k, v.Info.IP)
    		}
    		fmt.Printf("nodes num = %d
    ",len(m.Nodes))
    		time.Sleep(time.Second * 5)
    	}
    }
    

    执行结果(需要提前搭建 etcd服务器,可以到github下载,文末提供简单启动脚本)

    go run master 

    go run service

    etcd服务器启动脚本(执行环境 CentOS6.5)

    #!/bin/sh
    #
    # Starts one member of a local three-member etcd cluster.
    # Usage: pass 1, 2 or 3 as the first argument to choose the member.
    # The etcd binary is expected next to this script.

    # Run from the script's own directory so ./etcd resolves. Using the
    # dirname directly (no "./" prefix) also works when $0 is an absolute path.
    cd "$(dirname "$0")" || exit 1

    #echo $(pwd)
    #echo `date`

    # start_member NAME PEER_PORT CLIENT_PORT
    # Launches the etcd member NAME with the given peer and client ports.
    start_member() {
        ./etcd --name "$1" --initial-advertise-peer-urls "http://127.0.0.1:$2" \
          --listen-peer-urls "http://127.0.0.1:$2" \
          --listen-client-urls "http://192.168.1.17:$3,http://127.0.0.1:$3" \
          --advertise-client-urls "http://192.168.1.17:$3,http://127.0.0.1:$3" \
          --initial-cluster-token etcd-cluster-1 \
          --initial-cluster "cd0=http://127.0.0.1:2380,cd1=http://127.0.0.1:2480,cd2=http://127.0.0.1:2580" \
          --initial-cluster-state new
    }

    case $1 in

    1)
    # printf instead of "echo -e": the latter is not portable under /bin/sh.
    printf '[1]start first server\n\n'
    start_member cd0 2380 2379
      ;;
    2)
    printf '[2]start second  server\n\n'
    start_member cd1 2480 2479
      ;;
    3)
    printf '[3]start third server\n\n'
    start_member cd2 2580 2579
      ;;
    *)
    echo "error paramater"
    # Report the usage error to the caller through the exit status.
    exit 1
    ;;
    esac
    View Code

    完整代码,请移步 git@github.com:moonlong/etcd-discovery.git 

    欢迎斧正

    完!

  • 相关阅读:
    mysql之优化器、执行计划、简单优化
    一条查询sql的执行流程和底层原理
    mysql建立索引,实际工作中建立索引的示例
    explain命令---查看mysql执行计划
    mysql 一些知识点
    开发中一些快捷键的使用
    simple-rpc
    maven
    数组合并排序
    SpringMVC配制全局的日期格式
  • 原文地址:https://www.cnblogs.com/sevenPP/p/8149890.html
Copyright © 2011-2022 走看看