zoukankan      html  css  js  c++  java
  • ZooKeeper-3.5.6分布式锁

    原理
    基本方案是基于ZooKeeper的临时节点与和watch机制。当要获取锁时在某个目录下创建一个临时节点,创建成功则表示获取锁成功,创建失败则表示获取锁失败,此时watch该临时节点,当该临时节点被删除后再去尝试获取锁。临时节点好处在于,当客户端崩溃后自动删除临时节点的同时锁也被释放了。该方案有个缺点缺点,就是大量客户端监听同一个节点,当锁释放后所有等待的客户端同时尝试获取锁,并发量很大。因此有了以下优化的方案。
    优化方案基于ZooKeeper的临时有序节点和watch机制。当要获取锁时在某个目录下创建一个临时有序节点,每次创建均能成功,只是返回的节点序号不同。只有返回的节点序号是该目录下最小的才表示获取锁成功,否则表示获取锁失败,此时watch节点序号比本身小的前一个节点,当watch的节点被删除后再去尝试获取锁。
     
    示例
    1.使用Maven引入ZooKeeper客户端:
    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.6</version>
    </dependency>
    2.工具类
    package com.example.demo;
     
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
     
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
     
    public class ZkLockUtils implements Watcher {
     
    private ZooKeeper zk;
     
    /**
    * 锁根节点
    */
    private String lockRoot;
     
    /**
    * 锁子节点前缀
    */
    private String lock;
     
    /**
    * 锁子节点分隔字符串
    * 用于筛选锁根节点下和锁相关的子节点
    */
    private String splitStr;
     
    /**
    * 要监听的锁子节点
    */
    private String watchNode;
     
    /**
    * 当前创建的锁子节点
    */
    private String currentNode;
     
    /**
    * 等待计数器
    */
    private CountDownLatch latch;
     
    /**
    * 初始化
    *
    * @param connectString 连接字符串,如果多个则使用逗号分隔,例如127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
    * @param lockRoot 锁根节点,“/”开头
    * @param lock 锁子节点前缀,不带“/”
    * @param splitStr 锁子节点分隔字符串
    * @throws Exception Exception
    */
    public ZkLockUtils(String connectString, String lockRoot, String lock, String splitStr) throws Exception {
    this.lockRoot = lockRoot;
    this.lock = lock;
    if (lock.contains(splitStr)) {
    throw new RuntimeException("lock不能包含splitStr");
    }
    this.splitStr = splitStr;
     
    // 会话超时时间30秒
    int sessionTimeout = 30000;
    // 初始化zk客户端,监视器设置为当前类
    zk = new ZooKeeper(connectString, sessionTimeout, this);
    // 如果锁根节点不存在,则创建
    Stat stat = zk.exists(lockRoot, false);
    if (stat == null) {
    zk.create(lockRoot, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }
    }
     
    /**
    * 收到通知时执行
    */
    public void process(WatchedEvent event) {
    if (latch != null) {
    latch.countDown();
    }
    }
     
    /**
    * 获取锁,如果超过等待时间,则获取锁失败
    *
    * @param waitTime 等待时间
    * @param timeUnit 时间单位
    * @return true(成功) false(失败)
    * @throws Exception Exception
    */
    public boolean lock(long waitTime, TimeUnit timeUnit) throws Exception {
    // 创建临时有序锁子节点
    currentNode = zk.create(lockRoot + "/" + lock + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    // 取出所有锁根节点的子节点
    List<String> childNodes = zk.getChildren(lockRoot, false);
    // 筛选锁根节点下和锁相关的子节点
    List<String> lockChildNodes = new ArrayList<>();
    for (String childNode : childNodes) {
    String str = childNode.split(splitStr)[0];
    if (str.equals(lock)) {
    lockChildNodes.add(childNode);
    }
    }
    // 升序排序
    Collections.sort(lockChildNodes);
    // 如果是最小的锁子节点,则获得锁
    if (currentNode.equals(lockRoot + "/" + lockChildNodes.get(0))) {
    System.out.println("当前是最小的锁子节点,获取锁成功");
    return true;
    }
    // 如果不是最小的锁子节点,找到索引比自己小1的锁子节点
    String previousNode = currentNode.substring(currentNode.lastIndexOf("/") + 1);
    watchNode = lockChildNodes.get(Collections.binarySearch(lockChildNodes, previousNode) - 1);
     
    // 判断要监听的锁子节点是否存在并监听该节点,如果不存在则该节点已被删除,直接获得锁
    Stat stat = zk.exists(lockRoot + "/" + watchNode, true);
    if (stat != null) {
    latch = new CountDownLatch(1);
    boolean b = latch.await(waitTime, timeUnit);
    latch = null;
    if (b) {
    System.out.println("监听到锁子节点被删除,获取锁成功");
    return true;
    } else {
    System.out.println("等待超时,获取锁失败");
    return false;
    }
    } else {
    System.out.println("要监听的锁子节点不存在,获取锁成功");
    return true;
    }
    }
     
    /**
    * 释放锁
    */
    public void unlock() throws Exception {
    // version为-1表示匹配任何版本
    zk.delete(currentNode, -1);
    watchNode = null;
    currentNode = null;
    zk.close();
    }
    }
    3.测试
    package com.example.demo;
     
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
     
    public class Demo {
    public static void main(String[] args) throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(2);
     
    Thread thread1 = new Thread(() -> {
    ZkLockUtils zkLockUtils = null;
    try {
    zkLockUtils = new ZkLockUtils("127.0.0.1:3181,127.0.0.1:3182,127.0.0.1:3183", "/locks", "lock", "_");
    zkLockUtils.lock(5, TimeUnit.SECONDS);
    Thread.sleep(10000);
    countDownLatch.countDown();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    if (zkLockUtils != null) {
    try {
    zkLockUtils.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    });
     
    thread1.start();
     
    Thread thread2 = new Thread(() -> {
    ZkLockUtils zkLockUtils = null;
    try {
    zkLockUtils = new ZkLockUtils("127.0.0.1:3181,127.0.0.1:3182,127.0.0.1:3183", "/locks", "lock", "_");
    zkLockUtils.lock(5, TimeUnit.SECONDS);
    Thread.sleep(10000);
    countDownLatch.countDown();
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    if (zkLockUtils != null) {
    try {
    zkLockUtils.unlock();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }
    });
     
    thread2.start();
     
    countDownLatch.await();
    System.out.println("所有子线程都执行完毕");
    }
    }
     
    优缺点
    1.Redis分布式锁需要轮询获取锁,性能开销较大。ZooKeeper分布式锁基于watch机制监听节点,不需要轮询获取锁,性能开销较小。
    2.如果Redis获取锁的客户端非正常退出,那么只能等待超时时间之后才能释放锁。Redis因为创建的是临时节点,只要客户端崩溃或者连接断开,临时节点就会被删除,锁也就被立即释放了。
  • 相关阅读:
    新建一个线程作为服务端
    设置并查看pthread创建线程时传入参数中堆栈大小值
    libevent2.0.22备忘录
    Centos7如何切换启动的内核
    svn常见问题及解决方法
    nodejs monk对接mongodb密码全过程
    百万并发的长连接是否会耗尽反向代理的端口号
    Linux文件描述符限制和单机最大长连接数
    ps命令支持的最大的进程号是多少
    nginx在配置反向代理后,启动时域名不通启动报错
  • 原文地址:https://www.cnblogs.com/gjb724332682/p/12098990.html
Copyright © 2011-2022 走看看