zoukankan      html  css  js  c++  java
  • 并发编程中几种常见的设计模式及代码实现

    1.Future模式
    (1)将客户端请求的处理过程从同步改为异步,以便将客户端解放出来,在服务端程序处理期间可以去干点其他事情,最后再来取请求的结果。 
    (2)类似于商品订单。比如网购,看中一件商品时,可以提交订单,当订单处理完成后,等待商品送货上门即可。或者说,类似当我们发送Ajax请求时,页面是异步的进行后台处理,用户无须一直等待请求的结果,可以继续浏览或操作其他内容。
    (3)客户端发送一个长时间的请求,服务端不需等待该数据处理完成便立即返回一个伪造的代理数据(相当于商品订单,不是商品本身),用户也无需等待,先去执行其他的若干操作后,再去调用服务器已经完成组装的真实数据。该模型充分利用了等待的时间片段。
    (4)代码实现
    /**********FutureClient.java***************/
    package com.cx.futuredesign;
    
    public class FutureClient {
        public Data request(final String queryStr) {
            //1.代理对象(Data接口的实现类),先返回给发送请求的客户端,告诉他请求已经收到,可以做其他事情
            final FutureData futureData=new FutureData();
            //2.启动一个新线程,去加载真实的数据,传递给代理对象
            new Thread(()->{
                //3.这个新线程可以慢慢的去加载真实对象,然后传递给代理对象
                RealData realData=new RealData(queryStr);
                futureData.setRealData(realData);
            }) .start();
            //自己启动一个线程去加载对象,直接给主程序返回一个假数据null
            return futureData;
        }
    }
    
    /*************Data.java*********************/
    package com.cx.futuredesign;
    
    public interface Data {
    String getRequest();
    }
    
    
    /*************FutureData.java*********************/
    package com.cx.futuredesign;
    
    //包装类
    public class FutureData implements Data{
    
        private RealData realData;
        private boolean isReady=false;
        public synchronized void setRealData(RealData realData) {
            if(isReady) {
                return;
            }
            this.realData = realData;
            isReady=true;
            //通知
            notify();
        }
        @Override
        public synchronized String getRequest() {
            while(!isReady) {
                try {
                    //如果数据没装载好,程序一直处于阻塞状态
                    wait();
                } catch (Exception e) {
                e.printStackTrace();
                }
            }
            //真正获取数据的方法
            return this.realData.getRequest();
        }
    
    }
    
    /***********RealData.java*****************/
    package com.cx.futuredesign;
    
    public class RealData implements Data{
        private String result;
        public RealData(String queryStr) {
            System.out.println("根据"+queryStr+"进行查询,这是一个很耗时的操作..");
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("操作完毕,获取结果");
            result="返回真实的结果";
        }
        
        public  String getRequest() {
            return result;
        }
    }
    
    /***********Main.java**************/
    package com.cx.futuredesign;
    
    import java.util.concurrent.Future;
    
    public class Main {
    
        public static void main(String[] args) {
            FutureClient fc=new FutureClient();
            Data data=fc.request("请求参数");
            System.out.println("请求发送成功!");
            System.out.println("做其他事情...");
            
            String result=data.getRequest();
            System.out.println(result);
        }
    
    }
     
     
    2.Master-Worker模式(比如hadoop和Storm都是这种模式)
    (1)一种常用的并行计算的模式,系统由两类进程协作工作:Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完后,会将结果返回给Master,由它做归纳总结。其好处能将一个大任务分解为若干个小任务,并行执行,从而提高系统的吞吐量。
    (2)具体实现(策略)
     

    (3)代码实现 

        假定100个任务,每个任务单线程时需要500ms,这里启动10个Worker来运行。 运行结果:运行时间是5009,结果是45557。也就是差不多为500*100/10

    /************Master.java*******************/
    package com.cx.masterworker;
    
    import java.util.HashMap;
    import java.util.Map.Entry;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class Master {
        //1.承装任务的集合
        private ConcurrentLinkedQueue<Task> workQueue=new ConcurrentLinkedQueue<Task>(); 
        //2.承装所有的worker对象
        private HashMap<String, Thread> workers=new HashMap<String, Thread>();
        //3.使用容器承装每个worker并发执行任务的结果集
        private ConcurrentHashMap<String,Object> resultMap=new ConcurrentHashMap<String, Object>();
        //4.构造方法
        public Master(Worker worker,int workerCount) {
            //每一个worker对象有Master的引用
            //workQueue用于任务的领取,resultMap用于任务的提交
            worker.setWorkerQueue(this.workQueue);
            worker.setResultMap(this.resultMap);
            for(int i=0;i<workerCount;i++) {
                //key表示每个worker的名字,value表示线程执行对象
                workers.put("子节点"+i, new Thread(worker));
            }
        }
        //5.提交任务
        public void submit(Task task) {
            this.workQueue.add(task);
        }
        //6.需要一个执行的方法(启动应用程序,让所有的worker工作)
        public void execute() {
            for(Entry<String, Thread> me:workers.entrySet()) {
                me.getValue().start();
            }
        }
        //7.判断线程是否执行完毕
        public boolean isComplete() {
            for(Entry<String, Thread> me:workers.entrySet()) {
                //只要有一个线程不是TERMINATED,就没有执行完毕
                if(me.getValue().getState()!=Thread.State.TERMINATED)
                    return false;
            }
            return true;
        }
        //8.返回结果集数据
        public int getResult() {
            int ret=0;
            for(Entry<String, Object>me:resultMap.entrySet()) {
                ret+=(Integer)me.getValue();
            }
            return ret;
        }
    
    }
    
    
    /*****************Worker.java**********************/
    package com.cx.masterworker;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class Worker implements Runnable{
    
        private ConcurrentLinkedQueue<Task> workQueue;
        private ConcurrentHashMap<String, Object> resultMap;    
        
        public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue=workQueue;
        }
    
        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap=resultMap;
            
        }
    
        @Override
        public void run() {
            while(true) {
                Task input=this.workQueue.poll();
                if(input==null) break;
                //做业务处理
                Object output=MyWorker.handle(input);
                this.resultMap.put(input.getId()+"",output);            
            }
        }
        //处理业务,可能是数据的加工,也可能是操作数据库..
        public static Object handle(Task input) {
            return null;
        }
    
    }
    
    /**************MyWorker.java*******************/
    package com.cx.masterworker;
    
    public class MyWorker extends Worker{
    
        //处理业务,可能是数据的加工,也可能是操作数据库..
        public static Object handle(Task input) {
            Object output=null;
            try {
                Thread.sleep(500);
                output=input.getPrice();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return output;
        }
    }
    
    
    /**************Task.java*******************/
    
    package com.cx.masterworker;
    
    public class Task {
        private int id;
        private String name;
        private int price;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public int getPrice() {
            return price;
        }
        public void setPrice(int price) {
            this.price = price;
        }
    }
    
    /*************Main.java*****************/
    package com.cx.masterworker;
    
    import java.util.Random;
    
    public class Main {
    
        public static void main(String[] args) {
            //10个worker,并行
            Master master=new Master(new MyWorker(), 10);
            Random random=new Random();
            //100个任务
            for(int i=1;i<100;i++){
                Task task=new Task();
                task.setId(i);
                task.setName("任务"+i);
                task.setPrice(random.nextInt(1000));
                master.submit(task);
            }
            master.execute();
            long startTime=System.currentTimeMillis();
            while(true) {
                if(master.isComplete()) {
                    long dur=System.currentTimeMillis()-startTime;
                    //所有的线程运行完,返回结果
                    int result=master.getResult();
                    System.out.println("运行时间是"+dur+",结果是"+result);
                    break;
                }
            }
        }
    
    }
    3.生产者-消费者模式
    (1)适合于MQ消息中间件的场景。生产者线程负责提交用户请求,消费者线程负责处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。
    (2)代码实现
    /*******Provider.java*************/
    package com.cx.ProviderandCon;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Provider implements Runnable {
    
        //共享缓存区
        private BlockingQueue<Data> queue;
        //用于强制线程停止
        private volatile boolean isRunning=true;
        //id生成器
        private static AtomicInteger count=new AtomicInteger();
        //随机对象
        private static Random random=new Random();
        
        public Provider(BlockingQueue<Data> queue) {
            this.queue=queue;
        }
    
        public void run() {
            while(isRunning) {
                try {
                    //表示产生数据的耗时
                    Thread.sleep(random.nextInt(1000));
                    int id=count.incrementAndGet();
                    Data data=new Data(id+"", "数据"+id);
                    System.out.println("当前线程"+Thread.currentThread().getName()+",获取了数据,id为"+id+",进行装载到缓冲区中..");
                    if(!this.queue.offer(data, 2, TimeUnit.SECONDS)) {
                        System.out.println("提交缓冲区数据失败..");
                        //做一些事,比如重新提交
                    }
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void stop() {
            this.isRunning=false;
            
        }
    
    }
    
    /************Consumer.java*****************/
    package com.cx.ProviderandCon;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer implements Runnable {
    
        private BlockingQueue<Data> queue;
        public Consumer(BlockingQueue<Data> queue) {
            this.queue=queue;
        }
    
        private static Random r=new Random();
        @Override
        public void run() {
            while(true) {
                try {
                    //获取数据
                    Data data=queue.take();
                    //模拟数据处理
                    Thread.sleep(r.nextInt(1000));
                    System.out.println("当前消费线程:"+Thread.currentThread().getName()+",消费成功,消费数据id:"+data.getId());
                    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            
        }
    
    }
    
    /***************Data.java********************/
    package com.cx.ProviderandCon;
    
    public class Data {
        private String id;
        private String name;
        public String getId() {
            return id;
        }
        public void setId(String id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public Data(String id,String name) {
            this.id=id;
            this.name=name;
        }
        @Override
        public String toString() {
            // TODO Auto-generated method stub
            return "{id:"+id+",name:"+name+"}";
        }
    }
    
    /******************Main.java********************/
    package com.cx.ProviderandCon;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Main {
    
        public static void main(String[] args) {
            //内存缓冲区
            BlockingQueue<Data> queue=new LinkedBlockingQueue<Data>(10);
            //生产者
            Provider p1=new Provider(queue);
            Provider p2=new Provider(queue);
            Provider p3=new Provider(queue);
            //消费者
            Consumer c1=new Consumer(queue);
            Consumer c2=new Consumer(queue);
            Consumer c3=new Consumer(queue);
            
            //创建线程池运行,这是一个缓存线程池,可以创建无穷大的线程,没有任务的时候,不创建线程
            //空闲线程的默认存活时间为60s
            ExecutorService cachePool=Executors.newCachedThreadPool();
            cachePool.execute(p1);
            cachePool.execute(p2);
            cachePool.execute(p3);
            cachePool.execute(c1);
            cachePool.execute(c2);
            cachePool.execute(c3);
            
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            p1.stop();
            p2.stop();
            p3.stop();
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
     
     
     
  • 相关阅读:
    HDU 1527 取石子游戏 (威佐夫博奕)
    HDU 1159 Common Subsequence (LCS)
    HDU 1160 FatMouse's Speed (LIS)
    HDU 2577 How to Type (DP)
    csu 1395: Timebomb (模拟)
    csu 1556: Jerry's trouble(大数取模)
    csu 1553: Good subsequence (最长连续子序列)
    csu 1548: Design road (三分)
    csu 1547: Rectangle (01背包)
    csu 1541: There is No Alternative(Kruskal 最小生成树)
  • 原文地址:https://www.cnblogs.com/sunnyCx/p/7976181.html
Copyright © 2011-2022 走看看