zoukankan      html  css  js  c++  java
  • Java核心知识点学习----多线程中的阻塞队列,ArrayBlockingQueue介绍

    1.什么是阻塞队列?

    所谓队列,遵循的是先进先出原则(FIFO),阻塞队列,即是数据共享时,A在写数据时,B想读同一数据,那么就将发生阻塞了.

    看一下线程的四种状态,首先是新创建一个线程,然后,通过start方法启动线程--->线程变为可运行可执行状态,然后通过数据产生共享,线程产生互斥---->线程状态变为阻塞状态---->阻塞状态想打开的话可以调用notify方法.

    这里Java5中提供了封装好的类,可以直接调用然后构造阻塞状态,以保证数据的原子性.

    2.如何实现?

    主要是实现BlockingQueue接口.

    比较常见的实现有:ArrayBlockingQueue,LinkedBlockingQueue,DelayedWorkQueue等竺

    这里简单介绍ArrayBlockingQueue;

    //方式1
    new ArrayBlockingQueue(int capacity);//Parameters:capacity the capacity of this queue
    
    //方式2
    public ArrayBlockingQueue(int capacity, boolean fair) ;//fair if true then queue accesses for threads blocked on insertion or removal, are processed in 
         FIFO order; if false the access order is unspecified.
    
    //方式3
     public ArrayBlockingQueue(int capacity, boolean fair,
                                  Collection<? extends E> c) ;//c the collection of elements to initially contain
    
        

    三种构造函数,比较常用的是1,2两种,2比1只是多了要不要排序,如果排序,那就是FIFO原则,即先进先出.

    3.举例:

    package com.amos.concurrent;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    /** 
    * @ClassName: BlockingQueueTest 
    * @Description: Java5新特性,阻塞队列
    * @author: amosli
    * @email:hi_amos@outlook.com
    * @date Apr 27, 2014 10:01:51 PM  
    */
    public class BlockingQueueTest {
        public static void main(String[] args) {
            final BlockingQueue queue = new ArrayBlockingQueue(3);
            for(int i=0;i<2;i++){
                new Thread(){
                    public void run(){
                        while(true){
                            try {
                                Thread.sleep((long)(Math.random()*1000));
                                System.out.println(Thread.currentThread().getName() + "准备放数据!");                            
                                queue.put(1);
                                System.out.println(Thread.currentThread().getName() + "已经放了数据," +                             
                                            "队列目前有" + queue.size() + "个数据");
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                        }
                    }
                    
                }.start();
            }
            
            new Thread(){
                public void run(){
                    while(true){
                        try {
                            //将此处的睡眠时间分别改为100和1000,观察运行结果
                            Thread.sleep(100);
                            System.out.println(Thread.currentThread().getName() + "准备取数据!");
                            queue.take();
                            System.out.println(Thread.currentThread().getName() + "已经取走数据," +                             
                                    "队列目前有" + queue.size() + "个数据");                    
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
                
            }.start();            
        }
    }

    效果如下图所示:

    说明:这里新建一个队列后,主要调用的是put和take两种方法,一个是存,一个是取,这里由于将取的间隔时间设置的比较短,所以基本队列就没放满过.

    4.改写之前的代码

    方法1:Java核心知识点学习----多线程并发之线程间的通信,notify,wait

    方法2:Java核心知识点学习----使用Condition控制线程通信

    方法3:使用ArrayBlockingQueue

    package com.amos.concurrent;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /** 
    * @ClassName: BlockingQueueCondition 
    * @Description: 在前面用Condition实现的同步通知的例子的基础上,改为用阻塞队列来实现。
    第一个线程:A.take()……..B.put()
    第二个线程:B.take()……..A.put()
    * @author: amosli
    * @email:hi_amos@outlook.com
    * @date Apr 28, 2014 12:57:51 AM  
    */
    public class BlockingQueueCondition {
    
        public static void main(String[] args) {
            ExecutorService service = Executors.newSingleThreadExecutor();
            final Business3 business = new Business3();
            service.execute(new Runnable(){
    
                public void run() {
                    for(int i=0;i<2;i++){
                        business.sub();
                    }
                }
                
            });
            
            for(int i=0;i<3;i++){
                business.main();
            }
        }
    
    }
    
    class Business3{
        BlockingQueue subQueue = new ArrayBlockingQueue(1);
        BlockingQueue mainQueue = new ArrayBlockingQueue(1);
        {
            try {
                mainQueue.put(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public void sub(){
            try
            {
                mainQueue.take();
                for(int i=0;i<10;i++){
                    System.out.println(Thread.currentThread().getName() + " : " + i);
                }
                subQueue.put(1);
            }catch(Exception e){
    
            }
        }
        
        public void main(){
            
            try
            {
                subQueue.take();
                for(int i=0;i<5;i++){
                    System.out.println(Thread.currentThread().getName() + " : " + i);
                }
                mainQueue.put(1);
            }catch(Exception e){
            }        
        }
    }

    功能的实现都是完全一样的,不同的是,使用ArrayBlockingQueue会更简单.

    5.官方---代码示例

    典型的生产者消费者模型:

    package com.amos.concurrent;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    /** 
    * @ClassName: BlockingSample 
    * @Description: 生产者消费者模型,阻塞,只有生产好了,才能去消费
    * @author: amosli
    * @email:hi_amos@outlook.com
    * @date Apr 28, 2014 1:44:07 AM  
    */
    public class BlockingSample {
        public static void main(String[] args) {
            new BlockingSample().new Setup().main();
        }
        class Producer implements Runnable {
               private final BlockingQueue queue;
               Producer(BlockingQueue q) { queue = q; }
               public void run() {
                 try {
                   while (true) { queue.put(produce());
                   System.out.println(Thread.currentThread().getName()+" 现在正在生产!");
                   }
                 } catch (Exception ex) { ex.printStackTrace();}
               }
               String produce() { System.out.println("produce now ....");
                return "produce"; }
             }
    
             class Consumer implements Runnable {
               private final BlockingQueue queue;
               Consumer(BlockingQueue q) { queue = q; }
               public void run() {
                 try {
                   while (true) { consume(queue.take()); 
                   System.out.println(Thread.currentThread().getName()+" 现在正在消费!");
                   }
                 } catch (InterruptedException ex) { ex.printStackTrace();}
               }
               void consume(Object x) {System.out.println("consume...");}
             }
    
             class Setup {
               void main() {
                 BlockingQueue q = new ArrayBlockingQueue<String>(1);
                 Producer p = new Producer(q);
                 Consumer c1 = new Consumer(q);
                 Consumer c2 = new Consumer(q);
                 new Thread(p).start();
                 new Thread(c1).start();
                 new Thread(c2).start();
               }
             }
             
    }

    效果如下图所示:

    由图上可以看出,只有生产了才能消费,否则会形成阻塞.

     

     

     

  • 相关阅读:
    Shell Sort 希尔排序
    Quick Sort 快速排序
    Merge Sort 归并排序
    Insertion Sort
    Bubble Sort
    dubbo的异常栈问题
    IoC 容器
    .Net Core集成RabbitMQ
    .NET CORE Skywalking的集成
    制造业的信息化之路
  • 原文地址:https://www.cnblogs.com/amosli/p/3695558.html
Copyright © 2011-2022 走看看