zoukankan      html  css  js  c++  java
  • 14.多线程设计模式

    多线程设计模式 - Master-Worker模式

     并发设计模式属于设计优化的一部分,它对于一些常用的多线程结构的总结和抽象。与串行相比并行程序结构通常较为复杂,因此合理的使用并行模式在多线程并发中更具有意义。


    1. Master-Worker模式

    • - Master-Worker模式是常用的并行模式。它的核心思想是系统由两类进程协作工作:Master进程和Worker进程。Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结。
    • - 其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量。

    示例:下
    说明:看类注释即可明了 不明白可以去屎了

      1 //Task.java  
      2   public class Task {
      3 
      4       private int id;
      5       private String name;
      6       private int price;
      7       public int getId() {
      8           return id;
      9       }
     10       public void setId(int id) {
     11           this.id = id;
     12       }
     13       public String getName() {
     14           return name;
     15       }
     16       public void setName(String name) {
     17           this.name = name;
     18       }
     19       public int getPrice() {
     20           return price;
     21       }
     22       public void setPrice(int price) {
     23           this.price = price;
     24       }
     25   }
     26   //Worker.java   
     27   public class Worker implements Runnable{
     28 
     29       private ConcurrentLinkedQueue<Task> workQueue;
     30       private ConcurrentHashMap<String,Object> resultMap;
     31 
     32       @Override
     33       public void run() {
     34           while(true){
     35               Task input = this.workQueue.poll();
     36               if(input==null)break;
     37               //真正的去做业务处理
     38               Object output = MyWorker.handle(input);
     39               this.resultMap.put(Integer.toString(input.getId()), output);
     40           }
     41       }
     42 
     43       public static Object handle(Task input){
     44           return null;
     45       }
     46 
     47       public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
     48           this.workQueue=workQueue;
     49       }
     50 
     51       public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
     52           this.resultMap=resultMap;
     53 
     54       }
     55   }
     56 
     57   //MyWorker.java继承Worker.java,重写handle方法
     58   public class MyWorker extends Worker{
     59 
     60       public static Object handle(Task input) {
     61           Object output = null;
     62           try {
     63               //处理task的耗时,可能是数据的加工,也可能是操作数据库
     64               Thread.sleep(500);
     65               output = input.getPrice();
     66           } catch (InterruptedException e) {
     67               e.printStackTrace();
     68           }
     69           return output;
     70       }
     71 
     72   }
     73   //Master.java *
     74   public class Master {
     75 
     76       //1 应该有一个承装任务的集合
     77       private ConcurrentLinkedQueue<Task> workQueue = new  ConcurrentLinkedQueue<Task>();
     78       //2 使用普通的HashMap去承装所有的worker对象
     79       private HashMap<String,Thread> workers = new HashMap<String,Thread>();
     80       //3 使用一个容器承装每一个worker并发执行的结果集
     81       private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>();
     82       //4 构造方法
     83       public Master(Worker worker,int workerCount){
     84           //每一个worker对象都需要有master的应用workQueue用于任务的领取,resultMap用于任务的提交
     85           worker.setWorkerQueue(this.workQueue);
     86           worker.setResultMap(this.resultMap);
     87 
     88           for(int i=0;i<workerCount;i++){
     89               //key表示每一个worker的名字,value表示线程执行对象
     90               workers.put("子节点"+Integer.toString(i), new Thread(worker));
     91           }
     92       }
     93       //5 提交方法
     94       public void submit(Task task){
     95           this.workQueue.add(task);
     96       }
     97       //6 需要有一个执行的方法(启动应用程序让所有的worker工作)
     98       public void execute(){
     99           for(Map.Entry<String,Thread> me:workers.entrySet()){
    100               me.getValue().start();
    101           }
    102       }
    103       //8 判断线程是否执行完毕
    104       public boolean isComplete() {
    105           for(Map.Entry<String, Thread> me:workers.entrySet()){
    106               if(me.getValue().getState()!=Thread.State.TERMINATED){
    107                   return false;
    108               }
    109           }
    110           return true;
    111       }
    112       //9 返回结果集数据
    113       public int getResult() {
    114           int ret = 0;
    115           for(Map.Entry<String,Object> me:resultMap.entrySet()){
    116               //汇总逻辑
    117               ret+=(Integer)me.getValue();
    118           }
    119           return ret;
    120       }
    121   }
    122   //主函数
    123   public class Main {
    124 
    125       public static void main(String[] args) {
    126           System.out.println("我的机器可用processor的数量:"+Runtime.getRuntime().availableProcessors());
    127           Master master = new Master(new MyWorker(),10);
    128 
    129           Random r = new Random();
    130           for(int i=0;i<100;i++){
    131               Task task = new Task();
    132               task.setId(i);
    133               task.setName("任务"+i);
    134               task.setPrice(r.nextInt(1000));
    135               master.submit(task);
    136           }
    137           master.execute();
    138 
    139           long start = System.currentTimeMillis();
    140           while(true){
    141               if(master.isComplete()){
    142                   long end = System.currentTimeMillis() -start;
    143                   int ret = master.getResult();
    144                   System.out.println("最终结果:"+ret+",执行耗时:"+end+"毫秒");
    145                   break;
    146               }
    147           }
    148       }
    149 
    150   }
  • 相关阅读:
    正则表达式
    抽象
    面向对象
    this关键字
    http协议
    URL
    Ajax
    PHP命名空间
    PDO
    异常
  • 原文地址:https://www.cnblogs.com/Mao-admin/p/9989152.html
Copyright © 2011-2022 走看看