zoukankan      html  css  js  c++  java
  • 基于ZooKeeper的分布式锁

    一、简介

      锁的概念,在Java日常开发和面试中,都是个很重要的知识点。锁能很好的控制生产数据的安全性,比如商品的数量超卖问题等。传统的做法中,可以直接利用数据库锁(行锁或者表锁)来进行数据访问控制。随着请求量逐步变多的情况下,将压力怼到数据库上会对其性能产生极大影响。这时候,单体应用中可以利用JVM锁,在程序层面进行访问的控制,将压力前移,对数据库友好。当请求量再进一步变多,这时候一般会考虑集群分布式去处理,不断的加机器来抗压。这时候,JVM锁就不能很好的控制压力了,同一时刻还是会有大量请求怼到数据库上,这时就需要提升为分布式锁去控制了,将压力继续停留在程序层面。

      Java的面向接口编程,可以很好很快的去切换实现而不需要动业务代码部分。下面,基于Lock接口去使用锁。

      zookeeper的集群搭建:https://www.cnblogs.com/eric-fang/p/9283904.html

    二、JVM锁

      基于ReentrantLock实现锁控制,业务控制层service部分代码如下,用 lock 锁去控制并发访问

    package com.cfang.service;
    
    import java.sql.Time;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Scope;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Isolation;
    import org.springframework.transaction.annotation.Transactional;
    
    import com.cfang.dao.ProductDao;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Service
    @Slf4j
    @Scope("prototype")
    public class ProductWithLockService {
        
        private Lock lock = new ReentrantLock();
    
        @Autowired
        private ProductDao productDao;
        
        @Transactional
        public boolean buy(String userName, String productname, int number) {
            boolean result = false;
            try {
                lock.lock();
    //            TimeUnit.SECONDS.sleep(1);
                log.info("用户{}欲购买{}个{}",  userName, number, productname);
                int stock = productDao.getStock(productname);
                log.info("{} 查询数量{}...", userName, stock);
                if(stock < number) {
                    log.warn("库存不足...");
                    return false;
                }
                result = productDao.buy(userName, productname, number);
            } catch (Exception e) {
                
            } finally {
                log.info("{} 释放锁...", userName);
                lock.unlock();
            }
            log.info("{}购买结果,{}",userName,  result);
            return result;
        }
    }

      在单体应用中,这样子使用是可以的,但是当应用部署多套的时候,那么,就不能很好的保障并发控制了,同一时刻的请求可能会大量打到数据库上。所以,这就引入下面的分布式锁去控制了。

    三、基于ZooKeeper的分布式锁

        首先,锁获取释放的工具类:

    package com.cfang.zkLockUtil;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.exception.ZkNodeExistsException;
    import org.apache.commons.lang3.StringUtils;
    
    import com.cfang.zkClient.MyZkSerializer;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class ZkLockUtil implements Lock{
        
        private String znode;
        private ZkClient zkClient;
        
        public ZkLockUtil(String znode) {
            if(StringUtils.isBlank(znode)) {
                throw new IllegalArgumentException("锁节点znode不能为空字符串");
            }
            this.znode = znode;
            this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
            this.zkClient.setZkSerializer(new MyZkSerializer());
        }
    
        @Override
        public void lock() {
            if(!tryLock()) { //抢锁失败
                // 阻塞等待锁节点的释放
                waitLock();
                //递归调用,重新尝试去抢占锁
                lock();
            }
        }
        
        private void waitLock() {
            CountDownLatch latch = new CountDownLatch(1);
            // 注册监听znode锁节点变化,当删除的时候,说明锁被释放
            IZkDataListener listener = new IZkDataListener() {
                
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    log.info("znode节点被删除,锁释放...");
                    latch.countDown();
                }
                
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }
            };
            this.zkClient.subscribeDataChanges(this.znode, listener);
            try {
                // 阻塞等待锁znode节点的删除释放
                if(this.zkClient.exists(znode)) {
                    latch.await();
                }
            } catch (Exception e) {
            }
            //取消znode节点监听
            this.zkClient.unsubscribeDataChanges(this.znode, listener);
        }
        
        @Override
        public boolean tryLock() {
            boolean result = false;
            try {
                this.zkClient.createEphemeral(znode); //创建临时节点
                result = true;
            } catch (ZkNodeExistsException e) {
                log.warn("锁节点znode已存在,抢占失败...");
                result = false;
            } catch (Exception e) {
                log.warn("创建锁节点znode异常,{}...", e.getMessage());
            }
            return result;
        }
    
        @Override
        public void unlock() {
            zkClient.delete(znode);
        }
        
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            // TODO Auto-generated method stub
            return false;
        }
        
        @Override
        public void lockInterruptibly() throws InterruptedException {
            // TODO Auto-generated method stub
            
        }
    
        @Override
        public Condition newCondition() {
            // TODO Auto-generated method stub
            return null;
        }
    
    }

      业务控制service中,就是将基本的JVM锁的service中,Lock的实现更换即可:

    private Lock lock = new ZkLockUtil("/p1node");

      当程序运行中,所有的请求会去争抢创建zk节点,谁创建成功,则就获得锁资源,继续执行业务代码。其他所有线程基于递归等待,等待zk节点的删除,然后再去尝试争抢创建。达到控制并发的目的。

    但是,这种但是有个不好的地方,也就是,当一个锁释放后,所有的线程都会一下子全去争抢,每次都是轮回这样哄抢的过程,会有一定的压力,也不必如此。所以,下面基于zk永久节点下临时顺序节点做点改善,每个线程节点,只需要关注前面一个节点变化即可,不需要造成哄抢事件。

    四、ZooKeeper的分布式锁提高版

       锁获取释放的工具类:

    package com.cfang.zkLockUtil;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    import org.apache.commons.lang3.StringUtils;
    
    import com.cfang.zkClient.MyZkSerializer;
    
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class ZKLockImproveUtil implements Lock{
        
        private String znode;
        private ZkClient zkClient;
        private ThreadLocal<String> currentNode = new ThreadLocal<String>(); //当前节点
        private ThreadLocal<String> beforeNode = new ThreadLocal<String>();  //前一个节点
        
        public ZKLockImproveUtil(String znode) {
            if(StringUtils.isBlank(znode)) {
                throw new IllegalArgumentException("锁节点znode不能为空字符串");
            }
            this.znode = znode;
            this.zkClient = new ZkClient("111.231.51.200:2181,111.231.51.200:2182,111.231.51.200:2183");
            this.zkClient.setZkSerializer(new MyZkSerializer());
            
            try {
                if(!this.zkClient.exists(znode)) {
                    this.zkClient.createPersistent(znode, true); // true是否创建层级目录
                }
            } catch (Exception e) {
            }
        }
    
        @Override
        public void lock() {
            if(!tryLock()) {
                waitLock();
                lock();
            }
        }
        
        private void waitLock() {
            CountDownLatch latch = new CountDownLatch(1);
            IZkDataListener listener = new IZkDataListener() {
                
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    log.info("{}节点删除,锁释放...", dataPath);
                    latch.countDown();
                }
                
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                }
            };
            
            this.zkClient.subscribeDataChanges(beforeNode.get(), listener);
            
            try {
                if(this.zkClient.exists(beforeNode.get())) {
                    latch.await();
                }
            } catch (Exception e) {
            }
            
            this.zkClient.unsubscribeDataChanges(beforeNode.get(), listener);
        }
    
        @Override
        public boolean tryLock() {
            boolean result = false;
            // 创建顺序临时节点
            if(null == currentNode.get() || !this.zkClient.exists(currentNode.get())) {
                String enode = this.zkClient.createEphemeralSequential(znode + "/", "zk-locked");
                this.currentNode.set(enode);
            }
            // 获取znode节点下的所有子节点
            List<String> list = this.zkClient.getChildren(znode);
            Collections.sort(list);
            
            /**
             * 如果当前节点是第一个的话,则是为获取锁,继续执行
             * 不是头结点的话,则去查询其前面一个节点,然后准备监听前一个节点的删除释放操作
             */
            
            if(currentNode.get().equals(this.znode + "/" + list.get(0))) {
                log.info("{}节点为头结点,获得锁...", currentNode.get());
                result = true;
            } else {
                int currentIndex = list.indexOf(currentNode.get().substring(this.znode.length() + 1));
                String bnode = this.znode + "/" + list.get(currentIndex - 1);
                this.beforeNode.set(bnode);
            }
            return result;
        }
    
        @Override
        public void unlock() {
            if(null != this.currentNode) {
                this.zkClient.delete(currentNode.get());
                this.currentNode.set(null);
            }
        }
        
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            // TODO Auto-generated method stub
            return false;
        }
        
        @Override
        public void lockInterruptibly() throws InterruptedException {
            // TODO Auto-generated method stub
        }
    
        @Override
        public Condition newCondition() {
            // TODO Auto-generated method stub
            return null;
        }
    
        
    }

      service中更换实现:

    private Lock lock = new ZKLockImproveUtil("/pnode");

    五、小结

      主要是学习测试使用,并未考虑到生产实际的问题,比如 如果业务处理中假死状态,导致zk不释放锁,那么就会导致死锁问题(可以对锁节点来个有效期处理)。

      上述为部分代码片段,整体工程可以在github上获取,地址:https://github.com/qiuhan00/zkLock

  • 相关阅读:
    【NOIP 2003】 加分二叉树
    【POJ 1655】 Balancing Act
    【HDU 3613】Best Reward
    【POJ 3461】 Oulipo
    【POJ 2752】 Seek the Name, Seek the Fame
    【POJ 1961】 Period
    【POJ 2406】 Power Strings
    BZOJ3028 食物(生成函数)
    BZOJ5372 PKUSC2018神仙的游戏(NTT)
    BZOJ4836 二元运算(分治FFT)
  • 原文地址:https://www.cnblogs.com/eric-fang/p/11837194.html
Copyright © 2011-2022 走看看