zoukankan      html  css  js  c++  java
  • 并发的几个模式

    下面的例子全部来自<Java程序性能优化-让你的程序更快更稳定>书中的第4章并发程序开发及优化。

    future模式:

    同时进行两个线程的业务,最终用的时间是耗时最长的线程的时间。

    package cn.com.fzk.book;
    
    public class FutureTest {
      public static void main(String[] args) {
        Client client = new Client();
    
        Data data = client.request("name");
        System.out.println("ok, i will sleep one second");
        try {
          Thread.sleep(1000);
        } catch (Exception e) {
        }
        System.out.println("data :" + data.getResult());
      }
    }
    
    
    class Client {
      public Data request(final String queryStr) {
        final FutureData future = new FutureData();
        new Thread() {
          public void run() {
            RealData realData = new RealData(queryStr);
            future.setRealData(realData);
          }
        }.start();
        return future;
      };
    }
    
    
    
    interface Data {
      public String getResult();
    }
    
    
    class FutureData implements Data {
      protected RealData realData = null;
      protected boolean isReady = false;
    
      public synchronized void setRealData(RealData realData) {
        if (isReady) {
          return;
        }
        this.realData = realData;
        isReady = true;
        notifyAll();
      }
    
      @Override
      public String getResult() {
        while (!isReady) {
          try {
            wait();
          } catch (Exception e) {
          }
    
        }
        return realData.result;
      }
    
    }
    
    
    class RealData implements Data {
      protected final String result;
    
      public RealData(String para) {
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 10; i++) {
          sb.append(para);
        }
        try {
          Thread.sleep(2000);
        } catch (Exception e) {
        }
    
        result = sb.toString();
      }
    
      @Override
      public String getResult() {
        return result;
      }
    
    }

    future模式非常常用,jdk自带了一套。

    package cn.com.fzk.book;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    
    public class JDKFutrueTest {
      public static void main(String[] args) throws InterruptedException, ExecutionException {
        FutureTask<String> future = new FutureTask<>(new JDKRealData("a"));
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.submit(future);
        System.out.println("ok");
        try {
          Thread.sleep(1000);
        } catch (Exception e) {
        }
    
        System.out.println("data : " + future.get());
      }
    }
    
    
    class JDKRealData implements Callable<String> {
      private String para;
    
      public JDKRealData(String para) {
        this.para = para;
      }
    
      @Override
      public String call() throws Exception {
        StringBuffer sb = new StringBuffer();
        for (int i = 0; i < 10; i++) {
          sb.append(para);
        }
        try {
          Thread.sleep(100);
        } catch (Exception e) {
        }
        return sb.toString();
      }
    
    }

    MasterWorker模式

    将master的任务分配到n个线程中同时执行一个大任务。下面计算1-100的立方和。ConcurrentLinkedQueue是线程安全的队列。

    package cn.com.fzk.book;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class MasterWorder {
      public static void main(String[] args) {
        Master master = new Master(new PlusWorker(), 5);
        for (int i = 0; i < 100; i++) {
          master.summit(i);
        }
        master.execute();
        int result = 0;
        Map<String, Object> resultMap = master.getResultMap();
        while (resultMap.size() > 0 || !master.isComplete()) {
          Set<String> keys = resultMap.keySet();
          String key = null;
          for (String k : keys) {
            key = k;
            break;
          }
          Integer i = null;
          if (key != null) {
            i = (Integer) resultMap.get(key);
          }
          if (i != null) {
            result += i;
          }
          if (key != null) {
            resultMap.remove(key);
          }
        }
    
        System.out.println(result);
      }
    }
    
    
    class Master {
      Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
      Map<String, Thread> threadMap = new HashMap<String, Thread>();
      Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
    
      public boolean isComplete() {
        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
          if (entry.getValue().getState() != Thread.State.TERMINATED) {
            return false;
          }
        }
    
        return true;
      }
    
      public Master(Worker worker, int countWorker) {
        worker.setWordQueue(workQueue);;
        worker.setResultMap(resultMap);
        for (int i = 0; i < countWorker; i++) {
          threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
        }
      }
    
      public void summit(Object job) {
        workQueue.add(job);
      }
    
      public Map<String, Object> getResultMap() {
        return resultMap;
      }
    
      public void execute() {
        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
          entry.getValue().start();
        }
      }
    }
    
    
    abstract class Worker implements Runnable {
      Queue<Object> workQueue;
      Map<String, Object> resultMap;
    
      public void setWordQueue(Queue<Object> workQueue) {
        this.workQueue = workQueue;
      }
    
      public void setResultMap(Map<String, Object> resultMap) {
        this.resultMap = resultMap;
      }
    
      public abstract Object handle(Object input);
    
      @Override
      public void run() {
        while (true) {
          Object input = workQueue.poll();
          if (input == null) {
            break;
          }
          Object result = handle(input);
          resultMap.put(Integer.toString(input.hashCode()), result);
        }
    
      }
    }
    
    
    class PlusWorker extends Worker {
    
      @Override
      public Object handle(Object input) {
        Integer i = (Integer) input;
        return i * i * i;
      }
    
    }

    Guarded Suspension模式

    消息队列等都是用的这种模式,client端将任务提交,worker端根据系统性能进行处理任务,如果不需要等待结果,client端的线程就已经直接完成了。
    下面的例子client端等待server端的返回结果,用到的Data等是最上面的Future模式定义的类。

    package cn.com.fzk.book;
    
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    
    public class GuardedSuspension {
      public static void main(String[] args) {
        RequestQueue requestQueue = new RequestQueue();
        for (int i = 0; i < 10; i++) {
          new ServerThread(requestQueue, "ServerThread " + i).start();
        }
        for (int i = 0; i < 10; i++) {
          new ClientThread(requestQueue, "ClientThread " + i).start();
        }
      }
    }
    
    
    class Request {
      private String name;
      private Data response;
    
      public Request(String name) {
        this.name = name;
      }
    
      public String getName() {
        return this.name;
      }
    
      public synchronized Data getResponse() {
        return response;
      }
    
      public synchronized void setResponse(Data response) {
        this.response = response;
      }
    
      public String toString() {
        return "[ Request " + name + " ]";
      }
    }
    
    
    class RequestQueue {
      private LinkedList<Request> queue = new LinkedList<Request>();
    
      public synchronized Request getRequest() {
        while (queue.size() == 0) {
          try {
            wait();
          } catch (Exception e) {
          }
        }
        return (Request) queue.remove();
      }
    
      public synchronized void addRequest(Request request) {
        queue.add(request);
        notifyAll();
      }
    }
    
    
    class ServerThread extends Thread {
      private RequestQueue requestQueue;
    
      public ServerThread(RequestQueue requestQueue, String name) {
        super(name);
        this.requestQueue = requestQueue;
      }
    
      @Override
      public void run() {
        while (true) {
          final Request request = requestQueue.getRequest();
          final FutureData future = (FutureData) request.getResponse();
          RealData realData = new RealData(request.getName());
          future.setRealData(realData);
          System.out.println(Thread.currentThread().getName() + "handles" + request);
        }
      }
    }
    
    
    class ClientThread extends Thread {
      private RequestQueue requestQueue;
      private List<Request> myRequest = new ArrayList<Request>();
    
      public ClientThread(RequestQueue requestQueue, String name) {
        super(name);
        this.requestQueue = requestQueue;
      }
    
      @Override
      public void run() {
        for (int i = 0; i < 10; i++) {
          Request request =
              new Request("RequestID:" + i + " ThreadName" + Thread.currentThread().getName());
          System.out.println(Thread.currentThread().getName() + " requests " + request);
          request.setResponse(new FutureData());
          requestQueue.addRequest(request);
          myRequest.add(request);
          try {
            Thread.sleep(1000);
          } catch (Exception e) {
          }
          for (Request res : myRequest) {
            System.out.println("ClientThread Name is " + Thread.currentThread().getName()
                + " Response is: " + res.getResponse().getResult());
          }
        }
      }
    }

    还生产者消费者模式、作家读者模式、哲学家模式。

  • 相关阅读:
    openstack-ntp时间同步服务
    如何将icon图标库引入自己的项目中
    微信小程序实现滑动tab切换和点击tab切换并显示相应的数据(附源代码)
    微信小程序分享至朋友圈的方法
    微信小程序--分享功能
    mpvue-新建页面、页面跳转、自适应单位
    微信小程序mpvue-动态改变navigationBarTitleText值
    mpvue中使用flyjs全局拦截
    H5 布局 -- 让容器充满屏幕高度或自适应剩余高度
    使用mpvue开发小程序如何定义全局变量
  • 原文地址:https://www.cnblogs.com/badboyf/p/6596586.html
Copyright © 2011-2022 走看看