zoukankan      html  css  js  c++  java
  • golang 版本 zookeeper 分布式锁试验随笔

    package main
    
    import (
        "fmt"
        "go-zookeeper/zk"
        "time"
    )
    
    var (
        flags int32 = zk.FlagEphemeral                    // 临时节点
        acls []zk.ACL = zk.WorldACL(zk.PermAll)            // 权限 all
    
        parentPath string = "/lock"                        // parent path
        ZkHosts string = "129.204.83.47:2181"            // zk address
    )
    
    // 创建zookeeper链接,并创建永久父级节点
    func NewZkConn(address,parentPath string) *zk.Conn{
        hosts := []string{address}
        conn,_,err := zk.Connect(hosts,time.Second*5)
        if err != nil {
            panic(err)
        }
    
        ok,_,_ := conn.Exists(parentPath)
        if !ok {
            // 创建永久节点
            nodeName,err := conn.Create(parentPath,nil,zk.FlagSequence,acls)
            if err != nil {
                panic(err)
            }
            fmt.Println("create node name :",nodeName)
        }
        return conn
    }
    
    
    func main() {
    
        conn := NewZkConn(ZkHosts,parentPath)
    
        // 假设临时节点
        path := parentPath + "/001_up_user"
    
        for i := 0;i<2;i++{
            go func(conn *zk.Conn,path string,id int) {
                ok := t(conn,path,id)
                if ok {
                    fmt.Printf("goroutine [%d] create node  [%s]  and wait 5s 
    ",id,path)
                    time.Sleep(time.Second*5)
                    err := conn.Delete(path,0)
                    if err != nil {
                        fmt.Println(err)
                    }
                    fmt.Printf("删除成功 id为[%d] 
    ",id)
                }else {
                    fmt.Printf("创建失败id为[%d] 
    ",id)
                }
            }(conn,path,i)        // (conn,fmt.Sprintf(path,i),i)
        }
    
        //_,s,event,err := conn.ExistsW(path)
        //for {
        //    select {
        //    case ch_event := <-event:
        //        {
        //            fmt.Println("path:", ch_event.Path)
        //            fmt.Println("type:", ch_event.Type.String())
        //            fmt.Println("state:", ch_event.State.String())
        //
        //            if ch_event.Type == zk.EventNodeCreated {
        //                fmt.Printf("has node[%s] detete
    ", ch_event.Path)
        //            } else if ch_event.Type == zk.EventNodeDeleted {
        //                fmt.Printf("has new node[%d] create
    ", ch_event.Path)
        //            } else if ch_event.Type == zk.EventNodeDataChanged {
        //                fmt.Printf("has node[%d] data changed", ch_event.Path)
        //            }
        //        }
        //    }
        //}
    
        time.Sleep(time.Second*60)
    
    }
    
    func t(conn *zk.Conn,path string,id int) bool{
        fmt.Printf("第 [%d] 个goroutine 
    ",id)
    
        // 设置条件,避免多个goroutine并发请求返回结果都为 节点未创建状态 抢占建立节点
        if id == 1{
            time.Sleep(time.Second*1)
            //list, _, zz, _ :=conn.ChildrenW("/lock")
            //for _,v := range list{
            //    fmt.Println(v)
            //}
            //z := <-zz
            //fmt.Println(z)
        }
    
    
    
    
        ok,_,ch,err := conn.ExistsW(path)
        if err != nil {
            return false
        }
    
        ex := false
        if ok {
            fmt.Printf("goroutine [%d] 节点 [%s] 已存在
    ",id,path)
            for {
                fmt.Printf("goroutine [%d]  监控节点状态ing...",id)
                select {
                case c := <-ch:
                    {
                        //fmt.Println("path:", c.Path)
                        //fmt.Println("type:", c.Type.String())
                        //fmt.Println("state:", c.State.String())
                        if c.Type == zk.EventNodeDeleted {
                            fmt.Printf("other node delete,current node[%s] id [%d]
    ", c.Path,id)
                            ex = true
                            break
                        }
                    }
                }
                if ex {
                    break
                }
            }
        }
    
        fmt.Printf("当前id为[%d] 节点 [%s] 不存在并创建 
    ",id,path)
        _,err = conn.Create(path,nil,flags,acls)
        if err != nil {
            fmt.Printf("创建失败 [%s] 当前节点为[%d] 原因可能为 节点被抢占 or zk宕机",path,id)
            return false
        }
        fmt.Printf("[%s] 创造节点的id为 [%d] 
    ",path,id)
        return true
    }
    
    
    
    
    
    /*
        貌似这个库没有实现监控,只能已以下方式实现
    
        exists(path)  // path 为用户id_操作行为_操作方法名
        当给节点存在则进行轮询n次是否删除
        如果删除则进行创建,注意是否其他分布式实例是否抢占先创建了节点
            -如果创建失败 轮询 3 次是否删除,删除则创建,如果失败则直接退出,返回lock失败
            -创建成功返回lock成功
     */
    // 创建一个永久父级路径
    // 之后的临时路径都在该永久路劲之下
    // 这次实验为 两个实例创建一个相同节点
    
    // 未完成 当多个实例中查询结果都为该节点未创建,进入下一个阶段,当其中一个实例抢占成功,其他则创建失败,不应该直接返回创建失败,应该重试多次,多次中返回则成功,否则失败
  • 相关阅读:
    Shell printf 命令
    Shell echo命令
    Shell 基本运算符
    Shell 数组
    Shell 传递参数
    Shell 变量
    Spark基础知识汇总
    DataScientist————汇总篇
    Java的Unsafe类
    java 中文字符和unicode编码值相互转化
  • 原文地址:https://www.cnblogs.com/zengxm/p/12829265.html
Copyright © 2011-2022 走看看