zoukankan      html  css  js  c++  java
  • 分布式锁

    1.redis锁

    1.工具类代码:

    package com.lhw.chche;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.dao.DataAccessException;
    import org.springframework.data.redis.connection.RedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisCluster;
    import redis.clients.jedis.JedisCommands;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class RedisLock {
        private final static Logger LOG = LoggerFactory.getLogger(RedisLock.class);
        static StringBuilder sb = new StringBuilder();
        static {
            sb.append("if redis.call("get",KEYS[1]) == ARGV[1] ");
            sb.append("then ");
            sb.append("    return redis.call("del",KEYS[1]) ");
            sb.append("else ");
            sb.append("    return 0 ");
            sb.append("end ");
        }
        public static final String UNLOCK_LUA = sb.toString();
        
        /**
         * redis加锁
         * @param key lock
         * @param expire
         * @return
         */
        public static String setLock(final String key, final long expire) {
            //System.out.println("1--------------setLock,key="+key);
            try {
                RedisCallback<String> callback = new RedisCallback<String>() {
                    @Override
                    public String doInRedis(RedisConnection connection) throws DataAccessException {
                        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                        return commands.set(key, key, "NX", "PX", expire);
                    }
                };
                String result = RedisOperationConfig.getStringRedisTemplate().execute(callback);
                //System.out.println("1--------------setLock,result="+result);
                if (StringUtils.isEmpty(result)) {
                    LOG.warn("key="+key+",获取锁失败。。。。。");
                    return null;
                }else {
                    if(LOG.isInfoEnabled()) {
                        LOG.info("key="+key+",获取锁成功。。。。。");
                    }
                    return key;
                }
            } catch (Exception e) {
                LOG.error("",e);
                LOG.warn("key="+key+",获取锁异常。。。。。");
            }
            return null;
        }
        /**
         * redis 解锁
         * @param key
         * @param requestId
         * @return
         */
        public static boolean releaseLock(String key, String requestId) {
            try {
                final List keys = new ArrayList();
                keys.add(key);
                final List args = new ArrayList();
                args.add(requestId);
    
                RedisCallback callback = new RedisCallback() {
                    @Override
                    public Long doInRedis(RedisConnection connection) throws DataAccessException {
                        Object nativeConnection = connection.getNativeConnection();
    
                        if ((nativeConnection instanceof JedisCluster)) {
                            return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA, keys, args);
                        }
    
                        if ((nativeConnection instanceof Jedis)) {
                            return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, keys, args);
                        }
                        return Long.valueOf(0L);
                    }
                };
                Long result = (Long) RedisOperationConfig.getStringRedisTemplate().execute(callback);
    
                boolean bool = (result != null) && (result.longValue() > 0L);
                return bool;
            } catch (Exception e) {
    
            } finally {
            }
            return false;
        }
    
        /**
         * 获取redis
         * @param key
         * @return
         */
        public static String getLockValue(final String key){
            try {
                RedisCallback<String> callback = new RedisCallback<String>() {
                    @Override
                    public String doInRedis(RedisConnection connection) throws DataAccessException {
                        JedisCommands commands = (JedisCommands) connection.getNativeConnection();
                        return commands.get(key);
                    }
                };
                String result = RedisOperationConfig.getStringRedisTemplate().execute(callback);
                if (StringUtils.isEmpty(result)) {
                    LOG.warn("key="+key+",获取锁值失败。。。。。");
                    return null;
                }else {
                    if(LOG.isInfoEnabled()) {
                        LOG.info("key="+key+",获取锁值成功。。。。。");
                    }
                    return result;
                }
            } catch (Exception e) {
                LOG.error("",e);
                LOG.warn("key="+key+",获取锁异常。。。。。");
            }
            return null;
        }
    }

      

    2.使用场景顺序

    1 --> taskNo是一个任务的唯一值,先进行上锁.看是否成功,成功则继续

    2 --> taskNo+时间(年月日),先进行上锁.看是否成功,成功则继续,执行具体的任务

    3 --> 设置 2里面 taskNo+时间(年月日)的过期时间

    4 --> 释放 1里面 taskNo 的锁

    1.
            String rediskey = "com.lhw.lock."+taskNo;
            String lock = RedisLock.setLock(rediskey,outTime);
    2.
            String redisDealkey = "com.lhw.dealLock."+taskNo+"."+ymd;
            String dealLock = RedisLock.getLockValue(redisDealkey);
    3.
            //设置当天任务已执行的开关值,默认12小时过期时间
            RedisLock.setLock(redisDealkey,12*60*60*1000);
    4.
         RedisLock.releaseLock(rediskey,rediskey);

    2.zookeeper锁

    图解:公平锁和可重入锁 模型

    分布式锁的概念和原理,比较抽象难懂。如果用一个简单的故事来类比,估计就简单多了。

    很久以前,在一个村子有一口井,水质非常的好,村民们都抢着取井里的水。井就那么一口,村里的人很多,村民为争抢取水打架斗殴,甚至头破血流。

    问题总是要解决,于是村长绞尽脑汁,最终想出了一个凭号取水的方案。井边安排一个看井人,维护取水的秩序。

    说起来,秩序很简单,取水之前,先取号。号排在前面的,就可以先取水。先到的排在前面,那些后到的,没有排在最前面的人,一个一个挨着,在井边排成一队。取水示意图如下 :

    这种排队取水模型,就是一种锁的模型。排在最前面的号,拥有取水权,就是一种典型的独占锁。另外,先到先得,号排在前面的人先取到水,取水之后就轮到下一个号取水,至少,看起来挺公平的,说明它是一种公平锁。

    在公平独占锁的基础上,再进一步,看看可重入锁的模型。

    假定,取水时以家庭为单位,哪个家庭任何人拿到号,就可以排号取水,而且如果一个家庭有一个人拿到号,其它家人这时候过来打水不用再取号。新的排号取水示意图如下 :

    如上图的1号,老公有号,他的老婆来了,直接排第一个,妻凭夫贵。再看上图的2号,父亲正在打水,他的儿子和女儿也到井边了,直接排第二个,这个叫做子凭父贵。 等等,如果是同一个家庭,可以直接复用排号,不用重新取号从后面排起。

    以上这个故事模型,就是可以重入锁的模型。只要满足条件,同一个排号,可以用来多次取水。在锁的模型中,相当于一把锁,可以被多次锁定,这就叫做可重入锁。

    zookeeper分布式锁的原理

    理解了锁的原理后,就会发现,Zookeeper 天生就是一副分布式锁的胚子。

    首先,Zookeeper的每一个节点,都是一个天然的顺序发号器。

    在每一个节点下面创建子节点时,只要选择的创建类型是有序(EPHEMERAL_SEQUENTIAL 临时有序或者PERSISTENT_SEQUENTIAL 永久有序)类型,那么,新的子节点后面,会加上一个次序编号。这个次序编号,是上一个生成的次序编号加一

    比如,创建一个用于发号的节点“/test/lock”,然后以他为父亲节点,可以在这个父节点下面创建相同前缀的子节点,假定相同的前缀为“/test/lock/seq-”,在创建子节点时,同时指明是有序类型。如果是第一个创建的子节点,那么生成的子节点为/test/lock/seq-0000000000,下一个节点则为/test/lock/seq-0000000001,依次类推,等等。

     

    其次,Zookeeper节点的递增性,可以规定节点编号最小的那个获得锁。

    一个zookeeper分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT类型),然后每个要获得锁的线程都会在这个节点下创建个临时顺序节点,由于序号的递增性,可以规定排号最小的那个获得锁。所以,每个线程在尝试占用锁之前,首先判断自己是排号是不是当前最小,如果是,则获取锁。

    第三,Zookeeper的节点监听机制,可以保障占有锁的方式有序而且高效。

    每个线程抢占锁之前,先抢号创建自己的ZNode。同样,释放锁的时候,就需要删除抢号的Znode。抢号成功后,如果不是排号最小的节点,就处于等待通知的状态。等谁的通知呢?不需要其他人,只需要等前一个Znode 的通知就可以了。当前一个Znode 删除的时候,就是轮到了自己占有锁的时候。第一个通知第二个、第二个通知第三个,击鼓传花似的依次向后。

    Zookeeper的节点监听机制,可以说能够非常完美的,实现这种击鼓传花似的信息传递。具体的方法是,每一个等通知的Znode节点,只需要监听linsten或者 watch 监视排号在自己前面那个,而且紧挨在自己前面的那个节点。 只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,则获得锁。

    为什么说Zookeeper的节点监听机制,可以说是非常完美呢?

    一条龙式的首尾相接,后面监视前面,就不怕中间截断吗?比如,在分布式环境下,由于网络的原因,或者服务器挂了或则其他的原因,如果前面的那个节点没能被程序删除成功,后面的节点不就永远等待么?

    其实,Zookeeper的内部机制,能保证后面的节点能够正常的监听到删除和获得锁。在创建取号节点的时候,尽量创建临时znode 节点而不是永久znode 节点,一旦这个 znode 的客户端与Zookeeper集群服务器失去联系,这个临时 znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。

    说Zookeeper的节点监听机制,是非常完美的。还有一个原因。

    Zookeeper这种首尾相接,后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是每个节点挂掉,所有节点都去监听,然后做出反映,这样会给服务器带来巨大压力,所以有了临时顺序节点,当一个节点挂掉,只有它后面的那一个节点才做出反映。

    1.简单的分布式锁

    代码实现

    AbstractLock.java

    package zklock;
     
    import org.I0Itec.zkclient.ZkClient;
     
    public abstract class AbstractLock {
     
        //zk地址和端口
        public static final String ZK_ADDR = "192.168.0.230:2181";
        //超时时间
        public static final int SESSION_TIMEOUT = 10000;
        //创建zk
        protected ZkClient zkClient = new ZkClient(ZK_ADDR, SESSION_TIMEOUT);
        
        
        /**
         * 可以认为是模板模式,两个子类分别实现它的抽象方法
         * 1,简单的分布式锁
         * 2,高性能分布式锁
         */
        public void getLock() {
            String threadName = Thread.currentThread().getName();
            if (tryLock()) {
                System.out.println(threadName+"-获取锁成功");
            }else {
                System.out.println(threadName+"-获取锁失败,进行等待...");
                waitLock();
                //递归重新获取锁
                getLock();
            }
        }
        
        public abstract void releaseLock();
        
        public abstract boolean tryLock();
        
        public abstract void waitLock();
    }

     AbstractLock类是个抽象类,里面getLock使用模板模式,子类分别是简单的zk锁和高性能的zk锁

    SimpleZkLock.java

    package zklock;
     
    import java.util.concurrent.CountDownLatch;
     
    import org.I0Itec.zkclient.IZkDataListener;
     
    /**
     * 简单的分布式锁的实现
     */
    public class SimpleZkLock extends AbstractLock {
     
        private static final String NODE_NAME = "/test_simple_lock";
        
        private CountDownLatch countDownLatch;
        
        @Override
        public void releaseLock() {
            if (null != zkClient) {
                //删除节点
                zkClient.delete(NODE_NAME);
                zkClient.close();
                System.out.println(Thread.currentThread().getName()+"-释放锁成功");
            }
            
        }
     
        //直接创建临时节点,如果创建成功,则表示获取了锁,创建不成功则处理异常
        @Override
        public boolean tryLock() {
            if (null == zkClient) return false;
            try {
                zkClient.createEphemeral(NODE_NAME);
                return true;
            } catch (Exception e) {
                return false;
            }
        }
     
        @Override
        public void waitLock() {
            //监听器
            IZkDataListener iZkDataListener = new IZkDataListener() {
                //节点被删除回调
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (countDownLatch != null) {
                        countDownLatch.countDown();
                    }
                }
                //节点改变被回调
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    // TODO Auto-generated method stub
                    
                }
            };
            zkClient.subscribeDataChanges(NODE_NAME, iZkDataListener);
            //如果存在则阻塞
            if (zkClient.exists(NODE_NAME)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName()+" 等待获取锁...");
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            //删除监听
            zkClient.unsubscribeDataChanges(NODE_NAME, iZkDataListener);
        }
     
    }

    SimpleZkLock 表示简单的zk分布式锁,逻辑还是相对比较简单,下面看下测试

    LockTest.java

    package zklock;
     
    public class LockTest {
        public static void main(String[] args) {
            //模拟多个10个客户端
            for (int i=0;i<10;i++) {
                Thread thread = new Thread(new LockRunnable());
                thread.start();
            }
            
        }
        
        static class LockRunnable implements Runnable{
     
            @Override
            public void run() {
                AbstractLock zkLock = new SimpleZkLock();
                //AbstractLock zkLock = new HighPerformanceZkLock();
                zkLock.getLock();
                //模拟业务操作
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                zkLock.releaseLock();
            }
            
        }
    }

    zk实现的简单的分布式锁存在的性能问题

    二。高性能分布式锁

    1、思路:客户端在抢锁的时候进行排队,客户端只要监听它前一个节点的变化就行,如果前一个节点释放了锁,客户端才去进行抢锁操作,这个时候我们就需要创建顺序节点了

     

     HighPerformanceZkLock .java

    package zklock;
     
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
     
    import org.I0Itec.zkclient.IZkDataListener;
     
    /**
     * 高性能分布式锁
     * @author hongtaolong
     *
     */
    public class HighPerformanceZkLock extends AbstractLock {
     
        private static final String PATH = "/highPerformance_zklock";
        //当前节点路径
        private String currentPath;
        //前一个节点的路径
        private String beforePath;
        
        private CountDownLatch countDownLatch = null;
        
        public HighPerformanceZkLock() {
            //如果不存在这个节点,则创建持久节点
            if (!zkClient.exists(PATH)) {        
                zkClient.createPersistent(PATH);
            }
        }
        
        @Override
        public void releaseLock() {
            if (null != zkClient) {
                zkClient.delete(currentPath);
                zkClient.close();
            }
     
        }
     
        @Override
        public boolean tryLock() {
            //如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
            if (null == currentPath || "".equals(currentPath)) {
                //在path下创建一个临时的顺序节点
                currentPath = zkClient.createEphemeralSequential(PATH+"/", "lock");
            }
            //获取所有的临时节点,并排序
            List<String> childrens = zkClient.getChildren(PATH);
            Collections.sort(childrens);
            if (currentPath.equals(PATH+"/"+childrens.get(0))) {
                return true;
            }else {//如果当前节点不是排名第一,则获取它前面的节点名称,并赋值给beforePath
                int pathLength = PATH.length();
                int wz = Collections.binarySearch(childrens, currentPath.substring(pathLength+1));
                beforePath = PATH+"/"+childrens.get(wz-1);
            }
            return false;
        }
     
        @Override
        public void waitLock() {
            IZkDataListener lIZkDataListener = new IZkDataListener() {
                
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    if (null != countDownLatch){
                        countDownLatch.countDown();
                    }
                }
                
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    
                }
            };
            //监听前一个节点的变化
            zkClient.subscribeDataChanges(beforePath, lIZkDataListener);
            if (zkClient.exists(beforePath)) {
                countDownLatch = new CountDownLatch(1);
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            zkClient.unsubscribeDataChanges(beforePath, lIZkDataListener);
        }
     
    }

    这里只要帖高性能锁的代码了,AbstractLock没变化,LockTest中只要修改一行代码

    //AbstractLock zkLock = new SimpleZkLock();

    AbstractLock zkLock = new HighPerformanceZkLock();

     测试结果:

     转载自:https://blog.csdn.net/crazymakercircle/article/details/85956246

        https://blog.csdn.net/hongtaolong/java/article/details/88898875

    3.数据库锁

    mysql行锁。

    存储引擎使用innoDB

    行锁的使用方式为:

    Select * from student_game where id=3 for update;

    这样我们的业务代码大概是这样的:

    conn.setAutoCommit(false);//关闭自动提交
    select scores from test.student where id= 4 for update;//上锁
    //获取学生id,scores数据,查询es记录,等到总分。
    //更新学生得分表scores记录
    Conn.commit;//释放锁

    原文链接:https://blog.csdn.net/humanity11/java/article/details/93356344

    mysql分布式锁

    要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。
         当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。
         创建这样一张数据库表:
    CREATE TABLE `methodLock` (
      `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `method_name` varchar(64) NOT NULL DEFAULT '' COMMENT '锁定的方法名',
      `desc` varchar(1024) NOT NULL DEFAULT '备注信息',
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '保存数据时间,自动生成',
      PRIMARY KEY (`id`),
      UNIQUE KEY `uidx_method_name` (`method_name `) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

    当我们想要锁住某个方法时,执行以下SQL:

    insert into methodLock(method_name,desc) values (‘method_name’,‘desc’)
    因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
    当方法执行完毕之后,想要释放锁的话,需要执行以下Sql:
    delete from methodLock where method_name ='method_name'

    上面这种简单的实现有以下几个问题:

    1、这把锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
    2、这把锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
    3、这把锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
    4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

    当然,我们也可以有其他方式解决上面的问题。
    1.数据库是单点?搞两个数据库,数据之前双向同步。一旦挂掉快速切换到备库上。
    2.没有失效时间?只要做一个定时任务,每隔一定时间把数据库中的超时数据清理一遍。
    3.非阻塞的?搞一个while循环,直到insert成功再返回成功。
    4.非重入的?在数据库表中加个字段,记录当前获得锁的机器的主机信息和线程信息,那么下次再获取锁的时候先查询数据库,如果当前机器的主机信息和线程信息在数据库可以查到的话,直接把锁分配给他就可以了。


    链接:https://www.jianshu.com/p/7b57ebb25900

  • 相关阅读:
    形象理解ERP(转)
    禁用windows server 2008 域密码复杂性要求策略
    How to adding find,filter,remove filter on display method Form
    Windows Server 2008 R2激活工具
    How to using bat command running VS development SSRS report
    Creating Your First Mac AppGetting Started
    Creating Your First Mac AppAdding a Track Object 添加一个 Track 对象
    Creating Your First Mac AppImplementing Action Methods 实现动作方法
    Creating Your First Mac AppReviewing the Code 审查代码
    Creating Your First Mac AppConfiguring the window 设置窗口
  • 原文地址:https://www.cnblogs.com/linhongwenBlog/p/13344031.html
Copyright © 2011-2022 走看看