zoukankan      html  css  js  c++  java
  • ZooKeeper 分布式锁的 Java 实现

    只做记录,直接上代码

    父类:

    package com.ylcloud.common.lock;
    
    import com.alibaba.fastjson.JSON;
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.SerializableSerializer;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * ZooKeeper-based distributed lock built on ephemeral sequential nodes (zkclient).
     *
     * <p>Each contender creates an ephemeral sequential child under {@link #lockPath}. The contender
     * whose child sorts first owns the lock; every other contender watches the child directly in
     * front of its own and re-checks the queue whenever that child disappears or the wait times out.
     *
     * <p>The lock is NOT reentrant: a thread that calls {@link #lock()} twice without an intervening
     * {@link #unlock()} queues a second node behind its own first node and blocks.
     *
     * @author cjh
     * @date 2018/9/27 11:36
     */
    public class ZkLock {

        private static final Logger logger = LogManager.getLogger(ZkLock.class);

        public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

        public static Integer sessionTimeout = 30000;

        public static Integer connectTimeout = 30000;

        /**
         * Parent znode under which the lock queue is kept.
         */
        public String lockPath;

        /**
         * Node most recently watched by a waiting contender (diagnostic only, shared by all threads).
         */
        public String beforeNode;

        /**
         * Node most recently created by a contender of this instance (diagnostic only).
         */
        public String currentNode;

        /**
         * Node most recently created by a contender of this instance (diagnostic only).
         */
        public String threadTag = null;

        public static final ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, sessionTimeout, connectTimeout, new SerializableSerializer());

        /**
         * Serializes contenders of this instance inside one JVM while they queue up.
         */
        private final Object monitor = new Object();

        /**
         * Queue node owned by the calling thread for THIS lock instance. Kept per instance so that a
         * thread holding two different locks does not overwrite one node path with the other.
         */
        private final ThreadLocal<String> ownedNode = new ThreadLocal<String>();

        public ZkLock() {
        }

        /**
         * Binds this lock to its parent znode.
         *
         * <p>The lock directory is deliberately not deleted here: doing so would remove queue nodes
         * that belong to other instances or other JVMs and break mutual exclusion.
         *
         * @param code absolute znode path used as the lock directory, e.g. {@code "/USER_CODE"}
         */
        public void init(String code) {
            this.lockPath = code;
        }

        /**
         * Blocks until the calling thread owns the lock.
         *
         * @throws IllegalStateException if {@link #init(String)} was not called, if the thread is
         *                               interrupted while waiting (the interrupt flag is restored),
         *                               or if the thread's queue node vanished while waiting
         */
        public void lock() {
            if (lockPath == null) {
                throw new IllegalStateException("init(String) must be called before lock()");
            }

            synchronized (monitor) {
                // createParents=true makes the call tolerate a concurrently created parent node.
                client.createPersistent(lockPath, true);

                String ownPath = client.createEphemeralSequential(lockPath + '/', "");
                ownedNode.set(ownPath);
                threadTag = ownPath;
                currentNode = ownPath;

                try {
                    awaitTurn(ownPath);
                } catch (InterruptedException e) {
                    // Clean up first: the interrupt flag is clear right now, so the delete can run.
                    abandon(ownPath);
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for lock " + lockPath, e);
                } catch (RuntimeException e) {
                    abandon(ownPath);
                    throw e;
                }
            }
        }

        /**
         * Releases the lock held by the calling thread. Does nothing when the calling thread does not
         * hold this lock.
         */
        public void unlock() {
            String ownPath = ownedNode.get();
            if (ownPath == null) {
                return;
            }

            logger.debug("release lock node {}", ownPath);
            client.delete(ownPath);
            // Cleared only after a successful delete so that a failed unlock can be retried.
            ownedNode.remove();
        }

        /**
         * Loops until {@code ownPath} is the first child of the lock directory.
         */
        private void awaitTurn(String ownPath) throws InterruptedException {
            String ownName = ownPath.substring(lockPath.length() + 1);

            while (true) {
                List<String> children = client.getChildren(lockPath);
                Collections.sort(children);

                int position = Collections.binarySearch(children, ownName);
                if (position < 0) {
                    throw new IllegalStateException("Lock node " + ownPath + " disappeared while waiting");
                }
                if (position == 0) {
                    beforeNode = null;
                    return;
                }

                String predecessor = lockPath + '/' + children.get(position - 1);
                beforeNode = predecessor;
                // Returning from the wait proves nothing by itself (timeout, or a predecessor that
                // was not the owner went away), so the queue is always re-read afterwards.
                awaitDeletion(predecessor);
            }
        }

        /**
         * Waits until {@code path} is deleted or {@link #sessionTimeout} milliseconds have elapsed.
         */
        private void awaitDeletion(String path) throws InterruptedException {
            final CountDownLatch latch = new CountDownLatch(1);
            IZkDataListener listener = new IZkDataListener() {

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    latch.countDown();
                }

                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }
            };

            client.subscribeDataChanges(path, listener);
            try {
                // The node may already be gone between listing the children and subscribing.
                if (client.exists(path)) {
                    latch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                }
            } finally {
                // The client is static and long-lived; leaving listeners behind would leak them.
                client.unsubscribeDataChanges(path, listener);
            }
        }

        /**
         * Best-effort removal of a queue node after a failed acquisition.
         */
        private void abandon(String ownPath) {
            ownedNode.remove();
            try {
                client.delete(ownPath);
            } catch (RuntimeException cleanupFailure) {
                logger.warn("failed to remove lock node {}", ownPath, cleanupFailure);
            }
        }

    }

    子类

    package com.ylcloud.common.lock.ext;
    
    import com.ylcloud.common.lock.ZkLock;
    
    /**
     * Lock dedicated to user-code operations; a {@link ZkLock} bound to a fixed znode path.
     *
     * @author cjh
     * @date 2018/10/10 14:47
     */
    public class ZkLockUserCode extends ZkLock {

        /**
         * Znode path that this lock is bound to.
         */
        private static final String USER_CODE_PATH = "/USER_CODE";

        /**
         * Creates the lock and binds it to the user-code path straight away.
         */
        public ZkLockUserCode() {
            super();
            super.init(USER_CODE_PATH);
        }

    }

    使用示例:

    private ZkLock zkLock = new ZkLockUserCode();

    /**
     * Runs the user-creation logic while holding the distributed user-code lock.
     */
    public void addUser() {

        // Acquire BEFORE entering try: if acquisition itself fails, the finally block must not
        // attempt to release a lock that was never obtained.
        zkLock.lock();

        try {

                /**
                 * Business logic goes here.
                 */

        } catch (Exception e) {
            // Trailing throwable argument makes the logger include the stack trace.
            logger.error("err {} {} ", e.getMessage(), e.getCause(), e);

        } finally {
            zkLock.unlock();
        }

    }

    注意:unlock 必须写在 finally 里面,否则一旦业务代码运行出错而没有解锁,下一个请求就需要等待一个 sessionTimeout(会话超时时间)才能继续执行

    题外话:zk在linux上启动命令 ./zkServer.sh start

    自己实现的版本有很多细节没有考虑到,导致在高并发的项目中出现了问题,于是我改用 Curator 框架来操作 ZooKeeper 实现分布式锁:

        <curator.version>4.0.0</curator.version> 
    
            <!-- Apache Curator "recipes" artifact; its version is taken from the curator.version property. -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>${curator.version}</version>
            </dependency>    

    锁实现代码:

    package com.ylcloud.common.lock;
    
    import com.ylcloud.common.lock.ext.ZkLockRoleCode;
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.SerializableSerializer;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * Distributed lock that delegates to Curator's {@link InterProcessMutex}.
     *
     * <p>One shared {@link CuratorFramework} client is started when the class is loaded. Lock
     * instances are either initialised by hand through {@link #init(String)} or obtained (and cached
     * per tag) through {@link #getTagLock(String, String)}.
     *
     * @author cjh
     * @date 2018/9/27 11:36
     */
    public class ZkLock {

        private static final Logger logger = LogManager.getLogger(ZkLock.class);

        public static String ZOOKEEPER_IP_PORT = "127.0.0.1:2181";

        public static String NAME_SPACE = "YL_CLOUD";

        public static Integer sessionTimeout = 15000;

        public static Integer connectTimeout = 30000;

        /**
         * Guards every access to {@link #tagLocks}.
         */
        private static final Object initLock = new Object();

        public static CuratorFramework client = null;

        /**
         * Cache of locks keyed by {@code prefix + "/" + code}; only touched while holding initLock.
         */
        private static final Map<String,ZkLock> tagLocks = new HashMap<String,ZkLock>();

        public ZkLock() {
        }

        public InterProcessMutex mutex = null;

        static {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            client =
                    CuratorFrameworkFactory.builder()
                            .connectString(ZOOKEEPER_IP_PORT)
                            .sessionTimeoutMs(sessionTimeout)
                            .connectionTimeoutMs(connectTimeout)
                            .retryPolicy(retryPolicy)
                            .namespace(NAME_SPACE)
                            .build();

            client.start();
        }

        /**
         * Binds this lock to a mutex on the given path.
         *
         * @param code path handed to {@link InterProcessMutex}
         */
        public void init(String code) {
            this.mutex = new InterProcessMutex(client, code);
        }

        /**
         * Returns the cached lock for {@code prefix + "/" + code}, creating it on first use.
         *
         * <p>The whole lookup runs under one monitor: reading a plain {@link HashMap} while another
         * thread may be inserting into it is not safe, so there is no unsynchronized fast path.
         *
         * @param prefix tag (first path segment)
         * @param code   primary key (second path segment)
         * @return the lock shared by every caller that passes the same prefix and code
         */
        public static ZkLock getTagLock(String prefix, String code) {
            String tag = prefix + "/" + code;

            synchronized (initLock) {
                ZkLock zkLock = tagLocks.get(tag);
                if (zkLock == null) {
                    zkLock = new ZkLock();
                    zkLock.init(tag);
                    tagLocks.put(tag, zkLock);
                }
                return zkLock;
            }
        }

        /**
         * Blocks until the mutex is acquired.
         *
         * @throws IllegalStateException if the mutex could not be acquired; returning normally in
         *                               that case would let the caller run its critical section
         *                               without holding the lock
         */
        public void lock() {
            try {
                this.mutex.acquire();
            } catch (InterruptedException e) {
                // Preserve the interrupt for code further up the stack.
                Thread.currentThread().interrupt();
                logger.error("加锁失败", e);
                throw new IllegalStateException("Interrupted while acquiring ZooKeeper lock", e);
            } catch (Exception e) {
                logger.error("加锁失败", e);
                throw new IllegalStateException("Failed to acquire ZooKeeper lock", e);
            }
        }

        /**
         * Releases the mutex. Failures are logged with their cause but not rethrown, because this
         * method is meant to be called from a {@code finally} block where throwing would hide the
         * exception raised by the protected code.
         */
        public void unlock() {
            try {
                this.mutex.release();
            } catch (Exception e) {
                logger.error("锁释放失败", e);
            }
        }

    }

    调用代码(不变)

    转载请注明博客出处:http://www.cnblogs.com/cjh-notes/

  • 相关阅读:
    转-文件批量重命名
    解决死锁之路(终结篇)
    Python批量修改文件名
    Android AudioTrack分析
    Android Audio介绍
    Android GNSS介绍
    Android HIDL介绍
    【vue】类和内联样式绑定
    【vue】计算属性
    svn post-commit不能同步
  • 原文地址:https://www.cnblogs.com/cjh-notes/p/9744183.html
Copyright © 2011-2022 走看看