zoukankan      html  css  js  c++  java
  • 分布式锁

    1.redis锁

    1.工具类代码:

    package com.lhw.chche;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisCommands;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class RedisLock {
        private final static Logger LOG = LoggerFactory.getLogger(RedisLock.class);
        static StringBuilder sb = new StringBuilder();
        static {
            sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
            sb.append("then ");
            sb.append("    return redis.call("del",KEYS[1]) ");
            sb.append("else ");
            sb.append("    return 0 ");
            sb.append("end ");
        }
        public static final String UNLOCK_LUA = sb.toString();
        
        /**
         * redis加锁
         * @param key lock
         * @param expire
         * @return
         */
        public static String setLock(final String key, final long expire) {
            //System.out.println("1--------------setLock,key="+key);
            try {
                RedisCallback<String> callback = new RedisCallback<String>() {
                    @Override
                    public String doInRedis(RedisConnection connection) throws DataAccessException {
                        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                        return commands.set(key, key, "NX", "PX", expire);
                    }
                };
                String result = RedisOperationConfig.getStringRedisTemplate().execute(callback);
                //System.out.println("1--------------setLock,result="+result);
                if (StringUtils.isEmpty(result)) {
                    LOG.warn("key="+key+",获取锁失败。。。。。");
                    return null;
                }else {
                    if(LOG.isInfoEnabled()) {
                        LOG.info("key="+key+",获取锁成功。。。。。");
                    }
                    return key;
                }
            } catch (Exception e) {
                LOG.error("",e);
                LOG.warn("key="+key+",获取锁异常。。。。。");
            }
            return null;
        }
        /**
         * redis 解锁
         * @param key
         * @param requestId
         * @return
         */
        public static boolean releaseLock(String key, String requestId) {
            try {
                final List keys = new ArrayList();
                keys.add(key);
                final List args = new ArrayList();
                args.add(requestId);
    
                RedisCallback callback = new RedisCallback() {
                    @Override
                    public Long doInRedis(RedisConnection connection) throws DataAccessException {
                        Object nativeConnection = connection.getNativeConnection();
    
                        if ((nativeConnection instanceof JedisCluster)) {
                            return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                        }
    
                        if ((nativeConnection instanceof Jedis)) {
                            return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                        }
                        return Long.valueOf(0L);
                    }
                };
                Long result = (Long) RedisOperationConfig.getStringRedisTemplate().execute(callback);
    
                boolean bool = (result != null) && (result.longValue() > 0L);
                return bool;
            } catch (Exception e) {
    
            } finally {
            }
            return false;
        }
    
        /**
         * 获取redis
         * @param key
         * @return
         */
        public static String getLockValue(final String key){
            try {
                RedisCallback<String> callback = new RedisCallback<String>() {
                    @Override
                    public String doInRedis(RedisConnection connection) throws DataAccessException {
                        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                        return commands.get(key);
                    }
                };
                String result = RedisOperationConfig.getStringRedisTemplate().execute(callback);
                if (StringUtils.isEmpty(result)) {
                    LOG.warn("key="+key+",获取锁值失败。。。。。");
                    return null;
                }else {
                    if(LOG.isInfoEnabled()) {
                        LOG.info("key="+key+",获取锁值成功。。。。。");
                    }
                    return result;
                }
            } catch (Exception e) {
                LOG.error("",e);
                LOG.warn("key="+key+",获取锁异常。。。。。");
            }
            return null;
        }
    }

      

    2.使用场景顺序

    1 --> taskNo是一个任务的唯一值,先进行上锁.看是否成功,成功则继续

    2 --> taskNo+时间(年月日),先进行上锁.看是否成功,成功则继续,执行具体的任务

    3 --> 设置 2里面 taskNo+时间(年月日)的过期时间

    4 --> 释放 1里面 taskNo 的锁

    1.
            String rediskey = "com.lhw.lock."+taskNo;
            String lock = RedisLock.setLock(rediskey,outTime);
    2.
            String redisDealkey = "com.lhw.dealLock."+taskNo+"."+ymd;
            String dealLock = RedisLock.getLockValue(redisDealkey);
    3.
            //设置当天任务已执行的开关值,默认12小时过期时间
            RedisLock.setLock(redisDealkey,12*60*60*1000);
    4.
         RedisLock.releaseLock(rediskey,rediskey);

    2.zookeeper锁

    图解:公平锁和可重入锁 模型

    分布式锁的概念和原理,比较抽象难懂。如果用一个简单的故事来类比,估计就简单多了。

    很久以前,在一个村子有一口井,水质非常的好,村民们都抢着取井里的水。井就那么一口,村里的人很多,村民为争抢取水打架斗殴,甚至头破血流。

    问题总是要解决,于是村长绞尽脑汁,最终想出了一个凭号取水的方案。井边安排一个看井人,维护取水的秩序。

    说起来,秩序很简单,取水之前,先取号。号排在前面的,就可以先取水。先到的排在前面,那些后到的,没有排在最前面的人,一个一个挨着,在井边排成一队。取水示意图如下 :

    这种排队取水模型,就是一种锁的模型。排在最前面的号,拥有取水权,就是一种典型的独占锁。另外,先到先得,号排在前面的人先取到水,取水之后就轮到下一个号取水,至少,看起来挺公平的,说明它是一种公平锁。

    在公平独占锁的基础上,再进一步,看看可重入锁的模型。

    假定,取水时以家庭为单位,哪个家庭任何人拿到号,就可以排号取水,而且如果一个家庭有一个人拿到号,其它家人这时候过来打水不用再取号。新的排号取水示意图如下 :

    如上图的1号,老公有号,他的老婆来了,直接排第一个,妻凭夫贵。再看上图的2号,父亲正在打水,他的儿子和女儿也到井边了,直接排第二个,这个叫做子凭父贵。 等等,如果是同一个家庭,可以直接复用排号,不用重新取号从后面排起。

    以上这个故事模型,就是可以重入锁的模型。只要满足条件,同一个排号,可以用来多次取水。在锁的模型中,相当于一把锁,可以被多次锁定,这就叫做可重入锁。

    zookeeper分布式锁的原理

    理解了锁的原理后,就会发现,Zookeeper 天生就是一副分布式锁的胚子。

    首先,Zookeeper的每一个节点,都是一个天然的顺序发号器。

    在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一

    比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。

     

    其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。

    一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

    第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。

    每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

    Zookeeper的节点监听机制,可以说能够非常完美的,实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听linsten或者 watch 监视排号在自己前面那个,而且紧挨在自己前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,则获得锁。

    为什么说Zookeeper的节点监听机制,可以说是非常完美呢?

    一条龙式的首尾相接,后面监视前面,就不怕中间截断吗?比如,在分布式环境下,由于网络的原因,或者服务器挂了或则其他的原因,如果前面的那个节点没能被程序删除成功,后面的节点不就永远等待么?

    其实,Zookeeper的内部机制,能保证后面的节点能够正常的监听到删除和获得锁。在创建取号节点的时候,尽量创建临时znode 节点而不是永久znode 节点,一旦这个 znode 的客户端与Zookeeper集群服务器失去联系,这个临时 znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。

    说Zookeeper的节点监听机制,是非常完美的。还有一个原因。

    Zookeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反映,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反映。

    1.简单的分布式锁

    代码实现

    AbstractLock.java

    package zklock;
     
    import org.I0Itec.zkclient.ZkClient;
     
    public abstract class AbstractLock {
     
        //zk地址和端口
        public static final String ZK_ADDR = "192.168.0.230:2181";
        //超时时间
        public static final int SESSION_TIMEOUT = 10000;
        //创建zk
        protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
        
        
        /**
         * 可以认为是模板模式,两个子类分别实现它的抽象方法
         * 1,简单的分布式锁
         * 2,高性能分布式锁
         */
        public void getLock() {
            String threadName = Thread.currentThread().getName();
            if (tryLock()) {
                System.out.println(threadName+"-获取锁成功");
            }else {
                System.out.println(threadName+"-获取锁失败,进行等待...");
                waitLock();
                //递归重新获取锁
                getLock();
            }
        }
        
        public abstract void releaseLock();
        
        public abstract boolean tryLock();
        
        public abstract void waitLock();
    }

     AbstractLock类是个抽象类,里面getLock使用模板模式,子类分别是简单的zk锁和高性能的zk锁

    SimpleZkLock.java

    package zklock;
     
    import java.util.concurrent.CountDownLatch;
     
    import org.I0Itec.zkclient.IZkDataListener;
     
    /**
     * 简单的分布式锁的实现
     */
    public class SimpleZkLock extends AbstractLock {
     
        private static final String NODE_NAME = "/test_simple_lock";
        
        private CountDownLatch countDownLatch;
        
        @Override
        public void releaseLock() {
            if (null != zkClient) {
                //删除节点
                zkClient.delete(NODE_NAME);
                zkClient.close();
                System.out.println(Thread.currentThread().getName()+"-释放锁成功");
            }
            
        }
     
        //直接创建临时节点,如果创建成功,则表示获取了锁,创建不成功则处理异常
        @Override
        public boolean tryLock() {
            if (null == zkClient) return false;
            try {
                zkClient.createEphemeral(NODE_NAME);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
     
        @Override
        public void waitLock() {
            //监听器
            IZkDataListener iZkDataListener = new IZkDataListener() {
                //节点被删除回调
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                }
                //节点改变被回调
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    // TODO Auto-generated method stub
                    
                }
            };
            zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener);
            //如果存在则阻塞
            if (zkClient.exists(NODE_NAME)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName()+" 等待获取锁...");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            //删除监听
            zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener);
        }
     
    }

    SimpleZkLock 表示简单的zk分布式锁,逻辑还是相对比较简单,下面看下测试

    LockTest.java

    package zklock;
     
    public class LockTest {
        public static void main(String[] args) {
            //模拟多个10个客户端
            for (int i=0;i<10;i++) {
                Thread thread = new Thread(new LockRunnable());
                thread.start();
            }
            
        }
        
        static class LockRunnable implements Runnable{
     
            @Override
            public void run() {
                AbstractLock zkLock = new SimpleZkLock();
                //AbstractLock zkLock = new HighPerformanceZkLock();
                zkLock.getLock();
                //模拟业务操作
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                zkLock.releaseLock();
            }
            
        }
    }

    zk实现的简单的分布式锁存在的性能问题

    二。高性能分布式锁

    1、思路:客户端在抢锁的时候进行排队,客户端只要监听它前一个节点的变化就行,如果前一个节点释放了锁,客户端才去进行抢锁操作,这个时候我们就需要创建顺序节点了

     

     HighPerformanceZkLock .java

    package zklock;
     
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
     
    import org.I0Itec.zkclient.IZkDataListener;
     
    /**
     * 高性能分布式锁
     * @author hongtaolong
     *
     */
    public class HighPerformanceZkLock extends AbstractLock {
     
        private static final String PATH = "/highPerformance_zklock";
        //当前节点路径
        private String currentPath;
        //前一个节点的路径
        private String beforePath;
        
        private CountDownLatch countDownLatch = null;
        
        public HighPerformanceZkLock() {
            //如果不存在这个节点,则创建持久节点
            if (!zkClient.exists(PATH)) {        
                zkClient.createPersistent(PATH);
            }
        }
        
        @Override
        public void releaseLock() {
            if (null != zkClient) {
                zkClient.delete(currentPath);
                zkClient.close();
            }
     
        }
     
        @Override
        public boolean tryLock() {
            //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
            if (null == currentPath || "".equals(currentPath)) {
                //在path下创建一个临时的顺序节点
                currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
            }
            //获取所有的临时节点,并排序
            List<String> childrens = zkClient.getChildren(PATH);
            Collections.sort(childrens);
            if (currentPath.equals(PATH+"/"+childrens.get(0))) {
                return true;
            }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
                int pathLength = PATH.length();
                int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
                beforePath = PATH+"/"+childrens.get(wz-1);
            }
            return false;
        }
     
        @Override
        public void waitLock() {
            IZkDataListener lIZkDataListener = new IZkDataListener() {
                
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (null != countDownLatch){
                        countDownLatch.countDown();
                    }
                }
                
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    
                }
            };
            //监听前一个节点的变化
            zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
            if (zkClient.exists(beforePath)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
        }
     
    }

    这里只要帖高性能锁的代码了,AbstractLock没变化,LockTest中只要修改一行代码

    //AbstractLock zkLock = new SimpleZkLock();

    AbstractLock zkLock = new HighPerformanceZkLock();

     测试结果:

     转载自:https://blog.csdn.net/crazymakercircle/article/details/85956246

        https://blog.csdn.net/hongtaolong/java/article/details/88898875

    3.数据库锁

    mysql行锁。

    存储引擎使用innoDB

    行锁的使用方式为:

    Select * from student_game where id=3 for update;

    这样我们的业务代码大概是这样的:

    conn.setAutoCommit(false);//关闭自动提交
    select scores from test.student where id= 4 for update;//上锁
    //获取学生id,scores数据,查询es记录,等到总分。
    //更新学生得分表scores记录
    Conn.commit;//释放锁

    原文链接:https://blog.csdn.net/humanity11/java/article/details/93356344

    mysql分布式锁

    要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。
         当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
         创建这样一张数据库表:
    CREATE TABLE `methodLock` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
      `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

    当我们想要锁住某个方法时,执行以下SQL:

    insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
    因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
    当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
    delete from methodLock where method_name ='method_name'

    上面这种简单的实现有以下几个问题:

    1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
    2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
    3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
    4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

    当然,我们也可以有其他方式解决上面的问题。
    1.数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
    2.没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
    3.非阻塞的?搞一个while循环,直到insert成功再返回成功。
    4.非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。


    链接:https://www.jianshu.com/p/7b57ebb25900

  • 相关阅读:
    Kafka单机环境部署
    kafka启动出现:Unsupported major.minor version 52.0 错误
    CentOs7.3 搭建 Redis-4.0.1 Cluster 集群服务
    Python ZKPython 安装
    zookeeper伪集群安装
    系统吞吐量(TPS)、用户并发量、性能测试概念和公式
    XDebug安装配置教程
    48 条高效率的 PHP 优化写法
    待处理bug
    phpstudy composer 使用安装
  • 原文地址:https://www.cnblogs.com/linhongwenBlog/p/13344031.html
Copyright © 2011-2022 走看看