zoukankan      html  css  js  c++  java
  • thread:【生产者消费者模式】

    一对一

    /**
     * One-to-one producer/consumer demo coordinated by a {@link ReentrantLock} and a single
     * {@link Condition}.
     *
     * <p>{@code fileList} is pre-filled with 10000 random integers. One producer task (run on
     * {@code downExecutor}) moves values one at a time into {@code list}; one consumer task (run on
     * {@code upExecutor}) removes and prints them. The producer only publishes while {@code list}
     * is empty, so at most one value is in flight at any time. Each task shuts its own pool down
     * once it has nothing left to do.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Hand-off buffer between producer and consumer (typed, so no unchecked cast on removal).
        LinkedBlockingQueue<Integer> list = new LinkedBlockingQueue<>();
        // Source data the producer drains.
        List<Integer> fileList = Collections.synchronizedList(new ArrayList<>());
        // One generator for the whole fill loop instead of a fresh Random per element.
        Random random = new Random();
        for (int i = 0, size = 10000; i < size; i++) {
            fileList.add(random.nextInt());
        }
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        // Shared, stateless rejection policy: instead of rejecting, block the submitting thread
        // until the pool's work queue accepts the task. If that wait is interrupted, fall back to
        // running the task on the submitting thread.
        RejectedExecutionHandler blockingPolicy = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    if (!executor.isShutdown()) {
                        executor.getQueue().put(r);
                    }
                } catch (InterruptedException e) {
                    LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                    try {
                        if (!executor.isShutdown()) {
                            r.run();
                        }
                    } finally {
                        // Restore the interrupt status only after the fallback run, so the task
                        // itself is not aborted by the flag we are about to set.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };

        // Producer pool: 0 core threads, up to 8 threads, work queue bounded at 10.
        ThreadPoolExecutor downExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(10), factory, blockingPolicy);

        // Consumer pool: same sizing, unbounded work queue.
        ThreadPoolExecutor upExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), factory, blockingPolicy);

        downExecutor.execute(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    while (!fileList.isEmpty()) {
                        if (list.isEmpty()) {
                            // Both collections are only touched under the lock here, so a single
                            // remove(0) replaces the former get(0) + remove(0) pair.
                            Integer first = fileList.remove(0);
                            list.add(first);
                            System.out.println("生产了:" + first);
                            condition.signalAll();
                        } else {
                            // A value is still pending; wait until the consumer has taken it.
                            System.out.println("fileList:" + fileList.size() + "list:" + list.size());
                            condition.await();
                        }
                    }
                    System.out.println("生产执行完毕~~~~~");
                    downExecutor.shutdownNow();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the pool worker instead of swallowing it.
                    Thread.currentThread().interrupt();
                    LOGGER.error("生产者线程被中断", e);
                } finally {
                    lock.unlock();
                }
            }
        });

        upExecutor.execute(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    while (true) {
                        if (!list.isEmpty()) {
                            Integer num = list.remove();
                            System.out.println("消费了:" + num);
                            condition.signalAll();
                        } else if (!fileList.isEmpty()) {
                            // Nothing to consume yet but the producer still has input: wait.
                            System.out.println("list:" + list.size());
                            condition.await();
                        } else {
                            // Buffer and source are both empty: all values have been consumed.
                            System.out.println("消费执行完毕~~~~~");
                            upExecutor.shutdownNow();
                            break;
                        }
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the pool worker instead of swallowing it.
                    Thread.currentThread().interrupt();
                    LOGGER.error("消费者线程被中断", e);
                } finally {
                    lock.unlock();
                }
            }
        });
    }

    多对多1(无锁轮询版;注意:本示例中生产者和消费者实际各只提交了一个任务)

        /**
         * Lock-free producer/consumer demo: one producer task on {@code downExecutor} moves 1000
         * random integers from {@code fileList} into {@code list}, and one consumer task on
         * {@code upExecutor} removes and prints them.
         *
         * <p>{@code list} has capacity 1, so at most one value is in flight. The producer blocks in
         * {@code put} and the consumer in a timed {@code poll} instead of busy-spinning. The
         * consumer samples {@code fileList} <em>before</em> polling, so it cannot exit while the
         * last value is still being handed over.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            // Capacity 1 reproduces the "publish only while list is empty" rule and lets put()
            // block rather than spin.
            LinkedBlockingQueue<Integer> list = new LinkedBlockingQueue<>(1);
            List<Integer> fileList = Collections.synchronizedList(new ArrayList<>());
            // One generator for the whole fill loop instead of a fresh Random per element.
            Random random = new Random();
            for (int i = 0, size = 1000; i < size; i++) {
                fileList.add(random.nextInt());
            }
            ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

            // Shared, stateless rejection policy: block the submitting thread until the work queue
            // accepts the task; if that wait is interrupted, run the task on the submitting thread.
            RejectedExecutionHandler blockingPolicy = new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    try {
                        if (!executor.isShutdown()) {
                            executor.getQueue().put(r);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                        try {
                            if (!executor.isShutdown()) {
                                r.run();
                            }
                        } finally {
                            // Restore the interrupt status after the fallback run.
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };

            // Producer pool: 0 core threads, up to 8 threads, work queue bounded at 10.
            ThreadPoolExecutor downExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(10), factory, blockingPolicy);

            // Consumer pool: same sizing, unbounded work queue.
            ThreadPoolExecutor upExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(), factory, blockingPolicy);

            downExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (!fileList.isEmpty()) {
                            Integer first = fileList.get(0);
                            // Publish BEFORE removing from fileList: once fileList is observed
                            // empty, every value is guaranteed to have reached list already.
                            list.put(first);
                            fileList.remove(0);
                            System.out.println("生产了:" + first);
                        }
                        System.out.println("生产执行完毕~~~~~fileList:" + fileList.size());
                        downExecutor.shutdownNow();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the pool worker instead of swallowing it.
                        Thread.currentThread().interrupt();
                        LOGGER.error("生产者线程被中断", e);
                    }
                }
            });

            upExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            // Sample the source first. Checking it only after an empty poll (as
                            // the old list-then-fileList order did) lets the producer slip the
                            // last value in between the two reads and leaves it unconsumed.
                            boolean sourceDrained = fileList.isEmpty();
                            Integer num = sourceDrained
                                    ? list.poll()
                                    : list.poll(10, TimeUnit.MILLISECONDS);
                            if (num != null) {
                                System.out.println("消费了:" + num);
                            } else if (sourceDrained) {
                                System.out.println("消费执行完毕~~~~~list:" + list.size());
                                upExecutor.shutdownNow();
                                break;
                            }
                        }
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the pool worker instead of swallowing it.
                        Thread.currentThread().interrupt();
                        LOGGER.error("消费者线程被中断", e);
                    }
                }
            });
        }

    多对多2

     /**
      * Many-to-many producer/consumer demo on a single pool of up to 8 threads.
      *
      * <p>The main thread first keeps submitting "produce one" tasks until {@code fileList} is
      * empty, then keeps submitting "consume one" tasks until {@code list} is empty, and finally
      * shuts the pool down. More tasks than values may be submitted; surplus tasks find nothing to
      * do and return immediately.
      *
      * <p>Each task performs its check and its move as one atomic step, so concurrent workers can
      * neither duplicate nor lose a value, nor fail on an already-emptied collection.
      *
      * @param args unused
      */
     public static void main(String[] args) {
         // Buffer of produced values (typed, so no unchecked cast on removal).
         LinkedBlockingQueue<Integer> list = new LinkedBlockingQueue<>();
         List<Integer> fileList = Collections.synchronizedList(new ArrayList<>());
         // One generator for the whole fill loop instead of a fresh Random per element.
         Random random = new Random();
         for (int i = 0, size = 1000; i < size; i++) {
             fileList.add(random.nextInt());
         }
         ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
         // Worker pool: 0 core threads, up to 8 threads, work queue bounded at 10. When the pool is
         // saturated the handler blocks the submitting (main) thread until the queue has room.
         ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 8, 0, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(10), factory, new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                 try {
                     if (!executor.isShutdown()) {
                         executor.getQueue().put(r);
                     }
                 } catch (InterruptedException e) {
                     LOGGER.error("队列阻塞策略异常,切换到CallerRunsPolicy!");
                     try {
                         if (!executor.isShutdown()) {
                             r.run();
                         }
                     } finally {
                         // Restore the interrupt status after the fallback run.
                         Thread.currentThread().interrupt();
                     }
                 }
             }
         });

         // Stateless task, so one instance is reused for every submission.
         Runnable produceOne = new Runnable() {
             @Override
             public void run() {
                 Integer first;
                 // The synchronized list only makes single calls atomic. Holding its monitor
                 // makes "check, take head, publish" one step, so two workers can never grab the
                 // same head element or remove from an already-empty list. Publishing inside the
                 // block also guarantees that an empty fileList means every value is in list.
                 synchronized (fileList) {
                     if (fileList.isEmpty()) {
                         return;
                     }
                     first = fileList.remove(0);
                     list.add(first);
                 }
                 System.out.println(Thread.currentThread().getName() + "生产了:" + first);
             }
         };
         while (!fileList.isEmpty()) {
             poolExecutor.execute(produceOne);
         }
         System.out.println("生产执行完毕~~~~~fileList:" + fileList.size());

         // Stateless task, so one instance is reused for every submission.
         Runnable consumeOne = new Runnable() {
             @Override
             public void run() {
                 // poll() is an atomic "take if present"; size() followed by remove() could throw
                 // NoSuchElementException when another worker drained the queue in between.
                 Integer num = list.poll();
                 if (num != null) {
                     System.out.println(Thread.currentThread().getName() + "消费了:" + num);
                 }
             }
         };
         while (!list.isEmpty()) {
             poolExecutor.execute(consumeOne);
         }
         System.out.println("消费执行完毕~~~~~list:" + list.size());
         // Orderly shutdown: tasks already queued still run, no new ones are accepted.
         poolExecutor.shutdown();
     }

    一学就会,一用就懵系列,多线程还是要多练习~

  • 相关阅读:
    每月碎碎念 | 2019.7
    聊聊HTML5中的Web Notification桌面通知
    Python的海龟绘图法小知识
    面向对象是什么意思?通俗易懂
    HTML实体
    gcc错误[Error] ld returned 1 exit status
    Markdown怎么使用制表符TAB键?为什么TAB失灵了?
    力扣题解——2的幂
    Jquery中的Ajax
    7个你可能不认识的CSS单位
  • 原文地址:https://www.cnblogs.com/fangyanr/p/12171899.html
Copyright © 2011-2022 走看看