zoukankan      html  css  js  c++  java
  • thread:【生产者消费者模式】

    一对一

    public static void main(String[] args) {
            LinkedBlockingQueue list = new LinkedBlockingQueue();
            List<Integer> fileList = Collections.synchronizedList(new ArrayList<>());
            for (int i = 0, size = 10000; i < size; i++) {
                int num = new Random().nextInt();
                fileList.add(num);
            }
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
            ReentrantLock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            //生产
            ThreadPoolExecutor downExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        if (!executor.isShutdown()) {
                            executor.getQueue().put(r);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                        if (!executor.isShutdown()) {
                            r.run();
                        }
                    }
                }
            });
    
            //消费
            ThreadPoolExecutor upExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        if (!executor.isShutdown()) {
                            executor.getQueue().put(r);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                        if (!executor.isShutdown()) {
                            r.run();
                        }
                    }
                }
            });
    
            downExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        while (true) {
                            if (fileList.size() > 0) {
                                if (list.size() < 1) {
                                    Integer first = fileList.get(0);
                                    list.add(first);
                                    fileList.remove(0);
                                    System.out.println("生产了:" + first);
                                    condition.signalAll();
                                } else {
                                    System.out.println("fileList:" + fileList.size() + "list:" + list.size());
                                    condition.await();
                                }
                            } else {
                                System.out.println("生产执行完毕~~~~~");
                                downExecutor.shutdownNow();
                                break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            });
    
            upExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    lock.lock();
                    try {
                        while (true) {
                            if (list.size() > 0) {
                                int num = (int) list.remove();
                                System.out.println("消费了:" + num);
                                condition.signalAll();
                            } else {
                                if (fileList.size() > 0) {
                                    System.out.println("list:" + list.size());
                                    condition.await();
                                } else {
                                    System.out.println("消费执行完毕~~~~~");
                                    upExecutor.shutdownNow();
                                    break;
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            });
        }

    多对多1

        public static void main(String[] args) {
            LinkedBlockingQueue list = new LinkedBlockingQueue();
            List<Integer> fileList = Collections.synchronizedList(new ArrayList<>());
            for (int i = 0, size = 1000; i < size; i++) {
                int num = new Random().nextInt();
                fileList.add(num);
            }
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
            //生产
            ThreadPoolExecutor downExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        if (!executor.isShutdown()) {
                            executor.getQueue().put(r);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                        if (!executor.isShutdown()) {
                            r.run();
                        }
                    }
                }
            });
    
            //消费
            ThreadPoolExecutor upExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        if (!executor.isShutdown()) {
                            executor.getQueue().put(r);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                        if (!executor.isShutdown()) {
                            r.run();
                        }
                    }
                }
            });
    
            downExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            if (fileList.size() > 0) {
                                if (list.size() < 1) {
                                    Integer first = fileList.get(0);
                                    list.add(first);
                                    fileList.remove(0);
                                    System.out.println("生产了:" + first);
                                }
                            } else {
                                System.out.println("生产执行完毕~~~~~fileList:"+fileList.size());
                                downExecutor.shutdownNow();
                                break;
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
    
            upExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            if (list.size() > 0) {
                                int num = (int) list.remove();
                                System.out.println("消费了:" + num);
                            } else {
                                if (fileList.size() == 0) {
                                    System.out.println("消费执行完毕~~~~~list:"+list.size());
                                    upExecutor.shutdownNow();
                                    break;
                                }
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }

    多对多2

     public static void main(String[] args) {
            LinkedBlockingQueue list = new LinkedBlockingQueue();
            List<Integer> fileList = Collections.synchronizedList(new ArrayList<>());
            for (int i = 0, size = 1000; i < size; i++) {
                int num = new Random().nextInt();
                fileList.add(num);
            }
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
            //生产
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        if (!executor.isShutdown()) {
                            executor.getQueue().put(r);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                        if (!executor.isShutdown()) {
                            r.run();
                        }
                    }
                }
            });
    
           
            while (true) {
                if (fileList.size() > 0) {
                    poolExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            if (fileList.size() > 0) {
                                Integer first = fileList.get(0);
                                list.add(first);
                                fileList.remove(0);
                                System.out.println(Thread.currentThread().getName()+"生产了:" + first);
                            }
                        }
                    });
                } else {
                    System.out.println("生产执行完毕~~~~~fileList:" + fileList.size());
                    break;
                }
            }
    
            while (true) {
                if (list.size() > 0) {
                    poolExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            if (list.size() > 0) {
                                int num = (int) list.remove();
                                System.out.println(Thread.currentThread().getName()+"消费了:" + num);
                            }
                        }
                    });
                } else {
                    System.out.println("消费执行完毕~~~~~list:" + list.size());
                    poolExecutor.shutdown();
                    break;
                }
            }
        }

    一学就会,一用就懵系列,多线程还是要多练习~

  • 相关阅读:
    bzoj 3438: 小M的作物
    bzoj 4445 [SCOI2015] 小凸想跑步
    hdu 4899 Hero meet devil
    hdu 4898 The Revenge of the Princess’ Knight
    【NOIP1999】拦截导弹
    【OpenJudge】2991:2011 题解
    【cqbzoj】1785:残缺棋盘上放车的方案数 --状压dp --输入毁一生
    【cqbzoj】:1330 Prime DP(Ahio2001 质数和分解)
    【Openjudge:Noi】7891:一元三次方程求解 c++
    【USACO FEB 2010 SILVER】吃巧克力(Chocolate Eating)
  • 原文地址:https://www.cnblogs.com/fangyanr/p/12171899.html
Copyright © 2011-2022 走看看