zoukankan      html  css  js  c++  java
  • 【Java并发编程】4、JDK7中TransferQueue的使用以及TransferQueue与SynchronousQueue的差别

    转自:http://blog.csdn.net/aitangyong/article/details/46472643

    JDK7对JDK5中的J.U.C并发工具进行了增强,其中之一就是新增了TransferQueue。Java并发相关的JSR规范,可以查看Doug Lea维护的blog。现在简单介绍下这个类的使用方式。

    [java] view plain copy
     
    1. public interface TransferQueue<E> extends BlockingQueue<E>  
    2. {  
    3.     /** 
    4.      * Transfers the element to a waiting consumer immediately, if possible. 
    5.      * 
    6.      * <p>More precisely, transfers the specified element immediately 
    7.      * if there exists a consumer already waiting to receive it (in 
    8.      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 
    9.      * otherwise returning {@code false} without enqueuing the element. 
    10.      * 
    11.      * @param e the element to transfer 
    12.      * @return {@code true} if the element was transferred, else 
    13.      *         {@code false} 
    14.      * @throws ClassCastException if the class of the specified element 
    15.      *         prevents it from being added to this queue 
    16.      * @throws NullPointerException if the specified element is null 
    17.      * @throws IllegalArgumentException if some property of the specified 
    18.      *         element prevents it from being added to this queue 
    19.      */  
    20.     boolean tryTransfer(E e);  
    21.   
    22.     /** 
    23.      * Transfers the element to a consumer, waiting if necessary to do so. 
    24.      * 
    25.      * <p>More precisely, transfers the specified element immediately 
    26.      * if there exists a consumer already waiting to receive it (in 
    27.      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 
    28.      * else waits until the element is received by a consumer. 
    29.      * 
    30.      * @param e the element to transfer 
    31.      * @throws InterruptedException if interrupted while waiting, 
    32.      *         in which case the element is not left enqueued 
    33.      * @throws ClassCastException if the class of the specified element 
    34.      *         prevents it from being added to this queue 
    35.      * @throws NullPointerException if the specified element is null 
    36.      * @throws IllegalArgumentException if some property of the specified 
    37.      *         element prevents it from being added to this queue 
    38.      */  
    39.     void transfer(E e) throws InterruptedException;  
    40.   
    41.     /** 
    42.      * Transfers the element to a consumer if it is possible to do so 
    43.      * before the timeout elapses. 
    44.      * 
    45.      * <p>More precisely, transfers the specified element immediately 
    46.      * if there exists a consumer already waiting to receive it (in 
    47.      * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), 
    48.      * else waits until the element is received by a consumer, 
    49.      * returning {@code false} if the specified wait time elapses 
    50.      * before the element can be transferred. 
    51.      * 
    52.      * @param e the element to transfer 
    53.      * @param timeout how long to wait before giving up, in units of 
    54.      *        {@code unit} 
    55.      * @param unit a {@code TimeUnit} determining how to interpret the 
    56.      *        {@code timeout} parameter 
    57.      * @return {@code true} if successful, or {@code false} if 
    58.      *         the specified waiting time elapses before completion, 
    59.      *         in which case the element is not left enqueued 
    60.      * @throws InterruptedException if interrupted while waiting, 
    61.      *         in which case the element is not left enqueued 
    62.      * @throws ClassCastException if the class of the specified element 
    63.      *         prevents it from being added to this queue 
    64.      * @throws NullPointerException if the specified element is null 
    65.      * @throws IllegalArgumentException if some property of the specified 
    66.      *         element prevents it from being added to this queue 
    67.      */  
    68.     boolean tryTransfer(E e, long timeout, TimeUnit unit)  
    69.         throws InterruptedException;  
    70.   
    71.     /** 
    72.      * Returns {@code true} if there is at least one consumer waiting 
    73.      * to receive an element via {@link #take} or 
    74.      * timed {@link #poll(long,TimeUnit) poll}. 
    75.      * The return value represents a momentary state of affairs. 
    76.      * 
    77.      * @return {@code true} if there is at least one waiting consumer 
    78.      */  
    79.     boolean hasWaitingConsumer();  
    80.   
    81.     /** 
    82.      * Returns an estimate of the number of consumers waiting to 
    83.      * receive elements via {@link #take} or timed 
    84.      * {@link #poll(long,TimeUnit) poll}.  The return value is an 
    85.      * approximation of a momentary state of affairs, that may be 
    86.      * inaccurate if consumers have completed or given up waiting. 
    87.      * The value may be useful for monitoring and heuristics, but 
    88.      * not for synchronization control.  Implementations of this 
    89.      * method are likely to be noticeably slower than those for 
    90.      * {@link #hasWaitingConsumer}. 
    91.      * 
    92.      * @return the number of consumers waiting to receive elements 
    93.      */  
    94.     int getWaitingConsumerCount();  
    95. }  

    可以看到TransferQueue同时也是一个阻塞队列,它具备阻塞队列的所有特性,主要介绍下上面5个新增API的作用。

     

    1.transfer(E e)若当前存在一个正在等待获取的消费者线程,即立刻将e移交之;否则将元素e插入到队列尾部,并且当前线程进入阻塞状态,直到有消费者线程取走该元素。

    [java] view plain copy
     
    1. public class TransferQueueDemo {  
    2.   
    3.     private static TransferQueue<String> queue = new LinkedTransferQueue<String>();  
    4.   
    5.     public static void main(String[] args) throws Exception {  
    6.   
    7.         new Productor(1).start();  
    8.   
    9.         Thread.sleep(100);  
    10.   
    11.         System.out.println("over.size=" + queue.size());  
    12.     }  
    13.   
    14.     static class Productor extends Thread {  
    15.         private int id;  
    16.   
    17.         public Productor(int id) {  
    18.             this.id = id;  
    19.         }  
    20.   
    21.         @Override  
    22.         public void run() {  
    23.             try {  
    24.                 String result = "id=" + this.id;  
    25.                 System.out.println("begin to produce." + result);  
    26.                 queue.transfer(result);  
    27.                 System.out.println("success to produce." + result);  
    28.             } catch (InterruptedException e) {  
    29.                 e.printStackTrace();  
    30.             }  
    31.         }  
    32.     }  
    33. }  

    可以看到生产者线程会阻塞,因为调用transfer()的时候并没有消费者在等待获取数据。队列长度变成了1,说明元素e没有移交成功的时候,会被插入到阻塞队列的尾部。

    2.tryTransfer(E e)若当前存在一个正在等待获取的消费者线程,则该方法会即刻转移e,并返回true;若不存在则返回false,但是并不会将e插入到队列中。这个方法不会阻塞当前线程,要么快速返回true,要么快速返回false。

    3.hasWaitingConsumer()和getWaitingConsumerCount()用来判断当前正在等待消费的消费者线程个数。

     

    4.tryTransfer(E e, long timeout, TimeUnit unit) 若当前存在一个正在等待获取的消费者线程,会立即传输给它; 否则将元素e插入到队列尾部,并且等待被消费者线程获取消费掉。若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素从队列中移除。

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. public class TransferQueueDemo {  
    2.   
    3.     private static TransferQueue<String> queue = new LinkedTransferQueue<String>();  
    4.   
    5.     public static void main(String[] args) throws Exception {  
    6.   
    7.         new Productor(1).start();  
    8.   
    9.         Thread.sleep(100);  
    10.   
    11.         System.out.println("over.size=" + queue.size());//1  
    12.           
    13.         Thread.sleep(1500);  
    14.   
    15.         System.out.println("over.size=" + queue.size());//0  
    16.     }  
    17.   
    18.     static class Productor extends Thread {  
    19.         private int id;  
    20.   
    21.         public Productor(int id) {  
    22.             this.id = id;  
    23.         }  
    24.   
    25.         @Override  
    26.         public void run() {  
    27.             try {  
    28.                 String result = "id=" + this.id;  
    29.                 System.out.println("begin to produce." + result);  
    30.                 queue.tryTransfer(result, 1, TimeUnit.SECONDS);  
    31.                 System.out.println("success to produce." + result);  
    32.             } catch (Exception e) {  
    33.                 e.printStackTrace();  
    34.             }  
    35.         }  
    36.     }  
    37. }  

    第一次还没到指定的时间,元素被插入到队列中了,所有队列长度是1;第二次指定的时间片耗尽,元素从队列中移除了,所以队列长度是0。

     

    这篇文章中讲了SynchronousQueue的使用方式,可以看到TransferQueue也具有SynchronousQueue的所有功能,但是TransferQueue的功能更强大。这篇文章中提到了这2个API的区别:

    [plain] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. TransferQueue is more generic and useful than SynchronousQueue however as it allows you to flexibly decide whether to use normal BlockingQueue   
    2. semantics or a guaranteed hand-off. In the case where items are already in the queue, calling transfer will guarantee that all existing queue   
    3. items will be processed before the transferred item.  
    4.   
    5.   
    6. SynchronousQueue implementation uses dual queues (for waiting producers and waiting consumers) and protects both queues with a single lock. The  
    7.  LinkedTransferQueue implementation uses CAS operations to form a nonblocking implementation and that is at the heart of avoiding serialization  
    8.  bottlenecks.  
  • 相关阅读:
    鼠标效果
    全选与全不选
    正则表达式
    下拉菜单
    图片轮播
    弹出层
    间隔与延时
    JS基础
    引入
    样式表 文字
  • 原文地址:https://www.cnblogs.com/wangzhongqiu/p/6441703.html
Copyright © 2011-2022 走看看