jedis原生方式
相关依赖
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.5.0</version> </dependency>
代码如下
package jedisLock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * Holds a single {@link JedisPool} for the demo and hands out connections from it.
 *
 * @author asd on 2019-07-16
 */
public class JedisUtil {

    /** Pool shared by every caller; built once when the class is initialized. */
    private static JedisPool jedisPool = createPool();

    /**
     * Builds the pool pointing at {@code localhost:6379}.
     *
     * @return the configured pool
     */
    private static JedisPool createPool() {
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxIdle(1000);
        poolConfig.setMaxWaitMillis(10000);
        poolConfig.setMaxTotal(10000);
        // The last constructor argument (100000) is the timeout handed to the pool.
        return new JedisPool(poolConfig, "localhost", 6379, 100000);
    }

    /**
     * Borrows a connection from the shared pool.
     *
     * @return a {@link Jedis} instance obtained from the pool; the caller is expected to close it
     */
    public static Jedis getRerouse() {
        return jedisPool.getResource();
    }
}
package jedisLock;

import redis.clients.jedis.Jedis;

import java.util.Collections;

/**
 * Distributed lock built directly on Jedis.
 *
 * <p>Acquisition uses a single {@code SET key value NX PX ttl} command; release uses a Lua
 * script that deletes the key only when its value still equals the caller's token.
 *
 * @author hehang on 2019-07-16
 */
public class JedisDistributeLock {

    /** Reply compared against the result of a successful {@code SET}. */
    private static final String SUCCESS = "OK";

    /** Only set the key when it does not exist yet. */
    private static final String SET_IF_NOT_EXIST = "NX";

    /**
     * Expiry unit flag. {@code PX} means milliseconds, which matches both
     * {@link #DEFAULT_EXPIRE_1000_Milliseconds} and the millisecond values callers pass in;
     * the former {@code EX} would have interpreted those values as seconds.
     */
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /** Value returned by the release script when the key was deleted. */
    private static final Long RELEASE_SUCCESS = 1L;

    /** Pause between two acquisition attempts, in milliseconds. */
    private static final long SLEEP_PER = 5;

    /** Default expiry, in milliseconds. */
    public static final int DEFAULT_EXPIRE_1000_Milliseconds = 1000;

    /** Compare-and-delete script: removes the key only if it still holds the given token. */
    private static final String RELEASE_SCRIPT =
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

    /**
     * Makes one attempt to set the lock key.
     *
     * @param jedis      connection to use
     * @param key        lock key
     * @param requestId  token identifying the owner
     * @param expireTime expiry in milliseconds
     * @return {@code true} when the reply equals {@code "OK"}
     */
    private static boolean tryGetLock(Jedis jedis, String key, String requestId, int expireTime) {
        String result = jedis.set(key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        return SUCCESS.equals(result);
    }

    /**
     * Runs the compare-and-delete script for the given key and token.
     *
     * @param jedis     connection to use
     * @param key       lock key
     * @param requestId token that must match the stored value
     * @return {@code true} when the script reports that the key was deleted
     */
    private static boolean releaseLock(Jedis jedis, String key, String requestId) {
        Object result = jedis.eval(
                RELEASE_SCRIPT,
                Collections.singletonList(key),
                Collections.singletonList(requestId));
        return RELEASE_SUCCESS.equals(result);
    }

    /**
     * Blocks until the lock is acquired, retrying every {@link #SLEEP_PER} milliseconds.
     *
     * <p>An interrupt received while waiting does not abort the acquisition (the method has no
     * way to report failure); instead the interrupt status is restored once the lock is held so
     * the caller can still observe it.
     *
     * @param key        lock key
     * @param value      token identifying the owner
     * @param expireTime expiry in milliseconds
     */
    public static void lock(String key, String value, int expireTime) {
        boolean interrupted = false;
        try (Jedis jedis = JedisUtil.getRerouse()) {
            while (!tryGetLock(jedis, key, value, expireTime)) {
                // Drawback: a release that happens while we sleep is only noticed on the next poll.
                try {
                    Thread.sleep(SLEEP_PER);
                } catch (InterruptedException e) {
                    // Remember the interrupt; restoring it right away would make every
                    // following sleep() throw immediately and turn the loop into a busy spin.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Makes a single, non-blocking acquisition attempt.
     *
     * @param key        lock key
     * @param value      token identifying the owner
     * @param expireTime expiry in milliseconds
     * @return {@code true} when the lock was acquired
     */
    public static boolean tryLock(String key, String value, int expireTime) {
        try (Jedis jedis = JedisUtil.getRerouse()) {
            return tryGetLock(jedis, key, value, expireTime);
        }
    }

    /**
     * Releases the lock if it is still owned by {@code value}; otherwise does nothing.
     *
     * @param key   lock key
     * @param value token that was used to acquire the lock
     */
    public static void unlock(String key, String value) {
        try (Jedis jedis = JedisUtil.getRerouse()) {
            releaseLock(jedis, key, value);
        }
    }
}
package jedisLock;

import java.util.UUID;

/**
 * Thin service around {@link JedisDistributeLock} that locks one fixed key and remembers, per
 * thread, the random token used to acquire it.
 *
 * @author hehang on 2019-07-17
 */
public class JedisLockService {

    /** Redis key shared by every user of this service. */
    private static final String lockkey = "lock";

    /** Token of the lock currently held by the calling thread, if any. */
    private final ThreadLocal<String> threadLocal = new ThreadLocal<>();

    private void setThreadLocal(String uuid) {
        threadLocal.set(uuid);
    }

    private String getThreadLocal() {
        return threadLocal.get();
    }

    /**
     * Blocks until the lock is acquired with a fresh random token, then stores that token for
     * the calling thread. The value {@code 5000} is passed through as the expiry argument.
     */
    public void lock() {
        String value = UUID.randomUUID().toString();
        JedisDistributeLock.lock(lockkey, value, 5000);
        setThreadLocal(value);
    }

    /**
     * Releases the lock held by the calling thread.
     *
     * <p>Does nothing when the thread has no stored token (for example when {@link #lock()}
     * failed before storing one), so it is safe to call from a {@code finally} block. The token
     * is always cleared afterwards so it cannot leak into later work on a reused thread.
     */
    public void unlock() {
        String value = getThreadLocal();
        if (value == null) {
            return;
        }
        try {
            JedisDistributeLock.unlock(lockkey, value);
        } finally {
            threadLocal.remove();
        }
    }
}
测试相关类:为了方便,在SellTicketTask中定义一个成员变量作为共享变量,由三个线程同时去改变它的值,以此模拟分布式场景
package jedisLock;

/**
 * Ticket-selling task shared by several threads; each sale is guarded by the Jedis-based lock.
 *
 * @author hehang on 2019-07-17
 */
public class SellTicketTask implements Runnable {

    // volatile: the loop condition reads this field without holding the lock, and the Redis
    // lock by itself gives no visibility guarantee between Java threads.
    private volatile int tickets = 100;

    JedisLockService jedisLockService = new JedisLockService();

    /**
     * Sells tickets one at a time until none are left or the thread is interrupted.
     *
     * <p>If the thread is interrupted during the simulated work, the sale in progress is still
     * completed, the lock is released, the loop stops and the interrupt status is restored.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted && tickets > 0) {
            // Acquire before entering try: if lock() throws, nothing was acquired and
            // unlock() must not run.
            jedisLockService.lock();
            try {
                // Re-check under the lock; another thread may have sold the last ticket.
                if (tickets > 0) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                    System.out.println(Thread.currentThread().getName() + "正在出售第 " + tickets-- + " 张票");
                }
            } finally {
                jedisLockService.unlock();
            }
        }
        if (interrupted) {
            // Restored only after unlock() so the release itself is not disturbed.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + "线程结束");
    }
}
package jedisLock;

/**
 * Entry point for the Jedis lock demo.
 *
 * @author hehang on 2019-07-17
 */
public class JedisLockTest {

    /**
     * Starts three threads that all run the same {@link SellTicketTask} instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SellTicketTask sharedTask = new SellTicketTask();
        int window = 0;
        while (window < 3) {
            Thread seller = new Thread(sharedTask, "窗口" + window);
            seller.start();
            window++;
        }
    }
}
Redisson方式
相关依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.9.1</version> </dependency>
Redisson方式使用相对简单,其底层同样采用lua脚本实现
package redission;

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

/**
 * Ticket-selling task guarded by a Redisson {@link RLock}.
 *
 * <p>Using a mature open-source project is the recommended way to implement a distributed lock.
 *
 * @author hehang on 2019-07-17
 */
public class SellTicketTask implements Runnable {

    // volatile: the loop condition reads this field without holding the lock.
    private volatile int tickets = 100;

    /** Kept as a field so the client can be shut down; it was previously created and dropped. */
    private final RedissonClient client = createClient();

    private final RLock lock = client.getLock("zxczxc");

    /**
     * Creates a single-server client for {@code redis://127.0.0.1:6379}.
     *
     * @return the new client
     */
    private static RedissonClient createClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        return Redisson.create(config);
    }

    /**
     * Shuts down the underlying Redisson client. Call this once all threads running this task
     * have finished.
     */
    public void shutdown() {
        client.shutdown();
    }

    /**
     * Sells tickets one at a time until none are left or the thread is interrupted.
     *
     * <p>If the thread is interrupted during the simulated work, the sale in progress is still
     * completed, the lock is released, the loop stops and the interrupt status is restored.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted && tickets > 0) {
            // Acquire before entering try: if lock() throws, the lock is not held and
            // unlock() must not run.
            lock.lock();
            try {
                // Re-check under the lock; another thread may have sold the last ticket.
                if (tickets > 0) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                    System.out.println(Thread.currentThread().getName() + "正在出售第 " + tickets-- + " 张票");
                }
            } finally {
                lock.unlock();
            }
        }
        if (interrupted) {
            // Restored only after unlock() so the release itself is not disturbed.
            Thread.currentThread().interrupt();
        }
        System.out.println(Thread.currentThread().getName() + "线程结束");
    }
}
package redission;

/**
 * Entry point for the Redisson lock demo.
 *
 * @author hehang on 2019-07-17
 */
public class TestRedissonLock {

    /**
     * Starts three threads that all run the same {@link SellTicketTask} instance.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SellTicketTask sharedTask = new SellTicketTask();
        int window = 0;
        while (window < 3) {
            Thread seller = new Thread(sharedTask, "窗口" + window);
            seller.start();
            window++;
        }
    }
}
springboot2.0.x配置lettuce
相关依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <!-- 2.0以上版本默认连接池是lettuce --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.5.0</version> </dependency>
相关代码,关键点有三个:通过lua脚本来获取和释放锁;通过redis发布订阅机制,在锁释放时通知其它等待者;通过本地lock避免同一个服务内的线程同时竞争锁,从而减少redis压力。
package com.jlwj.redislock.Config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @author hehang on 2019-07-17 * @description asd */ @Configuration @EnableCaching public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { //设置序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); RedisSerializer stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer); // key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // value序列化 redisTemplate.setHashKeySerializer(stringSerializer); // Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); // Hash 
value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
package com.jlwj.redislock.springRedisLock;

import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Distributed lock on top of {@link RedisTemplate}.
 *
 * <p>Lua scripts acquire and release the lock, a Redis channel announces releases to waiters,
 * and a local {@link ReentrantLock} lets only one thread per instance of this class compete
 * for the Redis key at a time.
 *
 * @author hehang on 2019-07-17
 */
public class SpringRedisLock {

    /** Expiry of the lock key in milliseconds, handed to the acquire script as KEYS[3]. */
    private static final long LOCK_EXPIRE_MILLIS = 1000;

    // The Lua string literals use single quotes: nested double quotes would terminate the
    // Java string literal and the class would not compile.
    // The token and the expiry travel in KEYS (not ARGV), so they are passed to
    // redisTemplate.execute(script, keys) together with the lock key.
    private static final RedisScript<Long> lua_lock = new DefaultRedisScript<>(
            "if redis.call('setnx', KEYS[1], KEYS[2]) == 1 then return redis.call('pexpire', KEYS[1], KEYS[3]) else return 0 end",
            Long.class);

    private static final RedisScript<Long> lua_unlock = new DefaultRedisScript<>(
            "if redis.call('get',KEYS[1]) == KEYS[2] then return redis.call('del',KEYS[1]) else return -1 end",
            Long.class);

    RedisTemplate redisTemplate;

    String resourceName;

    /** Maximum time, in seconds, to wait for an unlock notification before retrying. */
    Integer timeOut = 1;

    /** Token of the lock currently held by the calling thread, if any. */
    ThreadLocal<String> threadLocal = new ThreadLocal<>();

    // With many waiting threads every one of them would poll Redis. Serializing acquisition
    // locally means threads using this instance queue here and only one of them talks to Redis;
    // competition between machines is resolved by Redis.
    Lock lock = new ReentrantLock();

    /**
     * @param resourceName  logical name of the protected resource; the Redis key is
     *                      {@code "lock_" + resourceName}
     * @param redisTemplate template used for all Redis access
     * @param timeOut       seconds to wait for an unlock notification before retrying
     */
    public SpringRedisLock(String resourceName, RedisTemplate redisTemplate, int timeOut) {
        this.resourceName = "lock_" + resourceName;
        this.redisTemplate = redisTemplate;
        this.timeOut = timeOut;
    }

    /**
     * Blocks until the lock is acquired.
     *
     * <p>After each failed attempt the thread waits for a message on the unlock channel, or for
     * {@code timeOut} seconds, and then tries again. An interrupt received while waiting does
     * not abort the acquisition; the interrupt status is restored before this method returns.
     */
    public void lock() {
        // Acquire the local lock before entering try: if this call throws, the finally block
        // must not unlock a lock that was never taken.
        lock.lock();
        boolean interrupted = false;
        try {
            while (!tryLock()) {
                if (waitForUnlockSignal()) {
                    interrupted = true;
                }
                System.out.println("继续下一次循环");
            }
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Subscribes to the unlock channel and waits for a message or for the timeout.
     *
     * @return {@code true} when the wait was cut short by an interrupt
     */
    private boolean waitForUnlockSignal() {
        Object completed = redisTemplate.execute(new RedisCallback<Boolean>() {
            @Override
            public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
                CountDownLatch waiter = new CountDownLatch(1);
                // Wait for the notification. Note from the original author: with Jedis the
                // subscribe call blocks at this point.
                // Whatever the message says, wake up and compete for the lock again.
                connection.subscribe((message, pattern) -> waiter.countDown(), unlockChannel());
                try {
                    // Bounded wait: if nothing arrives in time something is wrong, so retry.
                    waiter.await(timeOut, TimeUnit.SECONDS);
                    return Boolean.TRUE;
                } catch (InterruptedException e) {
                    // Reported to lock(), which restores the interrupt status once it is done.
                    return Boolean.FALSE;
                }
            }
        });
        return Boolean.FALSE.equals(completed);
    }

    /** @return the channel name for this resource as bytes */
    private byte[] unlockChannel() {
        return (resourceName + "_unlock_channel").getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Makes one acquisition attempt with a fresh random token.
     *
     * @return {@code true} when the acquire script returned 1; the token is then stored for the
     *         calling thread
     */
    public boolean tryLock() {
        String value = UUID.randomUUID().toString();
        List<String> keys = Arrays.asList(resourceName, value, String.valueOf(LOCK_EXPIRE_MILLIS));
        Long result = (Long) redisTemplate.execute(lua_lock, keys);
        // Null-safe comparison: unboxing a null result would throw.
        if (result != null && result == 1L) {
            System.out.println(Thread.currentThread().getName() + "获取到锁");
            threadLocal.set(value);
            return true;
        }
        return false;
    }

    /**
     * Releases the lock if the stored value still matches this thread's token and, in that case,
     * publishes an empty message on the unlock channel.
     *
     * <p>Does nothing when the calling thread has no stored token, so it is safe to call from a
     * {@code finally} block even if {@link #lock()} failed.
     */
    public void unlock() {
        // 1. Only the thread whose token matches may release. 2. Waiters are notified.
        String value = threadLocal.get();
        if (value == null) {
            return;
        }
        try {
            List<String> keys = Arrays.asList(resourceName, value);
            Long result = (Long) redisTemplate.execute(lua_unlock, keys);
            if (result != null && result != -1L) {
                System.out.println(Thread.currentThread().getName() + "释放锁");
                redisTemplate.execute(new RedisCallback<Object>() {
                    @Override
                    public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
                        redisConnection.publish(unlockChannel(), "".getBytes(StandardCharsets.UTF_8));
                        return null;
                    }
                });
            }
        } finally {
            threadLocal.remove();
        }
    }
}
package com.jlwj.redislock.springRedisLock;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

/**
 * Ticket-selling service whose counter is guarded by a {@link SpringRedisLock}.
 *
 * @author hehang on 2019-07-17
 */
@Service
public class SellTicketSpringService {

    @Autowired
    RedisTemplate redisTemplate;

    private SpringRedisLock springRedisLock;

    // Primitive instead of Long: avoids boxing on every decrement.
    private long tickets = 100L;

    /** Creates the lock for resource {@code "test_lock"} once the template has been injected. */
    @PostConstruct
    public void init() {
        springRedisLock = new SpringRedisLock("test_lock", redisTemplate, 1);
    }

    /**
     * Tries to sell one ticket to the given user while holding the lock.
     *
     * @param userId identifier printed with the sale
     * @return {@code true} when a ticket was sold; {@code false} when none were left or when an
     *         exception occurred (the exception is printed, not rethrown)
     */
    public boolean buyTicket(String userId) {
        // Acquire outside the guarded try: when lock() fails nothing is held, so unlock()
        // must not run.
        try {
            springRedisLock.lock();
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        boolean status = false;
        try {
            if (tickets > 0) {
                System.out.println(userId + "正在买第 " + tickets-- + " 张票");
                status = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            springRedisLock.unlock();
        }
        return status;
    }
}
测试类
package com.jlwj.redislock;

import com.jlwj.redislock.springRedisLock.SellTicketSpringService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

/**
 * Fires many concurrent purchases at {@link SellTicketSpringService} and prints the elapsed time.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisLockApplicationTests {

    /** Wall-clock time, in milliseconds, at which the current test started. */
    long timed = 0L;

    @Autowired
    private SellTicketSpringService sellTicketSpringService;

    @Before
    public void start() {
        System.out.println("开始测试");
        timed = System.currentTimeMillis();
    }

    @After
    public void end() {
        long elapsed = System.currentTimeMillis() - timed;
        System.out.println("结束测试,执行时长:" + elapsed);
    }

    /**
     * Starts 100 buyer threads that all wait on a shared latch and then call
     * {@code buyTicket}. Remember to switch the Redis client to Lettuce.
     */
    @Test
    public void buy() throws InterruptedException {
        // Number of simulated requests.
        final int threadNum = 100;
        // Starting gun: buyers proceed only once the count reaches zero.
        CountDownLatch cdl = new CountDownLatch(threadNum);

        Thread[] threads = new Thread[threadNum];
        int index = 0;
        while (index < threadNum) {
            Thread buyer = new Thread(purchaseTask("Tony" + index, cdl));
            threads[index] = buyer;
            buyer.start();
            // One more thread has been started.
            cdl.countDown();
            index++;
        }

        // Wait for every buyer to finish before the test ends.
        for (Thread buyer : threads) {
            buyer.join();
        }
    }

    /**
     * Builds the work for a single buyer thread.
     *
     * @param userId      identifier passed to {@code buyTicket}
     * @param startSignal latch the buyer waits on before purchasing
     * @return the runnable to hand to a thread
     */
    private Runnable purchaseTask(String userId, CountDownLatch startSignal) {
        return () -> {
            try {
                // Block until the latch reaches zero, i.e. all threads have been started.
                startSignal.await();
                // An HTTP request would end up calling this method from multiple threads.
                sellTicketSpringService.buyTicket(userId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
    }
}