zoukankan      html  css  js  c++  java
  • zookeeper分布式读写锁

    获取写锁:

    1、创建临时序号单调递增节点

    2、若存在比本节点序号小的读或者写节点,则设置观察点,只观察序号恰好比本节点小的前一个节点。收到该节点的删除事件后,则获得写锁

    3、获得写锁后,开始执行业务。业务完成后,删除本节点

    获取读锁:

    1、创建临时序号单调递增节点

    2、若存在比本节点序号小的写节点,则设置观察点,只观察序号比本节点小的写节点中序号最大的那个。收到该节点的删除事件后,则获得读锁

    3、获得读锁后,开始执行业务。业务完成后,删除本节点

    // ZookeeperInit connects to the ZooKeeper server and makes sure the
    // persistent parent node "/Lock" exists. The connection is closed again
    // before returning.
    //
    // It returns nil when the node was created or was already present, and the
    // underlying error for any other failure.
    func ZookeeperInit() error {
        var hosts = []string{"192.168.228.141:2181"}
        conn, _, err := zk.Connect(hosts, time.Second*5)
        if err != nil {
            // Use an explicit format so '%' characters in the error text are
            // not interpreted as formatting verbs.
            logrus.Errorf("%s", err.Error())
            return err
        }
        defer conn.Close()

        // Create the Lock node (flags 0: persistent, non-sequential).
        var lockPath = "/Lock"
        _, err = conn.Create(lockPath, []byte("Lock"), 0, zk.WorldACL(zk.PermAll))
        if err == zk.ErrNodeExists {
            // Compare against the sentinel error instead of its message text;
            // an already existing parent node is not a failure.
            logrus.Infof("Node %s already exists", lockPath)
            return nil
        }
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return err
        }

        return nil
    }
    // Lock acquires the write lock and returns the path of the lock node that
    // must later be passed to Unlock.
    //
    // A sequential node "/Lock/w<seq>" is created first; the lock is held once
    // no other lock node (read or write) with a smaller sequence number exists.
    // While waiting, only the closest predecessor is watched. If the lock is
    // not obtained within 60 seconds the own node is removed again and an
    // error is returned.
    //
    // The node is created with flag 2 (sequential, not ephemeral) because the
    // connection is closed when this function returns; the caller is therefore
    // responsible for calling Unlock.
    func Lock() (lockName string, err error) {
        const (
            lockRoot    = "/Lock"
            waitTimeout = 60 * time.Second
        )

        var hosts = []string{"192.168.228.141:2181"}
        conn, _, err := zk.Connect(hosts, time.Second*5)
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return "", err
        }
        defer conn.Close()

        // Create our own node BEFORE looking at the children, so that the
        // sequence number fixes our place in the queue and concurrent callers
        // cannot both pick the same predecessor.
        var wLockData = []byte(strconv.FormatInt(time.Now().Unix(), 10))
        var wLockFlags int32 = 2 // sequential node (not ephemeral)
        lockPath, err := conn.Create(lockRoot+"/w", wLockData, wLockFlags, zk.WorldACL(zk.PermAll))
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return "", err
        }

        // giveUp removes our node so that a failed attempt does not block
        // every later waiter.
        giveUp := func() {
            if delErr := conn.Delete(lockPath, -1); delErr != nil {
                logrus.Errorf("%s", delErr.Error())
            }
        }

        self := lockPath[len(lockRoot)+1:] // child name, e.g. "w0000000012"
        deadline := time.NewTimer(waitTimeout)
        defer deadline.Stop()

        for {
            children, _, err := conn.Children(lockRoot)
            if err != nil {
                logrus.Errorf("%s", err.Error())
                giveUp()
                return "", err
            }

            // Find the predecessor: the child with the largest sequence number
            // that is still smaller than ours. Names of a different length are
            // not lock nodes of this scheme and are skipped, which also keeps
            // the string comparison of the zero-padded numbers valid.
            prev := ""
            for _, c := range children {
                if len(c) != len(self) {
                    continue
                }
                if c[1:] < self[1:] && (prev == "" || c[1:] > prev[1:]) {
                    prev = c
                }
            }
            if prev == "" {
                return lockPath, nil
            }

            // Watch the predecessor using its absolute path.
            exists, _, ech, err := conn.ExistsW(lockRoot + "/" + prev)
            if err != nil {
                logrus.Errorf("%s", err.Error())
                giveUp()
                return "", err
            }
            if !exists {
                // Predecessor vanished between Children and ExistsW.
                continue
            }

            select {
            case <-ech:
                // Something happened to the predecessor; re-evaluate the
                // queue instead of assuming the lock is ours.
            case <-deadline.C:
                giveUp()
                err := fmt.Errorf("write lock %s: timed out after %s waiting for %s", lockPath, waitTimeout, prev)
                logrus.Errorf("%s", err.Error())
                return "", err
            }
        }
    }
    
    func Unlock(lockName string) error {
    
        var hosts = []string{"192.168.228.141:2181"}
        conn, _, err := zk.Connect(hosts, time.Second*5)
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return err
        }
        defer conn.Close()
    
        // 删除节点
        err = conn.Delete(lockName, 0)
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return err
        }
    
        return nil
    }
    
    // RLock acquires a read lock and returns the path of the lock node that
    // must later be passed to RUnlock.
    //
    // A sequential node "/Lock/r<seq>" is created first; the lock is held once
    // no write node ("w" prefix) with a smaller sequence number exists. Other
    // readers never block a reader. While waiting, only the closest preceding
    // write node is watched. If the lock is not obtained within 60 seconds the
    // own node is removed again and an error is returned.
    //
    // The node is created with flag 2 (sequential, not ephemeral) because the
    // connection is closed when this function returns; the caller is therefore
    // responsible for calling RUnlock.
    func RLock() (lockName string, err error) {
        const (
            lockRoot    = "/Lock"
            waitTimeout = 60 * time.Second
        )

        var hosts = []string{"192.168.228.141:2181"}
        conn, _, err := zk.Connect(hosts, time.Second*5)
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return "", err
        }
        defer conn.Close()

        // Create our own read node BEFORE looking at the children, so that the
        // sequence number fixes our place in the queue.
        var rLockData = []byte(strconv.FormatInt(time.Now().Unix(), 10))
        var rLockFlags int32 = 2 // sequential node (not ephemeral)
        lockPath, err := conn.Create(lockRoot+"/r", rLockData, rLockFlags, zk.WorldACL(zk.PermAll))
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return "", err
        }

        // giveUp removes our node so that a failed attempt does not block
        // later writers.
        giveUp := func() {
            if delErr := conn.Delete(lockPath, -1); delErr != nil {
                logrus.Errorf("%s", delErr.Error())
            }
        }

        self := lockPath[len(lockRoot)+1:] // child name, e.g. "r0000000012"
        deadline := time.NewTimer(waitTimeout)
        defer deadline.Stop()

        for {
            children, _, err := conn.Children(lockRoot)
            if err != nil {
                logrus.Errorf("%s", err.Error())
                giveUp()
                return "", err
            }

            // Find the write node with the largest sequence number that is
            // still smaller than ours. Names of a different length are not
            // lock nodes of this scheme and are skipped, which also keeps the
            // string comparison of the zero-padded numbers valid.
            prev := ""
            for _, c := range children {
                if len(c) != len(self) || c[0] != 'w' {
                    continue
                }
                if c[1:] < self[1:] && (prev == "" || c[1:] > prev[1:]) {
                    prev = c
                }
            }
            if prev == "" {
                return lockPath, nil
            }

            // Watch that write node using its absolute path.
            exists, _, ech, err := conn.ExistsW(lockRoot + "/" + prev)
            if err != nil {
                logrus.Errorf("%s", err.Error())
                giveUp()
                return "", err
            }
            if !exists {
                // The writer vanished between Children and ExistsW.
                continue
            }

            select {
            case <-ech:
                // Something happened to the watched writer; re-evaluate the
                // queue instead of assuming the lock is ours.
            case <-deadline.C:
                giveUp()
                err := fmt.Errorf("read lock %s: timed out after %s waiting for %s", lockPath, waitTimeout, prev)
                logrus.Errorf("%s", err.Error())
                return "", err
            }
        }
    }
    
    func RUnlock(lockName string) error {
        var hosts = []string{"192.168.228.141:2181"}
        conn, _, err := zk.Connect(hosts, time.Second*5)
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return err
        }
        defer conn.Close()
    
        // 删除节点
        err = conn.Delete(lockName, 0)
        if err != nil {
            logrus.Errorf("%s", err.Error())
            return err
        }
    
        return nil
    }
    
    // GetMaxchild returns the child whose sequence part (everything after the
    // first byte, i.e. after the "w"/"r" type prefix) is the largest. It
    // returns "" when children is empty or contains only empty names.
    //
    // Sequence parts are ordered by length first and then lexicographically,
    // which equals numeric order for digit strings whether or not they are
    // zero-padded. On ties the first occurrence wins.
    func GetMaxchild(children []string) (child string) {
        maxIndex := ""
        for _, value := range children {
            if value == "" {
                // Slicing value[1:] would panic on an empty name.
                continue
            }
            curIndex := value[1:]
            larger := len(curIndex) > len(maxIndex) ||
                (len(curIndex) == len(maxIndex) && curIndex > maxIndex)
            // Record the name together with its index; only replace the
            // current best when a strictly larger sequence is found.
            if child == "" || larger {
                maxIndex = curIndex
                child = value
            }
        }

        return child
    }
    
    // GetMaxWritechild returns the write node (name starting with "w") whose
    // sequence part (everything after the first byte) is the largest. Read
    // nodes are ignored. It returns "" when children contains no write node.
    //
    // Sequence parts are ordered by length first and then lexicographically,
    // which equals numeric order for digit strings whether or not they are
    // zero-padded. On ties the first occurrence wins.
    func GetMaxWritechild(children []string) (child string) {
        maxIndex := ""
        for _, value := range children {
            // Only write nodes take part in the comparison.
            if !strings.HasPrefix(value, "w") {
                continue
            }
            curIndex := value[1:]
            larger := len(curIndex) > len(maxIndex) ||
                (len(curIndex) == len(maxIndex) && curIndex > maxIndex)
            // Record the name together with its index; only replace the
            // current best when a strictly larger sequence is found.
            if child == "" || larger {
                maxIndex = curIndex
                child = value
            }
        }

        return child
    }
  • 相关阅读:
    absorb|state|
    confessed to doing|conform|confined|entitle|
    relieved|auction|calculate|campaign|charge for |chartered
    worship|spurs|drowns out|frauds|expell|spray with|deposit|moist|gave a sigh
    discount the possibility|pessimistic|bankrupt|
    every|each|the用于姓氏的复数形式|comrades-in-arms|clothes are|word|steel|affect|effect
    Measures of Center
    2020年会分享
    source insight 4.0的基本使用方法
    opencv doc学习计划
  • 原文地址:https://www.cnblogs.com/zengyjun/p/10139098.html
Copyright © 2011-2022 走看看