zoukankan      html  css  js  c++  java
  • 21.使用LinkedBlockingQueue模拟生产者与消费者

    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 生产者
     */
    public class ProducerThread implements Runnable {
        private BlockingQueue queue;
        private volatile boolean flag = true;
        private static AtomicInteger count = new AtomicInteger();
    
        public ProducerThread(BlockingQueue queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("生产线程启动...");
                while (flag){
                    System.out.println("正在生产...");
                    String data = count.incrementAndGet() + "";
                    boolean offer = queue.offer(data, 2, TimeUnit.SECONDS);
                    if (offer){
                        System.out.println("生产者存入"+data+"到队列成功");
                    }else {
                        System.out.println("生产者存入"+data+"到队列失败");
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("生产结束");
            }
        }
        public void stop(){
            this.flag = false;
        }
    }
    
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 消费者
     */
    public class ConsumerThread implements Runnable{
        private BlockingQueue<String> queue;
        private volatile boolean flag = true;
    
        public ConsumerThread(BlockingQueue<String> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("消费线程启动...");
                while (flag){
                    System.out.println("消费者正在获取数据...");
                    String data = queue.poll(2, TimeUnit.SECONDS);
                    if (data!=null){
                        System.out.println("消费者拿到队列中的数据:"+data);
                        Thread.sleep(1000);
                    }else {
                        System.out.println("消费者未拿到队列中的数据");
                        flag = false;
                    }
    
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                System.out.println("消费者结束");
            }
        }
    }
    
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,
     * 如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
     * 和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部
     */
    public class Main {
        public static void main(String[] args) throws InterruptedException{
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
            ProducerThread producerThread1 = new ProducerThread(queue);
            ProducerThread producerThread2 = new ProducerThread(queue);
            ConsumerThread consumerThread = new ConsumerThread(queue);
            Thread t1 = new Thread(producerThread1);
            Thread t2 = new Thread(producerThread2);
            Thread t3 = new Thread(consumerThread);
            t1.start();
            t2.start();
            t3.start();
            Thread.sleep(10000);
            producerThread1.stop();
            producerThread2.stop();
        }
        //生产线程启动...
        //正在生产...
        //消费线程启动...
        //消费者正在获取数据...
        //生产线程启动...
        //正在生产...
        //生产者存入1到队列成功
        //生产者存入2到队列成功
        //消费者拿到队列中的数据:2
        //正在生产...
        //生产者存入3到队列成功
        //正在生产...
        //生产者存入4到队列成功
        //消费者正在获取数据...
        //消费者拿到队列中的数据:1
        //正在生产...
        //消费者正在获取数据...
        //正在生产...
        //消费者拿到队列中的数据:3
        //生产者存入5到队列成功
        //生产者存入6到队列成功
        //消费者正在获取数据...
        //正在生产...
        //正在生产...
        //生产者存入8到队列成功
        //生产者存入7到队列成功
        //消费者拿到队列中的数据:4
        //正在生产...
        //消费者正在获取数据...
        //正在生产...
        //消费者拿到队列中的数据:5
        //生产者存入9到队列成功
        //生产者存入10到队列成功
        //消费者正在获取数据...
        //消费者拿到队列中的数据:6
        //正在生产...
        //正在生产...
        //生产者存入11到队列成功
        //生产者存入12到队列成功
        //消费者正在获取数据...
        //正在生产...
        //正在生产...
        //生产者存入13到队列成功
        //消费者拿到队列中的数据:7
        //生产者存入14到队列成功
        //正在生产...
        //生产者存入15到队列成功
        //正在生产...
        //消费者正在获取数据...
        //生产者存入16到队列成功
        //消费者拿到队列中的数据:8
        //正在生产...
        //消费者正在获取数据...
        //正在生产...
        //消费者拿到队列中的数据:9
        //生产者存入17到队列成功
        //生产者存入18到队列成功
        //消费者正在获取数据...
        //正在生产...
        //生产者存入19到队列成功
        //正在生产...
        //消费者拿到队列中的数据:10
        //生产者存入20到队列成功
        //消费者正在获取数据...
        //生产结束
        //生产结束
        //消费者拿到队列中的数据:11
        //消费者正在获取数据...
        //消费者拿到队列中的数据:12
        //消费者正在获取数据...
        //消费者拿到队列中的数据:13
        //消费者正在获取数据...
        //消费者拿到队列中的数据:14
        //消费者正在获取数据...
        //消费者拿到队列中的数据:15
        //消费者正在获取数据...
        //消费者拿到队列中的数据:16
        //消费者正在获取数据...
        //消费者拿到队列中的数据:17
        //消费者正在获取数据...
        //消费者拿到队列中的数据:18
        //消费者正在获取数据...
        //消费者拿到队列中的数据:19
        //消费者正在获取数据...
        //消费者拿到队列中的数据:20
        //消费者正在获取数据...
        //消费者未拿到队列中的数据
        //消费者结束
    }
    

    案例二:

    
    /**
     * 任务 生产者向缓冲区提交的数据
     */
    public final class PCData {
        private final int intData;
    
        public PCData(int d) {
            this.intData = d;
        }
        public int getData(){
            return intData;
        }
    
        @Override
        public String toString() {
            return "data:"+intData;
        }
    }
    
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 生产者 用于提交用户请求,提取用户任务,并装入内存缓冲区
     */
    public class Producer implements Runnable{
        private volatile boolean isRunning = true;
        // 共享内存缓存区 缓存生产者提交的任务或数据,供消费者使用
        private BlockingQueue<PCData> queue;
        // 总数
        private static AtomicInteger count = new AtomicInteger();
        private static final int SLEEPTIME = 1000;
    
        public Producer(BlockingQueue<PCData> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            PCData data = null;
            Random r = new Random();
            System.out.println("start Producer id="+Thread.currentThread().getId());
            try {
                while (isRunning){
                    Thread.sleep(r.nextInt(SLEEPTIME));
                    //构造任务数据
                    data = new PCData(count.incrementAndGet());
                    System.out.println(data+" is put into queue");
                    //提交数据到缓冲区
                    if (!queue.offer(data,2, TimeUnit.SECONDS)){
                        System.out.println("failed to put data: " + data);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
        public void stop(){
            isRunning = false;
        }
    }
    
    
    import java.text.MessageFormat;
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    
    /**
     * 消费者 在内存缓冲区中提取处理任务
     * poll -->【若队列为空,返回null】
     * remove >【若队列为空,抛出NoSuchElementException异常】
     * take -->【若队列为空,发生阻塞,等待有元素】
     */
    public class Consumer implements Runnable{
        private BlockingQueue<PCData> queue;
        private static final int SLEEPTIME = 1000;
    
        public Consumer(BlockingQueue<PCData> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            Random r = new Random();
            System.out.println("start Consumer id="+Thread.currentThread().getId());
            try {
                while (true){
                    PCData data = queue.take();//提取任务
                    int re = data.getData()*data.getData(); //计算平方
                    System.out.println(MessageFormat.format("{0}*{1}={2}"
                                        ,data.getData(),data.getData(),re));
                    Thread.sleep(r.nextInt(SLEEPTIME));
                }
            } catch (Exception e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * 使用生产者和消费者的客户端
     */
    public class Main {
        public static void main(String[] args) throws InterruptedException {
            //建立缓冲区
            BlockingQueue<PCData> q = new LinkedBlockingQueue<>();
            //建立生产者
            Producer p1 = new Producer(q);
            Producer p2 = new Producer(q);
            Producer p3 = new Producer(q);
            //建立消费者
            Consumer c1 = new Consumer(q);
            Consumer c2 = new Consumer(q);
            Consumer c3 = new Consumer(q);
            //建立线程池
            ExecutorService service = Executors.newCachedThreadPool();
            service.execute(p1);
            service.execute(p2);
            service.execute(p3);
            service.execute(c1);
            service.execute(c2);
            service.execute(c3);
            Thread.sleep(10000);
            //停止生产者
            p1.stop();
            p2.stop();
            p3.stop();
            Thread.sleep(3000);
            service.shutdown();
        }
    }
    
  • 相关阅读:
    HDU 6125
    HDU 6129
    Super Jumping! Jumping! Jumping!
    HDU 1040 As Easy As A+B(排序)
    VS2015转VS2008
    HDU 1329 Hanoi Tower Troubles Again!(乱搞)
    HDU 1062 Text Reverse(字符串)
    HDU 1013 Digital Roots(字符串)
    HDU 1003 Max Sum(动态规划)
    HDU 1203 I NEED A OFFER!(01背包)
  • 原文地址:https://www.cnblogs.com/fly-book/p/11451562.html
Copyright © 2011-2022 走看看