zoukankan      html  css  js  c++  java
  • 阻塞队列和并发队列

      在并发编程中我们有时候需要使用线程安全的队列。要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法

      使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,其中阻塞队列的典型是:BlockingQueue;

      非阻塞的实现方式则可以使用循环CAS的方式来实现,非阻塞队列的典型例子是ConcurrentLinkedQueue。

    注:并行与并发的区别:

    1、并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑; 2、并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

    一、阻塞队列

       常见的阻塞队列如下:

       关于阻塞队例及其说明如下表所示:

      

    基于阻塞队列的生产者-消费者模型

    package cn.thread;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * 多线程模拟实现生产者/消费者模型
     */
    public class BlockingQueueTest2 {
        /**
         * 
         * 定义装苹果的篮子
         * 
         */
        public class Basket {
            // 篮子,能够容纳3个苹果
            BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
    
            // 生产苹果,放入篮子
            public void produce() throws InterruptedException {
                // put方法放入一个苹果,若basket满了,等到basket有位置
                basket.put("An apple");
            }
    
            // 消费苹果,从篮子中取走
            public String consume() throws InterruptedException {
                // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
                return basket.take();
            }
        }
    
        // 定义苹果生产者
        class Producer implements Runnable {
            private String instance;
            private Basket basket;
    
            public Producer(String instance, Basket basket) {
                this.instance = instance;
                this.basket = basket;
            }
    
            public void run() {
                try {
                    while (true) {
                        // 生产苹果
                        System.out.println("生产者准备生产苹果:" + instance);
                        basket.produce();
                        System.out.println("!生产者生产苹果完毕:" + instance);
                        // 休眠300ms
                        Thread.sleep(300);
                    }
                } catch (InterruptedException ex) {
                    System.out.println("Producer Interrupted");
                }
            }
        }
    
        // 定义苹果消费者
        class Consumer implements Runnable {
            private String instance;
            private Basket basket;
    
            public Consumer(String instance, Basket basket) {
                this.instance = instance;
                this.basket = basket;
            }
    
            public void run() {
                try {
                    while (true) {
                        // 消费苹果
                        System.out.println("消费者准备消费苹果:" + instance);
                        System.out.println(basket.consume());
                        System.out.println("!消费者消费苹果完毕:" + instance);
                        // 休眠1000ms
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException ex) {
                    System.out.println("Consumer Interrupted");
                }
            }
        }
    
        public static void main(String[] args) {
            BlockingQueueTest2 test = new BlockingQueueTest2();
    
            // 建立一个装苹果的篮子
            Basket basket = test.new Basket();
    
            ExecutorService service = Executors.newCachedThreadPool();
            Producer producer = test.new Producer("生产者001", basket);
            Producer producer2 = test.new Producer("生产者002", basket);
            Consumer consumer = test.new Consumer("消费者001", basket);
            service.submit(producer);
            service.submit(producer2);
            service.submit(consumer);
        }
    
    }

    二、并发队列

      ConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性。

      ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现 。

    基于ConcurrentLinkedQueue实现的生产者-消费者模型:

    package cn.thread;
    
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ConcurrentLinkedQueueTest {
        private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        private static int count = 2; // 线程个数
        //CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
        private static CountDownLatch latch = new CountDownLatch(count);
    
        public static void main(String[] args) throws InterruptedException {
            long timeStart = System.currentTimeMillis();
            ExecutorService es = Executors.newFixedThreadPool(4);
            ConcurrentLinkedQueueTest.offer();
            for (int i = 0; i < count; i++) {
                es.submit(new Poll());
            }
            latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
            System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
            es.shutdown();
        }
        
        /**
         * 生产
         */
        public static void offer() {
            for (int i = 0; i < 100000; i++) {
                queue.offer(i);
            }
        }
    
    
        /**
         * 消费
         *  
         */
        static class Poll implements Runnable {
            public void run() {
                // while (queue.size()>0) {
                while (!queue.isEmpty()) {
                    System.out.println(queue.poll());
                }
                latch.countDown();
            }
        }
    }

    参见:http://ifeve.com/concurrentlinkedqueue/

  • 相关阅读:
    [bzoj1023][SHOI2008]cactus仙人掌图【仙人掌】
    [bzoj1022][SHOI2008]小约翰的游戏John【博弈论】
    [bzoj1021][SHOI2008]Debt 循环的债务【dp】
    [bzoj1020][SHOI2008]安全的航线flight【迭代】【计算几何】
    [bzoj1019][SHOI2008]汉诺塔【dp】
    [bzoj1018][SHOI2008]堵塞的交通traffic【线段树】
    [bzoj1017][JSOI2008]魔兽地图DotR【dp】
    Kafka API使用
    zookeeper单机模式安装配置
    Hadoop伪分布式安装
  • 原文地址:https://www.cnblogs.com/moonandstar08/p/5373399.html
Copyright © 2011-2022 走看看