下面的例子全部来自《Java程序性能优化:让你的程序更快更稳定》一书的第4章“并发程序开发及优化”。
future模式:
同时进行两个线程的业务,最终用的时间是耗时最长的线程的时间。
package cn.com.fzk.book; public class FutureTest { public static void main(String[] args) { Client client = new Client(); Data data = client.request("name"); System.out.println("ok, i will sleep one second"); try { Thread.sleep(1000); } catch (Exception e) { } System.out.println("data :" + data.getResult()); } } class Client { public Data request(final String queryStr) { final FutureData future = new FutureData(); new Thread() { public void run() { RealData realData = new RealData(queryStr); future.setRealData(realData); } }.start(); return future; }; } interface Data { public String getResult(); } class FutureData implements Data { protected RealData realData = null; protected boolean isReady = false; public synchronized void setRealData(RealData realData) { if (isReady) { return; } this.realData = realData; isReady = true; notifyAll(); } @Override public String getResult() { while (!isReady) { try { wait(); } catch (Exception e) { } } return realData.result; } } class RealData implements Data { protected final String result; public RealData(String para) { StringBuffer sb = new StringBuffer(); for (int i = 0; i < 10; i++) { sb.append(para); } try { Thread.sleep(2000); } catch (Exception e) { } result = sb.toString(); } @Override public String getResult() { return result; } }
Future模式非常常用,JDK自带了一套实现(java.util.concurrent包中的Callable、FutureTask等)。
package cn.com.fzk.book; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; public class JDKFutrueTest { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<String> future = new FutureTask<>(new JDKRealData("a")); ExecutorService executor = Executors.newFixedThreadPool(1); executor.submit(future); System.out.println("ok"); try { Thread.sleep(1000); } catch (Exception e) { } System.out.println("data : " + future.get()); } } class JDKRealData implements Callable<String> { private String para; public JDKRealData(String para) { this.para = para; } @Override public String call() throws Exception { StringBuffer sb = new StringBuffer(); for (int i = 0; i < 10; i++) { sb.append(para); } try { Thread.sleep(100); } catch (Exception e) { } return sb.toString(); } }
MasterWorker模式
Master将一个大任务拆分成若干子任务,分配给n个Worker线程并行执行,最后由Master汇总各子任务的结果。下面的例子计算0~99的立方和(代码中的循环变量i从0取到99)。ConcurrentLinkedQueue是线程安全的队列。
package cn.com.fzk.book; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class MasterWorder { public static void main(String[] args) { Master master = new Master(new PlusWorker(), 5); for (int i = 0; i < 100; i++) { master.summit(i); } master.execute(); int result = 0; Map<String, Object> resultMap = master.getResultMap(); while (resultMap.size() > 0 || !master.isComplete()) { Set<String> keys = resultMap.keySet(); String key = null; for (String k : keys) { key = k; break; } Integer i = null; if (key != null) { i = (Integer) resultMap.get(key); } if (i != null) { result += i; } if (key != null) { resultMap.remove(key); } } System.out.println(result); } } class Master { Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); Map<String, Thread> threadMap = new HashMap<String, Thread>(); Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } public Master(Worker worker, int countWorker) { worker.setWordQueue(workQueue);; worker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) { threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } public void summit(Object job) { workQueue.add(job); } public Map<String, Object> getResultMap() { return resultMap; } public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { entry.getValue().start(); } } } abstract class Worker implements Runnable { Queue<Object> workQueue; Map<String, Object> resultMap; public void setWordQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } public abstract Object handle(Object 
input); @Override public void run() { while (true) { Object input = workQueue.poll(); if (input == null) { break; } Object result = handle(input); resultMap.put(Integer.toString(input.hashCode()), result); } } } class PlusWorker extends Worker { @Override public Object handle(Object input) { Integer i = (Integer) input; return i * i * i; } }
Guarded Suspension模式
消息队列等都采用这种模式:client端提交任务,worker端根据自身的处理能力逐个处理任务;如果不需要等待结果,client端线程在提交任务后即可直接结束。
下面的例子client端等待server端的返回结果,用到的Data等是最上面的Future模式定义的类。
package cn.com.fzk.book;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

/**
 * Guarded Suspension demo: ten client threads put requests on a shared queue and ten server
 * threads take them off, blocking while the queue is empty.
 *
 * <p>The server threads loop forever, so this program does not exit on its own.
 */
public class GuardedSuspension {
    public static void main(String[] args) {
        RequestQueue requestQueue = new RequestQueue();
        for (int i = 0; i < 10; i++) {
            new ServerThread(requestQueue, "ServerThread " + i).start();
        }
        for (int i = 0; i < 10; i++) {
            new ClientThread(requestQueue, "ClientThread " + i).start();
        }
    }
}

/** A named request carrying the placeholder through which the server returns its answer. */
class Request {
    private String name;
    private Data response;

    public Request(String name) {
        this.name = name;
    }

    public String getName() {
        return this.name;
    }

    public synchronized Data getResponse() {
        return response;
    }

    public synchronized void setResponse(Data response) {
        this.response = response;
    }

    @Override
    public String toString() {
        return "[ Request " + name + " ]";
    }
}

/** FIFO queue of requests guarded by this object's monitor. */
class RequestQueue {
    private LinkedList<Request> queue = new LinkedList<Request>();

    /**
     * Removes and returns the oldest request, blocking while the queue is empty.
     *
     * <p>An interrupt does not abort the wait (the method has no way to report it); the
     * interrupt flag is restored before returning instead of being silently discarded.
     *
     * @return the head of the queue
     */
    public synchronized Request getRequest() {
        boolean interrupted = false;
        try {
            // Guard condition: re-checked after every wake-up.
            while (queue.isEmpty()) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            return queue.remove();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Appends a request and wakes up all threads blocked in {@link #getRequest()}. */
    public synchronized void addRequest(Request request) {
        queue.add(request);
        notifyAll();
    }
}

/** Takes requests from the queue forever and fills in their responses. */
class ServerThread extends Thread {
    private RequestQueue requestQueue;

    public ServerThread(RequestQueue requestQueue, String name) {
        super(name);
        this.requestQueue = requestQueue;
    }

    @Override
    public void run() {
        while (true) {
            final Request request = requestQueue.getRequest();
            // The client stored a FutureData as the response placeholder before enqueueing.
            final FutureData future = (FutureData) request.getResponse();
            RealData realData = new RealData(request.getName());
            future.setRealData(realData);
            System.out.println(Thread.currentThread().getName() + "handles" + request);
        }
    }
}

/** Submits ten requests, one per second, then prints the response to each of them. */
class ClientThread extends Thread {
    private RequestQueue requestQueue;
    private List<Request> myRequest = new ArrayList<Request>();

    public ClientThread(RequestQueue requestQueue, String name) {
        super(name);
        this.requestQueue = requestQueue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            Request request = new Request("RequestID:" + i + " ThreadName"
                    + Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName() + " requests " + request);
            // The placeholder must be attached before the request becomes visible to servers.
            request.setResponse(new FutureData());
            requestQueue.addRequest(request);
            myRequest.add(request);
            try {
                // Stands in for other client-side work between submissions.
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Collect the responses once, after everything has been submitted. Doing this inside
        // the loop above would re-print earlier responses on every iteration and make the
        // client block on them before it could send its next request.
        for (Request res : myRequest) {
            System.out.println("ClientThread Name is " + Thread.currentThread().getName()
                    + " Response is: " + res.getResponse().getResult());
        }
    }
}
此外还有生产者-消费者模式、读者-写者模式、哲学家就餐问题等。