当前流行的系统,就是分布式系统。所谓分布式,我个人理解,是很多的服务分布在不同的机器上,都是相同功能模块。但是容易出现一个问题,就是并发时的问题。
我们传统的锁,只能锁住一个服务器上的方法,让其在一个服务上同步,然后,分布式。怎么办。经上网查询资料后,抄袭+整理,得到这个结果:redis + spring 实现 注解式锁。
实现后,我们需要同步的方法,只需要加上标签@RedisLock()就可以了。
以下是实现步骤。在此,搭建spring + redis 的工程就不再详细说明。
首先是spring的配置加上
<!-- 切面开关 --> <aop:aspectj-autoproxy />
然后是redis相应配置。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="maxIdle" value="${redis.maxIdle}" /> <property name="maxTotal" value="${redis.maxActive}" /> <property name="maxWaitMillis" value="${redis.maxWait}" /> <property name="testOnBorrow" value="${redis.testOnBorrow}" /> </bean> <bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <property name="poolConfig" ref="poolConfig" /> <property name="port" value="${redis.port}" /> <property name="hostName" value="${redis.host}" /> <property name="password" value="${redis.pass}" /> </bean> <bean id="stringSerializer" class="org.springframework.data.redis.serializer.StringRedisSerializer"/> <bean id="jdkSerializationRedisSerializer" class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer"/> <bean id="stringRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate"> <property name="connectionFactory" ref="jedisConnectionFactory" /> <property name="keySerializer" ref="stringSerializer" /> <!-- <property name="valueSerializer" ref="jdkSerializationRedisSerializer"/> --> </bean> </beans>
再试pom文件引入相应jar包。在此不说。
直接上代码。
1、是共享类文件,就是我们会有很多方法都去调用的。
package com.iafclub.demo.commonData; import org.springframework.stereotype.Service; import com.iafclub.demo.aop.RedisLock; /**共享数据 * * @author chenweixian * */ @Service public class Mydata { private static int i = 0; @RedisLock() public void add(String key1) { i++; System.out.println(key1+"=-===========" + i); } }
2、是注解标签 声明实现@RedisLock(),具体锁的释放时间什么的,根据自己项目需要更改。
package com.iafclub.demo.aop; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <b>同步锁:</b><br/> * 主要作用是在服务器集群环境下保证方法的synchronize;<br/> * 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;<br/> * 如果原有“A任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“B任务”获取到,在“B任务”成功货物锁会并不会终止“A任务”的执行;<br/> * <br/> * <b>注意:</b><br/> * 使用过程中需要注意keepMills、toWait、sleepMills、maxSleepMills等参数的场景使用;<br/> * 需要安装redis,并使用spring和spring-data-redis等,借助redis NX等方法实现。 * * @author partner4java * */ @Target({ ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface RedisLock { /** * 锁的key<br/> * 如果想增加坑的个数添加非固定锁,可以在参数上添加@RedisLock注解,但是本参数是必写选项<br/> * redis key的拼写规则为 当前注解类+所在的方法 + synKey + @RedisLock<br/> * */ String synKey() default ""; /** * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/> * 单位毫秒,默认20秒<br/> * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/> * 但是没有比较强的业务要求下,不建议设置为0 */ long keepMills() default 20 * 1000; /** * 当获取锁失败,是继续等待还是放弃<br/> * 默认为继续等待 */ boolean toWait() default true; /** * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/> * 默认为10毫秒 * * @return */ long sleepMills() default 10; /** * 锁获取超时时间:<br/> * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出 * {@link java.util.concurrent.TimeoutException.TimeoutException} * ,可捕获此异常做相应业务处理;<br/> * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去; * * @return */ long maxSleepMills() default 60 * 1000; }
3、切面实现,注意,需要加上切面的注解,与annotation的拦截service或Component
package com.iafclub.demo.aop; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.log4j.Logger; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import com.opensymphony.oscache.util.StringUtil; /** * 锁的切面编程<br/> * 针对添加@RedisLock 注解的方法进行加锁 * * @author chenweixain * */ @Aspect @Component public class RedisLockAspect { private Logger logger = Logger.getLogger(getClass()); @Autowired private StringRedisTemplate stringRedisTemplate; @Pointcut("@annotation(com.iafclub.demo.aop.RedisLock)") // 用于拦截标签 private void anyMethod(){} @Around("anyMethod() && @annotation(lockInfo)") public Object lock(ProceedingJoinPoint pjp, RedisLock lockInfo) throws Throwable { String synKey = getSynKey(pjp, lockInfo.synKey()); boolean lock = false; Object obj = null; try { // 超时时间 long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills(); while (!lock) { long keepMills = System.currentTimeMillis() + lockInfo.keepMills(); lock = setIfAbsent(synKey, keepMills); // 得到锁,没有人加过相同的锁 if (lock) { obj = pjp.proceed(); } // 锁设置了没有超时时间 else if (lockInfo.keepMills() <= 0) { // 继续等待获取锁 if (lockInfo.toWait()) { // 如果超过最大等待时间抛出异常 if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) { throw new TimeoutException("获取锁资源等待超时"); } TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); } else { break; } } // 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁 else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) { lock = true; obj = pjp.proceed(); } // 没有得到任何锁 else { // 继续等待获取锁 if (lockInfo.toWait()) { // 如果超过最大等待时间抛出异常 if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) { throw new TimeoutException("获取锁资源等待超时"); } TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); } // 放弃等待 else { break; } } } } catch (Exception e) { logger.error("锁异常", e); throw e; } finally { // 如果获取到了锁,释放锁 if (lock) { releaseLock(synKey); } } return obj; } /**获取包括方法参数上的key * * @param pjp * @param synKey * @return */ private String getSynKey(ProceedingJoinPoint pjp, String synKey) { StringBuffer synKeyBuffer = new StringBuffer(pjp.getSignature().getDeclaringTypeName()); synKeyBuffer.append(".").append(pjp.getSignature().getName()); if (!StringUtil.isEmpty(synKey)){ synKeyBuffer.append(".").append(synKey); } return synKeyBuffer.toString(); } public BoundValueOperations<String, String> getOperations(String key) { return stringRedisTemplate.boundValueOps(key); } /** * Set {@code value} for {@code key}, only if {@code key} does not exist. * <p> * See http://redis.io/commands/setnx * * @param key * must not be {@literal null}. * @param value * must not be {@literal null}. * @return */ public boolean setIfAbsent(String key, Long value) { return getOperations(key).setIfAbsent(value.toString()); } public long getLock(String key) { String time = getOperations(key).get(); if (time == null) { return 0; } return Long.valueOf(time); } public long getSet(String key, Long value) { String time = getOperations(key).getAndSet(value.toString()); if (time == null) { return 0; } return Long.valueOf(time); } public void releaseLock(String key) { stringRedisTemplate.delete(key); } }
4、测试类:
package test.iafclub.redis; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import test.iafclub.BaseTest; import com.iafclub.demo.commonData.Mydata; public class RedisTest2 extends BaseTest{ @Autowired private Mydata sysTest; @Test public void testHello() throws InterruptedException { for (int i = 0; i < 100; i++) { new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } sysTest.add("CHEN"); } }).start(); } TimeUnit.SECONDS.sleep(20); } @Test public void testHello2() throws InterruptedException{ sysTest.add("xxxxx"); TimeUnit.SECONDS.sleep(10); } }
加上切面与不加切面控制测试结果: