zoukankan      html  css  js  c++  java
  • Zookeeper实现分布式锁

    分布式锁的比较



    zookeeper



    一,curator分布式锁
    1. InterProcessMutex 实现可重入锁
    全局同步的可重入分布式锁,任何时刻不会有两个客户端同时持有该锁。Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端(线程)在拥有锁的同时,可以多次获取,不会被阻塞。

    CuratorFramework client = CuratorFrameworkFactory.newClient(ZOOKEEPERSTRING, new ExponentialBackoffRetry(1000, 3));
    client.start();
    lock = new InterProcessMutex(client, lockPAth);

    try{
        //lock
        lock.acquire();
        // dosomething
    }finlly{
        //unlock
        lock.release();
    }

    2.不可重入 自定义
    package com.itheima.itheimazookeeperlock.zklock;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    // 实现这个 Lock接口的目的就是规范。
    public class ZookeeperDistributeLock implements Lock {
    
        // zookeeper的分布式锁:是采用zookeeper的临时节点类完成,临时节点具有超时的释放的特征。
        // WORKSPACE 用来隔离,其目录隔离 分类,所有的临时节点都必须放在这里统一管理。
    
        // 锁工作区间.用来隔离,其目录隔离
        private final static String WORKSPACE = "/lock-zookeeper";
    
        // 锁的名称或者说是锁的分离
        private String  lockName;
    
        // 第三方的zookeeper的客户端 zkClient / apchee curator
        private CuratorFramework client;
    
        // zookeeper的服务器,或者集群
        private static final String ZOOKEEPERSERVER = "127.0.0.1:2181";
        // 集群几点如下
        //private static final String ZOOKEEPERSERVER = "127.0.0.01:2181,xxxxxip:2181,xxxxip:2118";
    
        public ZookeeperDistributeLock(String lockName){//store
            this.lockName = lockName;
            init();
        }
    
    
        private void init(){
            // 1: 初始化 zk客户端对象 4中
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,10);
            client = CuratorFrameworkFactory.newClient(ZOOKEEPERSERVER,5000,1000,retryPolicy);
            client.start();
    
            // 2: 创建工作区--持久节点
            try{
                // 判断工作区间节点是否存在,不存在返回null。存在的话就抛出异常ZNodeExisteException,这样做法就是排他性。
                if(client.checkExists().forPath(WORKSPACE)==null){
    
                    // 创建节点
                    client.create().creatingParentsIfNeeded()
                            //创建持久节点  CreateMode.PERSISTENT
                            .withMode(CreateMode.PERSISTENT)
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                            .forPath(WORKSPACE);
                }
            }catch(Exception ex){
                ex.printStackTrace();
            }
        }
    
    
    
        @Override
        public void lock() {
           while (true){
               // 临时节点的创建
               String lockPath = WORKSPACE + "/" + lockName;
               try {
                   // 判断工作区间节点是否存在,不存在返回null。存在的话就抛出异常ZNodeExisteException,这样做法就是排他性。
                   if(client.checkExists().forPath(lockPath)==null){
                       // 创建节点
                       client.create().creatingParentsIfNeeded()
                               //创建临时节点  CreateMode.PERSISTENT
                               .withMode(CreateMode.EPHEMERAL)
                               .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                               .forPath(lockPath);
                       // 1: zookeeper  节点具有排他性
                       // 2:但是没有阻塞的,怎么解决
                       System.out.println("get lock success .........");
                       return;
                   }else{
                       // 其他的进程,全部在这里阻塞。
                       listenerrWait();
                   }
               }catch (Exception ex){
                   // 为什么捕获隐藏,就是排他性
                   try{
                       listenerrWait();
                   }catch(Exception ex2){
                        ex2.printStackTrace();
                   }
               }
           }
        }
    
    
    
        // 监听 ---所有请求
        public void listenerrWait() throws  Exception{
    
            // 阻塞类
            final CountDownLatch countDownLatch = new CountDownLatch(1);
    
            // 初始化话子节点监听器
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, WORKSPACE, true);
            pathChildrenCache.start();
    
            // 定义监听器,这个监听器就监听WORKSPACE的目录的变化情况,如果一定放生任何的改变,就会出发zk的watcher机制
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
                    System.out.println(event.getType().name());
                    if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
                        System.out.println("子节点删除了.....开始释放锁....");
                        // 释放锁,通知别的进程来获取锁
                        countDownLatch.countDown();
                    }
                }
            });
            //类自旋的东西
            // 阻塞进程,不往下执行 await
            countDownLatch.await(5, TimeUnit.SECONDS);
        }
    
    
        @Override
        public void unlock() {
            // 临时节点的创建
            String lockPath = WORKSPACE + "/" + lockName;
            try{
               if (client.checkExists().forPath(lockPath)!=null) {
                    client.delete().forPath(lockPath);
                   System.out.println("释放锁..........");
               }
            }catch(Exception ex){
                ex.printStackTrace();
            }
        }
    
        
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock() {
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    









  • 相关阅读:
    gcc编译器创建和使用静态库、动态库
    shared_ptr & unique_ptr & weak_ptr (C++11)
    MyString(重写String)
    Linux进程间通讯的几种方式的特点和优缺点,和适用场合
    按行N等分某个文件
    Hbase region 某个regionserver挂掉后的处理
    gentoo
    Hbase 常用shell命令
    网络爬虫速成指南(二)网页解析(基于算法)
    hbase 0.96 java 示例
  • 原文地址:https://www.cnblogs.com/xues/p/11842216.html
Copyright © 2011-2022 走看看