zoukankan      html  css  js  c++  java
  • Redis: 用redis实现分布式锁,秒杀案例(转)

      分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。虽然网上已经有各种介绍Redis分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇将介绍如何正确地实现Redis分布式锁。 

      首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

    1. 互斥性。在任意时刻,只有一个客户端能持有锁。

    2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

    3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。

    4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。 


     分布式锁的简单实现代码:

    package com.gdut.redis.lock.test1;
    
    import java.util.Collections;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    
    public class DistributedLock {
        private static final String LOCK_SUCCESS = "OK";
        private static final String SET_IF_NOT_EXIST = "NX";
        private static final String SET_WITH_EXPIRE_TIME = "PX";
        private static final Long RELEASE_SUCCESS = 1L;
    
        private static void validParam(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
            if (null == jedisPool) {
                throw new IllegalArgumentException("jedisPool obj is null");
            }
    
            if (null == lockKey || "".equals(lockKey)) {
                throw new IllegalArgumentException("lock key  is blank");
            }
    
            if (null == requestId || "".equals(requestId)) {
                throw new IllegalArgumentException("requestId is blank");
            }
    
            if (expireTime < 0) {
                throw new IllegalArgumentException("expireTime is not allowed less zero");
            }
        }
    
        /**
         * 
         * @param jedis
         * @param lockKey
         * @param requestId
         * @param expireTime
         * @return
         */
        public boolean tryLock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
    
            validParam(jedisPool, lockKey, requestId, expireTime);
    
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    
                if (LOCK_SUCCESS.equals(result)) {
                    return true;
                }
            } catch (Exception e) {
                throw e;
            } finally {
                if (null != jedis) {
                    jedis.close();
                }
            }
    
            return false;
        }
    
        /**
         * 
         * @param jedis
         * @param lockKey
         * @param requestId
         * @param expireTime
         */
        public void lock(JedisPool jedisPool, String lockKey, String requestId, int expireTime) {
    
            validParam(jedisPool, lockKey, requestId, expireTime);
    
            while (true) {
                if (tryLock(jedisPool, lockKey, requestId, expireTime)) {
                    System.out.println("lock  "+ Thread.currentThread().getName()+ " requestId:" + requestId);
                    return;
                }
            }
        }
    
        /**
         * 
         * @param jedis
         * @param lockKey
         * @param requestId
         * @return
         */
        public boolean unLock(JedisPool jedisPool, String lockKey, String requestId) {
    
            validParam(jedisPool, lockKey, requestId, 1);
    
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    
            Jedis jedis = null;
            try {
    
                jedis = jedisPool.getResource();
                Object result = jedis.eval(script, Collections.singletonList(lockKey),
                        Collections.singletonList(requestId));
    
                if (RELEASE_SUCCESS.equals(result)) {
                    System.out.println("unlock  "+ Thread.currentThread().getName()+ " requestId:" + requestId);
                    return true;
                }
    
            } catch (Exception e) {
                throw e;
            } finally {
                if (null != jedis) {
                    jedis.close();
                }
            }
    
            return false;
    
        }
    
    }

     说明:String redis.clients.jedis.Jedis.set(String key, String value, String nxxx, String expx, int time)  方法参数说明

    • 其中前面两个是key,value值;
    • nxxx为模式,这里我们设置为NX,意思是说如果key不存在则插入该key对应的value并返回OK,否者什么都不做返回null;
    • 参数expx这里我们设置为PX,意思是设置key的过期时间为time 毫秒

      通过tryLock方法尝试获取锁,内部是具体调用Redis的set方法,多个线程同时调用tryLock时候会同时调用set方法,但是set方法本身是保证原子性的,对应同一个key来说,多个线程调用set方法时候只有一个线程返回OK,其它线程因为key已经存在会返回null,所以返回OK的线程就相当与获取到了锁,其它返回null的线程则相当于获取锁失败。

      另外这里我们要保证value(requestId)值唯一是为了保证只有获取到锁的线程才能释放锁,这个下面释放锁时候会讲解。

      通过lock 方法让使用tryLock获取锁失败的线程本地自旋转重试获取锁,这类似JUC里面的CAS。

      Redis有一个叫做eval的函数,支持Lua脚本执行,并且能够保证脚本执行的原子性,也就是在执行脚本期间,其它执行redis命令的线程都会被阻塞。这里解锁时候使用下面脚本:

    if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
    

      其中keys[1]为unLock方法传递的key,argv[1]为unLock方法传递的requestId;脚本redis.call(‘get’, KEYS[1])的作用是获取key对应的value值,这里会返回通过Lock方法传递的requetId, 然后看当前传递的RequestId是否等于key对应的值,等于则说明当前要释放锁的线程就是获取锁的线程,则继续执行redis.call(‘del’, KEYS[1])脚本,删除key对应的值。


     测试刚才实现的分布式锁

      例子中使用50个线程模拟秒杀一个商品,使用–运算符来实现商品减少,从结果有序性就可以看出是否为加锁状态。

      模拟秒杀服务,在其中配置了jedis线程池,在初始化的时候传给分布式锁,供其使用。

    package com.gdut.redis.lock.test1;
    
    import java.util.UUID;
    
    import com.gdut.redis.lock.test1.DistributedLock;
    
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class Service1 {
        private static JedisPool pool = null;
        private DistributedLock lock = new DistributedLock();
        
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            // 设置最大连接数
            config.setMaxTotal(500);
            // 设置最大空闲数
            config.setMaxIdle(100);
            // 设置最大等待时间
            config.setMaxWaitMillis(1000 * 100);
            // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
            config.setTestOnBorrow(true);
            pool = new JedisPool(config, "127.0.0.1", 6379, 300000);
        }
        
        public void seckill() throws InterruptedException {
            String requestId = UUID.randomUUID().toString();
            lock.lock(pool, "resource", requestId, 3000);
            lock.unLock(pool, "resource", requestId);
        }
    }

    模拟线程进行秒杀服务:

    package com.gdut.redis.lock.test1;
    
    import com.gdut.redis.lock.test1.Service1;
    
    public class TaskThread extends Thread {
        private Service1 service;
        
        public TaskThread(Service1 service) {
            this.service = service;
        }
        
        @Override
        public void run() {
            try {    
                synchronized (this) {
                    service.seckill();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) {
            Service1 service = new Service1();
            for (int i = 0; i < 400; i++) {
                TaskThread thread = new TaskThread(service);
                thread.start();
            }
        }
    
    }

    console结果:

    一共800行输出,lock 和unlock的输出都是400行,表示400个线程都获得了锁和释放了锁


     总结:

      本文使用redis单实例结合redis的set方法和eval函数实现了一个简单的分布式锁,但是这个实现还是明显有问题的。虽然使用set方法设置了超时时间,以避免线程获取到锁后redis挂了后锁没有被释放的情况,但是超时时间设置为多少合适那?如果设置太小,可能会存在线程获取锁后执行业务逻辑时间大于锁超时时间,那么就会存在逻辑还没执行完,锁已经因为超时自动释放了,而其他线程可能获取到锁,那么之前获取锁的线程的业务逻辑的执行就没有保证原子性。

      另外还有一个问题是Lock方法里面是自旋调用tryLock进行重试,这就会导致像JUC中的AtomicLong一样,在高并发下多个线程竞争同一个资源时候造成大量线程占用cpu进行重试操作。这时候其实可以随机生成一个等待时间,等时间到后在进行重试,以减少潜在的同时对一个资源进行竞争的并发量。

    资料:http://ifeve.com/redis-distributedlock/

  • 相关阅读:
    【APUE | 10】函数signal
    【C++ Primer | 15】C++虚函数表剖析②
    【C++ Primer | 15】C++类内存分布
    VMware虚拟机 Ubuntu 16.04 安装
    主题
    【C++ Primer | 15】构造函数与拷贝控制
    08 IO库
    001 库函数【01】
    DataTable序列化及反序列化Json
    DbHelper简单的使用
  • 原文地址:https://www.cnblogs.com/myseries/p/10785275.html
Copyright © 2011-2022 走看看