场景:我们对于需要大量计算的场景,希望将结果缓存起来,然后我们一起来实现一个缓存服务。即对于一个相同的输入,它的输出是不变的(也可以短时间不变)。
实现说明:这里实现采用GuavaCache+装饰器模式。
首先设计一个缓存服务接口。
public interface CacheableService<I, O> { /** * 计算服务 * @param i * @return * @throws Exception */ O doService(I i) throws Exception; }
这里定义了一个缓存服务接口,这里的key和Hashmap的key一样,需要覆写equals和hashcode方法。
public class CacheableServiceWrapper<I , O> implements CacheableService<I, O>, GlobalResource { /** * 日志 */ private final static Logger LOGGER = LoggerFactory .getLogger(CacheableServiceWrapper.class); /** * 缓存大小 */ private int MAX_CACHE_SIZE = 20; /** * 出现异常的时候重试,默认不重试 */ private boolean retryOnExp = false; /** * 重试次数,默认为0,即不重试 */ private int retryTimes = 0; /** * 默认30分钟 */ private long expireTimeWhenAccess = 30 * 60; /** * 缓存 */ private LoadingCache<I, Future<O>> cache = null; private CacheableService<I, O> cacheableService = null; /** * Calculate o. * * @param i the * @return the o * @throws Exception the exception */ public O doService(final I i) throws Exception { Assert.notNull(cacheableService, "请设置好实例"); int currentTimes = 0; while (currentTimes <= retryTimes) { try { Future<O> oFuture = cache.get(i); return oFuture.get(); } catch (Exception e) { if (!retryOnExp) { throw e; } currentTimes++; LoggerUtils.info(LOGGER, "第", currentTimes, "重试,key=", i); } } throw new Exception("任务执行失败"); } /** * 提交计算任务 * * @param i * @return */ private Future<O> createTask(final I i) { Assert.notNull(cacheableService, "请设置好实例"); LoggerUtils.info(LOGGER, "提交任务,key=", i); LoggerUtils.info(LOGGER, "当前cache=", JSON.toJSONString(cache)); Future<O> resultFuture = THREAD_POOL.submit(new Callable<O>() { public O call() throws Exception { return cacheableService.doService(i); } }); return resultFuture; } /** * 构造函数 */ public CacheableServiceWrapper(CacheableService<I, O> cacheableService, int maxCacheSize, long expireTime) { this.cacheableService = cacheableService; this.MAX_CACHE_SIZE = maxCacheSize; this.expireTimeWhenAccess = expireTime; cache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(expireTimeWhenAccess, TimeUnit.SECONDS) .build(new CacheLoader<I, Future<O>>() { public Future<O> load(I key) throws ExecutionException { LoggerUtils.warn(LOGGER, "get Element from cacheLoader"); return createTask(key); } ; }); } /** * 构造函数 */ public CacheableServiceWrapper(CacheableService<I, O> cacheableService) { this.cacheableService = cacheableService; cache = CacheBuilder.newBuilder().maximumSize(MAX_CACHE_SIZE) .expireAfterAccess(expireTimeWhenAccess, TimeUnit.SECONDS) .build(new CacheLoader<I, Future<O>>() { public Future<O> load(I key) throws ExecutionException { LoggerUtils.warn(LOGGER, "get Element from cacheLoader"); return createTask(key); } ; }); } /** * Setter method for property <tt>retryTimes</tt>. * * @param retryTimes value to be assigned to property retryTimes */ public void setRetryTimes(int retryTimes) { this.retryTimes = retryTimes; } /** * Setter method for property <tt>retryOnExp</tt>. * * @param retryOnExp value to be assigned to property retryOnExp */ public void setRetryOnExp(boolean retryOnExp) { this.retryOnExp = retryOnExp; } }
这个装饰器就是最主要的内容了,实现了对缓存服务的输入和输出的缓存。这里先说明下中间几个重要的属性:
MAX_CACHE_SIZE :缓存空间的大小
retryOnExp :当缓存服务发生异常的时候,是否发起重试
retryTimes :当缓存服务异常需要重试的时候,重新尝试的最大上限。
expireTimeWhenAccess : 缓存失效时间,当key多久没有访问的时候,淘汰数据
然后是doService采用了Guava的缓存机制,当获取缓存为空的时候,会自动去build缓存,这个操作是原子化的,所以不用自己去采用ConcurrentHashmap的putIfAbsent方法去做啦~~~
这里面实现了最主要的逻辑,就是获取缓存,然后去get数据,然后如果异常,根据配置去重试。
好啦现在咱们去测试啦
public class CacheableCalculateServiceTest { private CacheableService<String, String> calculateService; @Before public void before() { CacheableServiceWrapper<String, String> wrapper = new CacheableServiceWrapper<String, String>( new CacheableService<String, String>() { public String doService(String i) throws Exception { Thread.sleep(999); return i + i; } }); wrapper.setRetryOnExp(true); wrapper.setRetryTimes(2); calculateService = wrapper; } @Test public void test() throws Exception { MutiThreadRun.init(5).addTaskAndRun(300, new Callable<String>() { public String call() throws Exception { return calculateService.doService("1"); } }); }
这里我们为了模拟大量计算的场景,我们将线程暂停了999ms,然后使用5个线程,执行任务999次,结果如下:
2016-08-24 02:00:18:848 com.zhangwei.learning.calculate.CacheableServiceWrapper get Element from cacheLoader 2016-08-24 02:00:20:119 com.zhangwei.learning.calculate.CacheableServiceWrapper 提交任务,key=1 2016-08-24 02:00:20:122 com.zhangwei.learning.calculate.CacheableServiceWrapper 当前cache={} 2016-08-24 02:00:21:106 com.zhangwei.learning.jedis.JedisPoolMonitorTask poolSize=500 borrowed=0 idle=0 2016-08-24 02:00:21:914 com.zhangwei.learning.run.MutiThreadRun 任务执行完毕,执行时间3080ms,共有300个任务,执行异常0次
可以看到,由于key一样,只执行了一次计算,然后剩下299都是从缓存中获取的。
现在我们修改为5个线程,执行300000次。
2016-08-24 02:03:15:013 com.zhangwei.learning.calculate.CacheableServiceWrapper get Element from cacheLoader 2016-08-24 02:03:16:298 com.zhangwei.learning.calculate.CacheableServiceWrapper 提交任务,key=1 2016-08-24 02:03:16:300 com.zhangwei.learning.calculate.CacheableServiceWrapper 当前cache={} 2016-08-24 02:03:17:289 com.zhangwei.learning.jedis.JedisPoolMonitorTask poolSize=500 borrowed=0 idle=0 2016-08-24 02:03:18:312 com.zhangwei.learning.run.MutiThreadRun 任务执行完毕,执行时间3317ms,共有300000个任务,执行异常0次
发现,执行时间没啥区别啦~~~~缓存的效果真是棒棒的~~
PS:我的个人svn地址:http://code.taobao.org/p/learningIT/wiki/index/ 有兴趣的可以看下啦~
后面我们再看基于注解去实现缓存~~~
好啦继续更新,我们使用注解,来实现缓存,首先我们的前提还是跟上面的一样,是对方法做缓存,也就是将方法的输入到输出的映射做缓存。
首先来个注解:
@Target({ ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Cache { /** * 是否打印 * @return */ public boolean enabled() default true; /** * Cache type cache type. * * @return the cache type */ public CacheType cacheType() default CacheType.LOCAL; }
该注解是注解在方法上的
package com.zhangwei.learning.utils.cache; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.zhangwei.learning.model.ToString; import com.zhangwei.learning.utils.log.LoggerUtils; import org.slf4j.LoggerFactory; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; /** * 用于缓存的key,如果接口需要缓存,那么复杂对象参数都需要实现这个接口 * Created by Administrator on 2016/8/22. */ public class CacheKey extends ToString { /** * The A class. */ private String classPath; /** * The Method. */ private Method method; /** * The Input params. */ private List<Object> inputParams; /** * Instantiates a new Cache key. * * @param clazz the clazz * @param method the method * @param inputs the inputs */ public CacheKey(Class clazz, Method method, Object[] inputs) { this.classPath = clazz.getName(); this.method = method; List<Object> list = Lists.newArrayList(); if(inputs==null || inputs.length==0){ inputParams = list; } for(Object o : inputs){ list.add(o); } inputParams = list; } /** * Equals boolean. * * @param obj the obj * @return the boolean */ @Override public boolean equals(Object obj) { if (obj == null || !(obj instanceof CacheKey)) { return false; } CacheKey key = (CacheKey) obj; if (classPath.equals(key.getClassPath()) && method.equals(key.getMethod())) { if (key.getInputParams().size() != getInputParams().size()) { return false; } for (int i = 0; i < inputParams.size(); i++) { Object param = getInputParams().get(i); //如果有自定义的convertor,那么使用自定义的convertor ObjEqualsConvertor convertor = CacheInterceptor.getConvertors().get(param.getClass().getName()); if(convertor !=null){ if(!convertor.extraEquals(param,key.getInputParams().get(i))){ return false; } continue; } if (!getInputParams().get(i).equals(key.getInputParams().get(i))) { return false; } } return true; } return false; } /** * Hash code int. * * @return the int */ @Override public int hashCode() { return classPath.hashCode()+method.hashCode()+inputParams.hashCode(); } /** * Gets class path. * * @return the class path */ public String getClassPath() { return classPath; } /** * Sets class path. * * @param classPath the class path */ public void setClassPath(String classPath) { this.classPath = classPath; } /** * Gets method. * * @return the method */ public Method getMethod() { return method; } /** * Sets method. * * @param method the method */ public void setMethod(Method method) { this.method = method; } /** * Gets input params. * * @return the input params */ public List<Object> getInputParams() { return inputParams; } /** * Sets input params. * * @param inputParams the input params */ public void setInputParams(List<Object> inputParams) { this.inputParams = inputParams; } }
我们要做缓存,肯定要有个key,这里就是我们定义的key,最主要的是我们使用了一个专门的类,主要包含调用的类、方法、以及入参。这里有下面几个需要注意的点:
1、需要修改equals方法,这点跟hashmap自定义key一样。
2、比较类的时候直接用class全名。如果用class的equals方法,有可能class地址不一致导致判断有问题。这里method的equals方法已经是覆写了,所以没问题。
3、hashcode使用三个参数合起来的hashcode,这样尽量让key散列到不同的捅,如果用classpath的,那么如果这个类调用量很大,其他的类调用很少,那么桶分布就很不均匀了。
4、入参都需要注意下equals方法,但是对于有些类我们没有办法修改它的equals方法,这个时候我们有个转换map,可以自定义对某个类的equal比较器,然后可以在不对类的修改的情况下,达到比较的效果。
上面实现了注解和缓存的key,下面来拦截器啦
/** * Alipay.com Inc. * Copyright (c) 2004-2016 All Rights Reserved. */ package com.zhangwei.learning.utils.cache; import com.google.common.cache.CacheBuilder; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Maps; import com.zhangwei.learning.resource.GlobalResource; import com.zhangwei.learning.utils.log.LoggerUtils; import org.aopalliance.intercept.Invocation; import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import java.util.Map; import java.util.concurrent.*; /** * 可以对接口做缓存的拦截器 * * @author Administrator * @version $Id: CacheInterceptor.java, v 0.1 2016年8月22日 上午2:50:32 Administrator Exp $ */ public class CacheInterceptor implements MethodInterceptor, InitializingBean, GlobalResource { /** * The constant logger. */ private static final Logger logger = LoggerFactory .getLogger(CacheInterceptor.class); /** * 本地缓存大小. */ private long maxCacheSize = 300; /** * The constant expireTimeWhenAccess. */ private long expireTimeWhenAccess = 20; /** * The Local Cache. */ private com.google.common.cache.Cache<CacheKey, FutureTask<Object>> cache = null; /** * The equal Convertors. */ private static Map<String, ObjEquality> convertors = Maps.newHashMap(); /** * @see org.aopalliance.intercept.MethodInterceptor#invoke(org.aopalliance.intercept.MethodInvocation) */ @Override public Object invoke(MethodInvocation invocation) throws Throwable { Cache cacheAnnotation = invocation.getMethod().getAnnotation(Cache.class); if (cacheAnnotation == null || !cacheAnnotation.enabled()) { return invocation.proceed(); } //需要cache CacheKey cacheKey = new CacheKey(invocation.getMethod().getDeclaringClass(), invocation.getMethod(), invocation.getArguments()); CacheType cacheType = cacheAnnotation.cacheType(); if (cacheType == CacheType.LOCAL) { Object result = getLocalCacheResult(cacheKey, invocation); return result; } throw new RuntimeException("not supported cacheType"); } /** * Get local cache result object. * * @param key the key * @return the object */ private Object getLocalCacheResult(CacheKey key, final Invocation i) throws ExecutionException, InterruptedException { FutureTask<Object> f = new FutureTask<Object>(new Callable<Object>() { @Override public Object call() throws Exception { try { return i.proceed(); } catch (Throwable throwable) { throw new ExecutionException(throwable); } } }); FutureTask<Object> result = cache.asMap().putIfAbsent(key, f); if (result == null) { f.run(); result = f; LoggerUtils.debug(logger,"提交任务,key=",key); }else { LoggerUtils.debug(logger, "从缓存获取,key=", key); } return result.get(); } /** * Sets expire time when access. * * @param expireTimeWhenAccess the expire time when access */ public void setExpireTimeWhenAccess(long expireTimeWhenAccess) { this.expireTimeWhenAccess = expireTimeWhenAccess; } @Override public void afterPropertiesSet() throws Exception { cache = CacheBuilder .newBuilder() .maximumSize(maxCacheSize) .expireAfterAccess( expireTimeWhenAccess, TimeUnit.SECONDS).removalListener(new RemovalListener<CacheKey, Future<Object>>() { @Override public void onRemoval(RemovalNotification<CacheKey, Future<Object>> notification) { LoggerUtils.info(logger, "移除key=", notification.getKey(), ",value=", notification.getValue(), ",cause=", notification.getCause()); } }) .build(); } /** * Sets convertors. * * @param convertors the convertors */ public void setConvertors(Map<String, ObjEquality> convertors) { this.convertors = convertors; } /** * Gets convertors. * * @return the convertors */ public static Map<String, ObjEquality> getConvertors() { return convertors; } /** * Sets max cache size. * * @param maxCacheSize the max cache size */ public void setMaxCacheSize(long maxCacheSize) { this.maxCacheSize = maxCacheSize; } }
这里我们实现了缓存的拦截器,缓存采用Guava cache,这里我们在使用上主要是使用了guava的缓存自动淘汰、原子化的功能。我们可以看到,缓存的是CacheKey--->FutureTask<Object>的映射,这里我们采用了FutureTask的异步执行的功能。并且将Guava 作为ConcurrentHashMap来使用。
好了我们来配置下。
<bean id="cacheInteceptor" class="com.zhangwei.learning.utils.cache.CacheInterceptor"> <property name="maxCacheSize" value="300"/> <property name="expireTimeWhenAccess" value="300"/> <property name="convertors"> <map> <entry key="java.lang.String" value-ref="stringConvertor" /> </map> </property> </bean> <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator"> <property name="order" value="90"></property> <property name="interceptorNames"> <list> <value>digestInteceptor</value> <value>cacheInteceptor</value> </list> </property> <property name="beanNames"> <value>*</value> </property> </bean>
上面的那个map就是配置的自定义equals比较器
上测试类
@Component @Digest public class TestBean { @Cache(cacheType = CacheType.LOCAL, enabled = true) public String test(String one, String two) throws Exception { Thread.sleep(999); // throw new Exception("lalal"); return one + two; } } public class CacheTest { private final static Logger LOGGER = LoggerFactory.getLogger(CacheTest.class); @org.junit.Test public void Test() { final TestBean client = GlobalResourceUtils.getBean(TestBean.class); LoggerUtils.info(LOGGER, "获取到的client=", JSON.toJSONString(client)); MutiThreadRun.init(5).addTaskAndRun(10, new Callable<Object>() { @Override public Object call() throws Exception { return client.test("aaa","bbb"); } }); } } public class StringConvertor extends ObjEquality { @Override public boolean extraEquals(Object a, Object b) { return false; } }
这里我们就是讲一个方法多线程执行了10次,该方法中间会将线程暂停1s,所以可以看每次方法的执行时间就知道是否走缓存了。我们这里自定义了一个equal比较器,总是返回false,所以这里我们理论上每次都不会走缓存的,因为比较的时候key不同。
2016-08-28 23:38:09:527 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1043ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1035ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1034ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1036ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:09:534 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1033ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:527 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:530 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:531 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:534 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1000ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:38:10:534 com.zhangwei.learning.run.MutiThreadRun 任务执行完毕,执行时间2051ms,共有10个任务,执行异常0次
可以看到 每次执行时间都超过了1s,因为没走缓存,每次线程都暂停了1s。
然后我们把那个String比较器删掉。理论上这次调用的就是String的equals方法,就能走上缓存了。
2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,986ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1020ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,987ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1026ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:419 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,0ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,2ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:418 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1037ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,0ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:420 com.zhangwei.learning.utils.digest.DigestInterceptor (TestBean,test,1ms,No Exception,aaa^bbb,aaabbb) 2016-08-28 23:52:27:421 com.zhangwei.learning.run.MutiThreadRun 任务执行完毕,执行时间1043ms,共有10个任务,执行异常0次
可以看到,除了5个结果执行时间超过1s,其他的都很快,为啥呢?因为方法是多线程执行的,5个线程,最开始执行,5个线程中一个线程会执行方法,并且把结果放到缓存里面。然后5个线程一起等待方法执行完成然后把结果返回,然后后面的所有的都只需要从缓存获取就好了,这似不似很赞~~~~