zoukankan      html  css  js  c++  java
  • SynchronousQueue和LinkedBlockingQueue区别

    在多线程开发中,会用到SynchronousQueue(new CachedThreadPool())和LinkedBlockingQueue(new FixedThreadPoll())

    我们来简单分析一下这两个队列的区别

    SynchronousQueue:

      offer():当线程offer操作时,当same mode时,加入队列失败,即时返回 (如果是put操作,元素会存储到队列中,并且阻塞等待);

    当complimentary mode时,立即把元素transfer给等待的take线程

            take():线程take操作,当same mode时,该线程把元素存储到队列中,并且阻塞等待(如果是poll操作,元素会加入队列失败,即时返回);

    当complimentary mode时,立即在队列中找到等待的put线程关联的元素,取出来,返回

    LinkedBlockingQueue

      offer(): 线程把元素放入队列中(多线程并发竞争),返回,超过bound,返回失败

     1 /** Lock held by take, poll, etc */
     2     private final ReentrantLock takeLock = new ReentrantLock();
     3 
     4     /** Wait queue for waiting takes */
     5     private final Condition notEmpty = takeLock.newCondition();
     6 
     7     /** Lock held by put, offer, etc */
     8     private final ReentrantLock putLock = new ReentrantLock();
     9 
    10     /** Wait queue for waiting puts */
    11     private final Condition notFull = putLock.newCondition();
    12 
    13 
    14 public boolean offer(E e) {
    15         if (e == null) throw new NullPointerException();
    16         final AtomicInteger count = this.count;
    17         if (count.get() == capacity)
    18             return false;
    19         int c = -1;
    20         Node<E> node = new Node<E>(e);
    21         final ReentrantLock putLock = this.putLock;
    22         putLock.lock();
    23         try {
    24             if (count.get() < capacity) {
    25                 enqueue(node);
    26                 c = count.getAndIncrement();
    27                 if (c + 1 < capacity)
    28                     notFull.signal();
    29             }
    30         } finally {
    31             putLock.unlock();
    32         }
    33         if (c == 0)
    34             signalNotEmpty();
    35         return c >= 0;
    36     }
    offer方法

           take(); 队列不为空时,获取元素(多线程并发竞争),为空时,阻塞等待

     1 public E take() throws InterruptedException {
     2         E x;
     3         int c = -1;
     4         final AtomicInteger count = this.count;
     5         final ReentrantLock takeLock = this.takeLock;
     6         takeLock.lockInterruptibly();
     7         try {
     8             while (count.get() == 0) {
     9                 notEmpty.await();
    10             }
    11             x = dequeue();
    12             c = count.getAndDecrement();
    13             if (c > 1)
    14                 notEmpty.signal();
    15         } finally {
    16             takeLock.unlock();
    17         }
    18         if (c == capacity)
    19             signalNotFull();
    20         return x;
    21     }
    View Code

    设想一种场景:

      当有大量线程在offer和take时,

      1、LinkedBlockingQueue在入队和出队时,并发竞争激烈,cpu线程切换频繁,性能较低;

       2、SynchronousQueue中每一个元素对应一个put线程、一个take线程,不会存在锁竞争

      但是反过来 SynchronousQueue需要的线程数较多,如果take消费不及时,会导致put线程阻塞(如果是使用offer的话,会加入队列失败),

    在new CachedThreadPool()中,当执行execute()时

     1 public void execute(Runnable command) {
     2         if (command == null)
     3             throw new NullPointerException();
     4         /*
     5          * Proceed in 3 steps:
     6          *
     7          * 1. If fewer than corePoolSize threads are running, try to
     8          * start a new thread with the given command as its first
     9          * task.  The call to addWorker atomically checks runState and
    10          * workerCount, and so prevents false alarms that would add
    11          * threads when it shouldn't, by returning false.
    12          *
    13          * 2. If a task can be successfully queued, then we still need
    14          * to double-check whether we should have added a thread
    15          * (because existing ones died since last checking) or that
    16          * the pool shut down since entry into this method. So we
    17          * recheck state and if necessary roll back the enqueuing if
    18          * stopped, or start a new thread if there are none.
    19          *
    20          * 3. If we cannot queue task, then we try to add a new
    21          * thread.  If it fails, we know we are shut down or saturated
    22          * and so reject the task.
    23          */
    24         int c = ctl.get();
    25         if (workerCountOf(c) < corePoolSize) {
    26             if (addWorker(command, true))
    27                 return;
    28             c = ctl.get();
    29         }
           //same mode时,入队失败,返回false
    //complimentary mode时 入队成功返回true
    30 if (isRunning(c) && workQueue.offer(command)) { 31 int recheck = ctl.get(); 32 if (! isRunning(recheck) && remove(command)) 33 reject(command); 34 else if (workerCountOf(recheck) == 0) 35 addWorker(null, false); 36 }
           //当大批量任务进来,command执行时间较长,会大量入队失败
           //,并且cachedThreadPool,最大线程数为MAX,会无节制的创建大量线程
    37 else if (!addWorker(command, false)) 38 reject(command); 39 }
  • 相关阅读:
    Centos 7 zabbix 实战应用
    Centos7 Zabbix添加主机、图形、触发器
    Centos7 Zabbix监控部署
    Centos7 Ntp 时间服务器
    Linux 150命令之查看文件及内容处理命令 cat tac less head tail cut
    Kickstart 安装centos7
    Centos7与Centos6的区别
    Linux 150命令之 文件和目录操作命令 chattr lsattr find
    Linux 发展史与vm安装linux centos 6.9
    Linux介绍
  • 原文地址:https://www.cnblogs.com/toUpdating/p/10117191.html
Copyright © 2011-2022 走看看