zoukankan      html  css  js  c++  java
  • JUC 并发编程--10, 阻塞队列之--LinkedBlockingDeque 工作窃取, 代码演示

    直接上代码

      class LinkedBlockingDequeDemo {
        // 循环是否结束的开关
        private static volatile boolean flag1 = true;
        private static volatile boolean flag2 = true;
        // 生成者生产的产品
        private static AtomicInteger atomicInteger = new AtomicInteger(1);
        //二个 双端阻塞队列
        private LinkedBlockingDeque linkedBlockingDeque1;
        private LinkedBlockingDeque linkedBlockingDeque2;
    
        public LinkedBlockingDequeDemo(LinkedBlockingDeque linkedBlockingDeque1, LinkedBlockingDeque linkedBlockingDeque2) {
            this.linkedBlockingDeque1 = linkedBlockingDeque1;
            this.linkedBlockingDeque2 = linkedBlockingDeque2;
            System.out.println(linkedBlockingDeque1.getClass().getName());
            System.out.println(linkedBlockingDeque2.getClass().getName());
        }
    
        // 生产者
        public void producer() throws InterruptedException {
            String data = "";
            while (flag1) {
                data = atomicInteger.getAndIncrement() + "";//这是产品
                //TimeUnit.SECONDS.sleep(1);// 1秒生成一个产品
                if (Integer.valueOf(data) <= 10) {
                    //存到 linkedBlockingDeque1 队列中
                    linkedBlockingDeque1.put(data + "队列1");
                    System.out.println(Thread.currentThread().getName() + "添加元素到  阻塞队列 linkedBlockingDeque1,成功,元素为: " + data + "队列1");
                } else {
                    //存到 linkedBlockingDeque2 队列中
                    linkedBlockingDeque2.put(data + "队列2");
                    System.out.println(Thread.currentThread().getName() + "添加元素到  阻塞队列 linkedBlockingDeque2,成功,元素为: " + data + "队列2");
                }
            }
        }
    
        // 消费者
        public void consumer1() throws InterruptedException {
            while (flag2) {
                try {
                    TimeUnit.MILLISECONDS.sleep(3);
                    if (!linkedBlockingDeque1.isEmpty()) {
                        // 自己的队列不为空, 就从自己的队列中取数据消费, 从头开始消费数据
                        System.out.println(Thread.currentThread().getName() + "从自己队列 Deque1中 的头部消费了一个产品:" + linkedBlockingDeque1.takeFirst());
                    } else if (!linkedBlockingDeque2.isEmpty()) {
                        // 如果 另一个队列不为空, 就从他的尾开始消费数据
                        System.out.println(Thread.currentThread().getName() + "从别人队列 Deque2中 的尾部消费了一个产品:" + linkedBlockingDeque2.takeLast());
                    } else {
                        flag2 = false;//这里结束消费
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("------------------------------------------");
            }
        }
    
        public void consumer2() throws InterruptedException {
            while (flag2) {
                try {
                    TimeUnit.MILLISECONDS.sleep(3);
                    if (!linkedBlockingDeque2.isEmpty()) {
                        // 自己的队列不为空, 就从自己的队列中取数据消费, 从头开始消费数据
                        System.out.println(Thread.currentThread().getName() + "从自己队列 Deque2中 的头部消费了一个产品:" + linkedBlockingDeque2.takeFirst());
                    } else if (!linkedBlockingDeque1.isEmpty()) {
                        // 如果 另一个队列不为空, 就从他的尾开始消费数据
                        System.out.println(Thread.currentThread().getName() + "从别人队列 Deque1中 的尾部消费了一个产品:" + linkedBlockingDeque1.takeLast());
                    } else {
                        flag2 = false;//这里结束消费
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("------------------------------------------");
            }
        }
    
        // 停止生产的方法
        public void stop() {
            this.flag1 = false;
        }
    
        public static void main(String[] args) throws InterruptedException {
            //工作窃取代码演示: 什么是工作窃取? 在生产者和消费者模型中,
            // 每个消费者都对应一个阻塞队列(LinkedBlockingDeque),当消费者当前的队列中的任务消费完了后, 不会就此结束,
            // 他会从另一个队列中获取对应的任务来进行消费,
            // 这就是 LinkedBlockingDeque的好处,因为他是双端队列,可以从头和尾 来获取元素
    
            /**
             *
             * 看到的效果就是: 消费者线程1, 从自己的队列Deque1 中, 从头部开始消费1-10, 之后,开始从 别人的队列Deque2中消费产品了
             * 
             * 这里让生产者 1毫秒之后停止生产,然后消费者开始消费
             */
            LinkedBlockingDeque<String> linkedBlockingDeque1 = new LinkedBlockingDeque<>();
            LinkedBlockingDeque<String> linkedBlockingDeque2 = new LinkedBlockingDeque<>();
            LinkedBlockingDequeDemo linkedBlockingDequeDemo = new LinkedBlockingDequeDemo(linkedBlockingDeque1, linkedBlockingDeque2);
    
            new Thread(() -> {
                try {
                    linkedBlockingDequeDemo.producer();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "生产者线程").start();
    
    
            new Thread(() -> {
                try {
                    linkedBlockingDequeDemo.consumer1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "消费者线程1").start();
    
    
            new Thread(() -> {
                try {
                    linkedBlockingDequeDemo.consumer2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "消费者线程2").start();
    
    
            TimeUnit.MILLISECONDS.sleep(1);
            linkedBlockingDequeDemo.stop();
        }
    }
    

    运行结果 : 可以看到 线程1,在自己的阻塞队列中消费完之后, 并没有结束,从别人的队列中来 获取任务来执行




  • 相关阅读:
    ubuntu关闭和开启防火墙
    supervisor 从安装到使用
    suse11/12关闭防火墙
    [Linux实用工具]Linux监控工具munin的展示(Nginx)
    [Linux实用工具]Linux监控工具munin的安装和配置
    [Linux]阿里云免费试用体验(在阿里云的ubuntu上部署个人服务)
    [Linux基础环境/软件]Linux下安装resin web服务器(涉及gcc、jdk环境部署)
    [Linux基础环境/软件]Linux下安装mysql
    [Linux实用工具]Windows下同步Linux文件(Linux安装Samba和配置)
    [Linux实用工具]Ubuntu环境下SSH的安装及使用
  • 原文地址:https://www.cnblogs.com/lvcai/p/13633491.html
Copyright © 2011-2022 走看看