zoukankan      html  css  js  c++  java
  • 生产者和消费者

    • 如果生产者的队列满了(while循环判断是否满),则等待。如果生产者的队列没满,则生产数据并唤醒消费者进行消费。

    • 如果消费者的队列空了(while循环判断是否空),则等待。如果消费者的队列没空,则消费数据并唤醒生产者进行生产。

    package test;

    import java.util.Random;

    import java.util.Vector;

    import java.util.concurrent.atomic.AtomicInteger;

    public class Producer implements Runnable {

        // true--->生产者一直执行,false--->停掉生产者

        private volatile boolean isRunning = true;

        // 公共资源

        private final Vector sharedQueue;

        // 公共资源的最大数量

        private final int SIZE;

        // 生产数据

        private static AtomicInteger count = new AtomicInteger();

        public Producer(Vector sharedQueue, int SIZE) {

            this.sharedQueue = sharedQueue;

            this.SIZE = SIZE;

        }

        @Override

        public void run() {

            int data;

            Random r = new Random();

            System.out.println("start producer id = " + Thread.currentThread().getId());

            try {

                while (isRunning) {

                    // 模拟延迟

                    Thread.sleep(r.nextInt(1000));

                    // 当队列满时阻塞等待

                    while (sharedQueue.size() == SIZE) {

                        synchronized (sharedQueue) {

                            System.out.println("Queue is full, producer " + Thread.currentThread().getId()

                                    + " is waiting, size:" + sharedQueue.size());

                            sharedQueue.wait();

                        }

                    }

                    // 队列不满时持续创造新元素

                    synchronized (sharedQueue) {

                        // 生产数据

                        data = count.incrementAndGet();

                        sharedQueue.add(data);

                        System.out.println("producer create data:" + data + ", size:" + sharedQueue.size());

                        sharedQueue.notifyAll();

                    }

                }

            } catch (InterruptedException e) {

                e.printStackTrace();

                Thread.currentThread().interrupted();

            }

        }

        public void stop() {

            isRunning = false;

        }

    }

    package test;

    import java.util.Random;

    import java.util.Vector;

    public class Consumer implements Runnable {

        // 公共资源

        private final Vector sharedQueue;

        public Consumer(Vector sharedQueue) {

            this.sharedQueue = sharedQueue;

        }

        @Override

        public void run() {

            Random r = new Random();

            System.out.println("start consumer id = " + Thread.currentThread().getId());

            try {

                while (true) {

                    // 模拟延迟

                    Thread.sleep(r.nextInt(1000));

                    // 当队列空时阻塞等待

                    while (sharedQueue.isEmpty()) {

                        synchronized (sharedQueue) {

                            System.out.println("Queue is empty, consumer " + Thread.currentThread().getId()

                                    + " is waiting, size:" + sharedQueue.size());

                            sharedQueue.wait();

                        }

                    }

                    // 队列不空时持续消费元素

                    synchronized (sharedQueue) {

                        System.out.println("consumer consume data:" + sharedQueue.remove(0) + ", size:" + sharedQueue.size());

                        sharedQueue.notifyAll();

                    }

                }

            } catch (InterruptedException e) {

                e.printStackTrace();

                Thread.currentThread().interrupt();

            }

        }

    }

    package test;

    import java.util.Vector;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    public class Test2 {

        public static void main(String[] args) throws InterruptedException {

            // 1.构建内存缓冲区

            Vector sharedQueue = new Vector();

            int size = 4;

            // 2.建立线程池和线程

            ExecutorService service = Executors.newCachedThreadPool();

            Producer prodThread1 = new Producer(sharedQueue, size);

            Producer prodThread2 = new Producer(sharedQueue, size);

            Producer prodThread3 = new Producer(sharedQueue, size);

            Consumer consThread1 = new Consumer(sharedQueue);

            Consumer consThread2 = new Consumer(sharedQueue);

            Consumer consThread3 = new Consumer(sharedQueue);

            service.execute(prodThread1);

            service.execute(prodThread2);

            service.execute(prodThread3);

            service.execute(consThread1);

            service.execute(consThread2);

            service.execute(consThread3);

            // 3.睡一会儿然后尝试停止生产者(结束循环)

            Thread.sleep(10 * 1000);

            prodThread1.stop();

            prodThread2.stop();

            prodThread3.stop();

            // 4.再睡一会儿关闭线程池

            Thread.sleep(3000);

            // 5.shutdown()等待任务执行完才中断线程(因为消费者一直在运行的,所以会发现程序无法结束)

            service.shutdown();

        }

    }

  • 相关阅读:
    HP惠普战66电源黄灯闪烁无法充电
    C#.NET rabbit mq 持久化时报错 durable
    手动解压安装mysql8.0 on windows my.ini
    C#.NET MySql8.0 EF db first
    EF MYSQL 出现:输入字符串的格式不正确
    EF MYSQL DB FIRST 出现2次数据库名
    mysql windows 下配置可远程连接
    团队项目的Git分支管理规范
    一个简单的软件测试流程
    微服务架构下的质量迷思——混沌工程
  • 原文地址:https://www.cnblogs.com/chinaifae/p/10443305.html
Copyright © 2011-2022 走看看