zoukankan      html  css  js  c++  java
  • go-zookeeper

      connect是客户端向zk服务器发起链接函数;client向zk zerver发起链接时分为两步:1.tcp三次握手;2.握手成功后与zk server进行atuh认证维持的状态

    1. StateUnknown State = -1//未使用
    2. StateDisconnected State = 0//client与zk server链接断开;1.在client与zkserver tcp三次握手成功后与zkserver进行auth认证时session超期置为此状态;2.client主动断开链接
    3. StateConnecting State = 1//准备发起tcp链接但还未成功
    4. StateAuthFailed State = 4//
    5. StateConnectedReadOnly State = 5
    6. StateSaslAuthenticated State = 6
    7. StateExpired State = -112//tcp链接成功后与zk server进行auth过程中session超时
    8. StateConnected = State(100)//已经完成tcp链接
    9. StateHasSession = State(101)//已经完成atuh认证,在session只有为此状态此时go- zookeeper创建的链接才可用

      以下是go-zeekeeper部分源码详解

    connect

    func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
    
        //...
        // Randomize the order of the servers to avoid creating hotspots
        stringShuffle(srvs)
    
        ec := make(chan Event, eventChanSize)
        conn := &Conn{
            dialer:         net.DialTimeout,
            hostProvider:   &DNSHostProvider{},
            conn:           nil,//连接成功后返回可用连接的句柄
            state:          StateDisconnected,
            eventChan:      ec,//zkserver节点有变化时向client发送消息,zo-zookeeper是通过event类型来处理watch的,event发送到此channel中
            shouldQuit:     make(chan struct{}),//clienet主动退出时close此channel,阻塞的select监听到此channel可读后返回
            connectTimeout: 1 * time.Second,
            sendChan:       make(chan *request, sendChanSize),//数据发送到此channel中
            requests:       make(map[int32]*request),//数据从此channel接受
            watchers:       make(map[watchPathType][]chan Event),//client wath server中的节点(包括节点path和watch类型,节点是否存在、节点数据、节点的子节点)
            passwd:         emptyPassword,
            logger:         DefaultLogger,
            logInfo:        true, // default is true for backwards compatability
            buf:            make([]byte, bufferSize),
        }
    
        // Set provided options.
        for _, option := range options {
            option(conn)
        }
    
        if err := conn.hostProvider.Init(srvs); err != nil {
            return nil, nil, err
        }
    
        conn.setTimeouts(int32(sessionTimeout / time.Millisecond))
    
        go func() {
            conn.loop()//处理连接请求
            conn.flushRequests(ErrClosing)
            conn.invalidateWatches(ErrClosing)
            close(conn.eventChan)
        }()
        return conn, ec, nil//ec为添加watch后server变化返回的变化的事件类型
    }

    loop

      connect中调用loop,在loop中做的重要的两件事:

    1. 调用connect进行tcp三次握手,成功后并置为connected状态
    2. 调用auth进行链接后的状态维护,完成后置为hasSession状态
    3. auth成功后,起两个协程,一个进行数据发送(包含心跳维护,检测服务是否正常,如服务不正常客户端关闭);一个进行数据接受(或者服务端watch节点变化发送的watch消息进行处理,处理后再次添加到con的watchers列表中,并且调用全局的watch函数,此函数在创建zk链接时可作为可选参数传入)
    func (c *Conn) loop() {
        for {
            if err := c.connect(); err != nil {
                // c.Close() was called
                return
            }
    
            err := c.authenticate()
            switch {
            case err == ErrSessionExpired:
                c.logger.Printf("authentication failed: %s", err)
                c.invalidateWatches(err)
            case err != nil && c.conn != nil:
                c.logger.Printf("authentication failed: %s", err)
                c.conn.Close()
            case err == nil:
                if c.logInfo {
                    c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
                }
                c.hostProvider.Connected()        // mark success
                c.closeChan = make(chan struct{}) // channel to tell send loop stop
                reauthChan := make(chan struct{}) // channel to tell send loop that authdata has been resubmitted
    
                var wg sync.WaitGroup
                wg.Add(1)
                go func() {
                    <-reauthChan
                    if c.debugCloseRecvLoop {
                        close(c.debugReauthDone)
                    }
                    err := c.sendLoop()
                    if err != nil || c.logInfo {
                        c.logger.Printf("send loop terminated: err=%v", err)
                    }
                    c.conn.Close() // causes recv loop to EOF/exit
                    wg.Done()
                }()
    
                wg.Add(1)
                go func() {
                    var err error
                    if c.debugCloseRecvLoop {
                        err = errors.New("DEBUG: close recv loop")
                    } else {
                        err = c.recvLoop(c.conn)
                    }
                    if err != io.EOF || c.logInfo {
                        c.logger.Printf("recv loop terminated: err=%v", err)
                    }
                    if err == nil {
                        panic("zk: recvLoop should never return nil error")
                    }
                    close(c.closeChan) // tell send loop to exit
                    wg.Done()
                }()
    
                c.resendZkAuth(reauthChan)
    
                c.sendSetWatches()
                wg.Wait()
            }
    
            c.setState(StateDisconnected)
    
            select {
            case <-c.shouldQuit:
                c.flushRequests(ErrClosing)
                return
            default:
            }
    
            if err != ErrSessionExpired {
                err = ErrConnectionClosed
            }
            c.flushRequests(err)
    
            if c.reconnectLatch != nil {
                select {
                case <-c.shouldQuit:
                    return
                case <-c.reconnectLatch:
                }
            }
        }
    }
    c.sendEvent(ev)
                wTypes := make([]watchType, 0, 2)
                switch res.Type {
                case EventNodeCreated:
                    wTypes = append(wTypes, watchTypeExist)
                case EventNodeDeleted, EventNodeDataChanged:
                    wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
                case EventNodeChildrenChanged:
                    wTypes = append(wTypes, watchTypeChild)
                }
                c.watchersLock.Lock()
                for _, t := range wTypes {
                    wpt := watchPathType{res.Path, t}
                    if watchers, ok := c.watchers[wpt]; ok {
                        for _, ch := range watchers {
                            ch <- ev
                            close(ch)
                        }
                        delete(c.watchers, wpt)
                    }
                }
                c.watchersLock.Unlock()

    watch

      go- zookeeper的watch是通过event来实现的,有两种处理方式:1.局部处理,处理监听connect返回的ec来处理watch;2.添加全局的withWatchEvent,每次server发消watch消息时client自动调用此函数。

      go-zookeeper watch只watch一次,当watch消息返回后想再次watch需再一次调用watch函数

    局部处理,对go-zookeeper进行封装,处理每次返回的event

    type Client struct {
        //Znodepath,zk.Conn
        quit <- chan struct{}
        event <- chan zk.Event
    }
    
    func (c *Client)hander(event zk.Event) {
        switch event.Type {
        case zk.EventSession:
            switch event.State {
            case zk.StateExpired:
                //
            }
        case zk.EventNodeCreated, zk.EventNodeDataChanged ...:
            rewatch(c)
        }
    }
    func (c *Client)watchEventLoop() {
        ticker := time.NewTicker(time.Second * 3)
        defer ticker.Stop()
        for {
            select {
            case <- c.quit:
                //正常退出
                return
            case e, _ := <- c.event:
                c.hander(e)
            case <- ticker.C:
                rewatch(c)
            }
        }
    }

    全局watch

    // 创建监听的option,用于初始化zk
        eventCallbackOption := zk.WithEventCallback(callback)
        // 连接zk
        conn, _, err := zk.Connect(hosts, time.Second*5, eventCallbackOption)

    例子

    package main
    
    import (
        "encoding/json"
        "fmt"
        "github.com/samuel/go-zookeeper/zk"
        "time"
    )
    
    type address struct {
        Ip string `json:"omitempty"`
        Port uint32
    }
    func main() {
    
        serverAddr := []string{"127.0.0.1:2181","127.0.0.1:2182","127.0.0.1:2183"}
        con, _, err := zk.Connect(serverAddr, time.Hour)
        defer con.Close()
        if err != nil {
            fmt.Println(err)
            return
        }
    
        //client处理watch监听事件,go-zookeeer监听是通过channel来实现的,根据不同的event进行处理
        _, _, evCh, errs := con.ExistsW("/data")
        if errs != nil {
            fmt.Println("exist error : ", errs)
            return
        }
        //go wathZkEvenv(evCh)
        go func() {
            ev := <-evCh
            fmt.Println("path = ", ev.Path)
            fmt.Println("type = ", ev.Type)
            fmt.Println("state = ", ev.State.String())
        }()
    
        _ ,createErr := con.Create("/data", []byte("1.1.1.1"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if createErr != nil {
            fmt.Println("create failed: ", err)
            return
        }
        childList, _, _, _ := con.ChildrenW("/")
        fmt.Println("childlist = ", childList)
    
        addr := address{
            Ip: "3.3.3.3",
            Port: uint32(99),
        }
        marshalAddr, marshalErr := json.Marshal(&addr)
        if marshalErr != nil {
            fmt.Println(marshalErr)
            return
        }
        _, createErrs := con.Create("/jsonPath", marshalAddr, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if createErrs != nil {
            fmt.Println(createErrs)
            return
        }
        resData, _, getErr := con.Get("/jsonPath")
        if getErr != nil {
            fmt.Println(getErr)
            return
        }
        unmarshalData := address{}
        err = json.Unmarshal(resData, &unmarshalData)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(unmarshalData)
        return
    }

      其它api如:children(获取该节点下的所有子节点)与childrenw区别是后者为该节点添加watch功能;get获取此节点信息;exist判断此节点是否存在。创建一个节点时首先要创建其父节点,只有父节点存在才能创建子节点。

      部署zookeeper时在docker上部署,用docker-compose来管理,docker安装zookeeper文件:

    version: '3.1'
    
    services:
      zoo1:
        image: zookeeper
        restart: always
        privileged: true
        hostname: zoo1
        ports:
          - 2181:2181
        volumes: # 挂载数据
          - /Users/user/zookeeper-cluster/node1/data:/data
          - /Users/user/zookeeper-cluster/node1/datalog:/datalog
        environment:
          ZOO_MY_ID: 1
          ZOO_SERVERS: server.1=172.20.0.5:2888:3888;2181 server.2=172.20.0.6:2888:3888;2181 server.3=172.20.0.7:2888:3888;2181
        networks:
          default:
            ipv4_address: 172.20.0.5
    
      zoo2:
        image: zookeeper
        restart: always
        privileged: true
        hostname: zoo2
        ports:
          - 2182:2181
        volumes: # 挂载数据
          - /Users/user/zookeeper-cluster/node2/data:/data
          - /Users/user/zookeeper-cluster/node2/datalog:/datalog
        environment:
          ZOO_MY_ID: 2
          ZOO_SERVERS: server.1=172.20.0.5:2888:3888;2181 server.2=172.20.0.6:2888:3888;2181 server.3=172.20.0.7:2888:3888;2181
        networks:
          default:
            ipv4_address: 172.20.0.6
    
      zoo3:
        image: zookeeper
        restart: always
        privileged: true
        hostname: zoo3
        ports:
          - 2183:2181
        volumes: # 挂载数据
          - /Users/user/zookeeper-cluster/node3/data:/data
          - /Users/user/zookeeper-cluster/node3/datalog:/datalog
        environment:
          ZOO_MY_ID: 3
          ZOO_SERVERS: server.1=172.20.0.5:2888:3888;2181 server.2=172.20.0.6:2888:3888;2181 server.3=172.20.0.7:2888:3888;2181
        networks:
          default:
            ipv4_address: 172.20.0.7
    
    networks: # 自定义网络
      default:
        name: zoonet
        ipam:
          config:
            - subnet: 172.20.0.0/24
    View Code

      分布式锁:

    package zookeeper
    
    import (
        "encoding/json"
        "errors"
        "fmt"
        "github.com/samuel/go-zookeeper/zk"
        "sort"
        "strings"
        "sync"
        "time"
    )
    
    type address struct {
        Ip string `json:"omitempty"`
        Port uint32
    }
    
    func init() {
        fmt.Println("this is zookeeper init")
    }
    func wathZkEvenv(ev zk.Event) {
        fmt.Println("path = ", ev.Path)
        fmt.Println("type = ", ev.Type)
        fmt.Println("state = ", ev.State.String())
    }
    
    func writeAndReadData(con *zk.Conn) {
        //client处理watch监听事件,go-zookeeer监听是通过channel来实现的,根据不同的event进行处理
        _, _, _, errs := con.ExistsW("/data")
        if errs != nil {
            fmt.Println("exist error : ", errs)
            return
        }
        //go wathZkEvenv(evCh)
    
        _, createErr := con.Create("/data", []byte("1.1.1.1"), zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if createErr != nil {
            fmt.Println("create failed: ", createErr)
            return
        }
    
        addr := address{
            Ip: "3.3.3.3",
            Port: uint32(99),
        }
        marshalAddr, marshalErr := json.Marshal(&addr)
        if marshalErr != nil {
            fmt.Println(marshalErr)
            return
        }
        _, createErrs := con.Create("/jsonPath", marshalAddr, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
        if createErrs != nil {
            fmt.Println(createErrs)
            return
        }
        resData, _, getErr := con.Get("/jsonPath")
        if getErr != nil {
            fmt.Println(getErr)
            return
        }
        unmarshalData := address{}
        err := json.Unmarshal(resData, &unmarshalData)
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(unmarshalData)
        return
    }
    
    var wg sync.WaitGroup
    
    type zkLockNode struct {
        rootPath string
        fullPath map[int]string
        con *zk.Conn
    }
    
    func (z *zkLockNode)checkParam() error {
        if z.con == nil || z.rootPath == "" {
            return errors.New("param is invalid")
        }
        return nil
    }
    
    func (z *zkLockNode)createLockNode(index int) error{
        if err := z.checkParam(); err != nil {
            return err
        }
    
        isExist, _, existErr := z.con.Exists(z.rootPath)
        if existErr != nil {
            return existErr
        }
        //如果父节点不存在创建父节点
        if isExist == false {
            _, createErr := z.con.Create(z.rootPath, nil, zk.FlagSequence, zk.WorldACL(zk.PermAll))
            if createErr != nil {
                return  createErr
            }
        }
    
        var lockNodeErr error
        z.fullPath[index], lockNodeErr = z.con.Create(z.rootPath + "/", nil, zk.FlagEphemeral | zk.FlagSequence, zk.WorldACL(zk.PermAll))
        if lockNodeErr != nil {
            fmt.Println(lockNodeErr)
            return lockNodeErr
        }
        fmt.Println(fmt.Sprintf("thread %d create lock node = %s", index, z.fullPath[index]))
        return nil
    }
    
    func (z *zkLockNode)watchPreNode(path string, index int) error {
        if err := z.checkParam(); err != nil {
            return err
        }
    
        watchPath := z.rootPath + "/" + path
        isExist, _, ch, existErr := z.con.ExistsW(watchPath)
        if existErr != nil {
            return existErr
        }
        if isExist == false {
            return errors.New("watching node not exist")
        }
    
        for {
            select {
            case evCh := <- ch:
                if evCh.Type == zk.EventNodeDeleted {
                    fmt.Println(fmt.Sprintf("thread %d watching node %s deleted, can have lock", index, watchPath))
                    return nil
                }
            }
        }
    }
    
    //尝试加锁,如果此节点前无等待节点直接加锁否则监听上一个节点
    func (z *zkLockNode)truLock(index int) error {
        if err := z.checkParam(); err != nil {
            return err
        }
    
        childList, _ , childErr := z.con.Children(z.rootPath)
        if childErr != nil {
            return childErr
        }
    
        sort.Strings(childList)
        if len(childList) == 0 {
            fmt.Println(fmt.Sprintf("thread %d is first have lock", index))
            return nil
        }
        //debug
        fmt.Println(fmt.Sprintf("thread %d child list %s", index, childList))
        fmt.Println(fmt.Sprintf("thread %d path %s try have lock", index, z.fullPath[index]))
        fmt.Println(fmt.Sprintf("thread %d path %s already haved lock", index, z.rootPath + "/" + childList[0]))
    
        //检验自身是否持有锁
        parts := strings.Split(z.fullPath[index], "/")
        partsLen := len(parts)
        haveLockNum := parts[partsLen - 1]
        if haveLockNum == childList[0] {
            fmt.Println(fmt.Sprintf("thread %d self have lock", index))
            return nil
        }
    
        //监听前一个节点
        for i := 0; i < len(childList); i++ {
            if childList[i] == haveLockNum {
                if err := z.watchPreNode(childList[i - 1], index); err != nil {
                    return err
                }
            }
        }
    
        return nil
    }
    
    func (z *zkLockNode)unLock(index int) error {
        if err := z.checkParam(); err != nil {
            return err
        }
    
        _, stat, getErr := z.con.Get(z.fullPath[index])
        if getErr != nil {
            return getErr
        }
    
        fmt.Println("thread delete node = ", index, z.fullPath[index])
        return z.con.Delete(z.fullPath[index], stat.Version)
    }
    
    func testDistributeLock(lockNode *zkLockNode, index int) error {
        err := lockNode.createLockNode(index)
        if err != nil {
            fmt.Println(err)
        }
        err = lockNode.truLock(index)
        if err != nil {
            fmt.Println(err)
        }
        err = lockNode.unLock(index)
        if err != nil {
            fmt.Println(err)
        }
        wg.Done()
        return nil
    }
    func distributeLock(con *zk.Conn) {
        //创建父节点,在此父节点下创建有序子节点
        //_, _ = con.Create("/lock", []byte(""), zk.FlagSequence, zk.WorldACL(zk.PermAll))
        //list ,_ , err:= con.Children("/lock")
        //if err != nil {
        //    return
        //}
        //fmt.Println(list)
        lockNode := zkLockNode{
            rootPath: "/lock",
            con: con,
            fullPath: make(map[int]string),
        }
    
        wg.Add(1)
        go testDistributeLock(&lockNode, 1)
        wg.Add(1)
        go testDistributeLock(&lockNode, 2)
        wg.Wait()
        return
    }
    
    func TestZkDistribute() {
        serverAddr := []string{"127.0.0.1:2181","127.0.0.1:2182","127.0.0.1:2183"}
        opt := zk.WithEventCallback(wathZkEvenv)
        con, _, err := zk.Connect(serverAddr, time.Second * 10, opt)
        defer con.Close()
        if err != nil {
            fmt.Println(err)
            return
        }
    
        //writeAndReadData(con)
        distributeLock(con)
        return
    }
    View Code

    docker:http://www.dockerinfo.net/%e5%ba%95%e5%b1%82%e5%ae%9e%e7%8e%b0

    protocol buf:https://developers.google.com/protocol-buffers/docs/overview

    中文版protocol buf:https://skyao.gitbooks.io/learning-proto3/content/guide/language/options.html

  • 相关阅读:
    自己修改的两个js文件
    .net4缓存笔记
    使用.net的Cache框架快速实现Cache操作
    关于招聘面试(转)
    PHP中获取当前页面的完整URL
    Linux在本地使用yum安装软件(转)
    Phalcon的学习篇-phalcon和devtools的安装和设置
    GY的实验室
    aip接口中对url参数md5加密防篡改的原理
    nginx 多站点配置方法集合(转)
  • 原文地址:https://www.cnblogs.com/tianzeng/p/15241698.html
Copyright © 2011-2022 走看看