去年写过很多高质量的代码,由于没有时间写博客,这几天集中时间上传之。去年有一篇博客,详细介绍了CAS算法和ThreadPoolExecutor源代码。现在要上传的是,利用CAS算法和双检索机制解决缓存计算机制的代码。假设有1000个线程去调用公共计算类,这1000个线程传递的值是一样的,要求得出传递的值的最后计算结果。达到的效果类似于单例化模式,即1000个线程,只有一个线程计算结果,其他线程直接拿出结果,避免重复运算。
首先定义公共类的接口:
package com.txq.test;
/**
* 公共类接口,计算传递的数值
* @author XueQiang Tong
*
*/
public interface Computable {
Double compute(String arg);
}
实现类:
package com.txq.test;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* 公共计算类,利用双检索机制解决重复运算的问题
* @author XueQiang Tong
*
*/
public class Memoizer implements Computable {
private final ConcurrentMap<String, FutureTask<Double>> cacher = new ConcurrentHashMap<String, FutureTask<Double>>();
public Double cache(final String arg) throws Throwable {
FutureTask<Double> f = cacher.get(arg);
if (f == null) {
Callable<Double> eval = new Callable<Double>() {
@Override
public Double call() throws Exception {
return compute(arg);
}
};
FutureTask<Double> ft = new FutureTask<Double>(eval);
f = cacher.putIfAbsent(arg, ft);//使用FutureTask来保证线程间的同步,当A线程在执行此处时,其他线程等待,执行结束时,
//运行后面的代码,此时其他线程执行此处,然后直接跳到f.get()处,等待ft.run(),一旦运行结束,立刻返回结果。
if (f == null) {
f = ft;
ft.run();//执行计算
}
try {//返回结果,典型的双检索机制
Double value = f.get();
return value;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
f.cancel(true);
} catch (ExecutionException e) {
throw launderException(e.getCause());
}
}
return null;
}
private static Throwable launderException(Throwable cause) {
if (cause instanceof RuntimeException) {
return (RuntimeException) cause;
} else if (cause instanceof Error) {
return (Error) cause;
} else {
throw new IllegalStateException("Not checked type", cause);
}
}
@Override
public Double compute(String arg) {
Double value = Double.parseDouble(arg);
Double result = 2*value*value;
return result;
}
}
现在,定义任务类:
package com.txq.test;
import java.util.concurrent.Callable;
/**
* 计算任务类
* @author XueQiang Tong
*
*/
public class ComputeTask implements Callable<Double>{
Memoizer memoizer;
String arg;
public ComputeTask(Memoizer memoizer,String arg){
this.memoizer = memoizer;
this.arg = arg;
}
@Override
public Double call() throws Exception {
try {
return memoizer.cache(arg);
} catch (Throwable e) {
e.printStackTrace();
}
return null;
}
}
最后,定义任务调度类:
package com.txq.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
/**
* 计算任务调度类
* @author XueQiang Tong
*
*/
public class ComputeCaller {
private static final int TASKS = 1000;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(1 << 10);
ExecutorService exe = new ThreadPoolExecutor(12,30,120,TimeUnit.MILLISECONDS,queue,new CallerRunsPolicy());
CompletionService<Double> serv = new ExecutorCompletionService<Double>(exe);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch endLatch = new CountDownLatch(TASKS);
public void caller(String arg) throws InterruptedException {
for (int i = 0;i < TASKS;i++) {
serv.submit(new ComputeTask(new Memoizer(),startLatch,endLatch,arg));
}
exe.awaitTermination(100, TimeUnit.MILLISECONDS);
exe.shutdown();
startLatch.countDown();
endLatch.await();
for (int i = 0;i < TASKS;i++) {
try {
Future<Double> future = serv.take();
Double result = future.get();
System.out.println(result + " ");
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
测试类:
package com.txq.test;
public class Test {
public static void main(String[] args) throws Throwable {
ComputeCaller cc = new ComputeCaller();
cc.caller("12");
}
}
佟氏出品,必属精品!坚持写高质量代码,坚持深度学习,数据挖掘算法理论的研究,坚持理论和编程相结合。后面的博客,将上传,最近写的垃圾邮件识别优化算法(朴素贝叶斯)……