zoukankan      html  css  js  c++  java
  • SynchronousQueue和LinkedBlockingQueue区别

    在多线程开发中,会用到SynchronousQueue(new CachedThreadPool())和LinkedBlockingQueue(new FixedThreadPoll())

    我们来简单分析一下这两个队列的区别

    SynchronousQueue:

      offer():当线程offer操作时,当same mode时,加入队列失败,即时返回 (如果是put操作,元素会存储到队列中,并且阻塞等待);

    当complimentary mode时,立即把元素transfer给等待的take线程

            take():线程take操作,当same mode时,该线程把元素存储到队列中,并且阻塞等待(如果是poll操作,元素会加入队列失败,即时返回);

    当complimentary mode时,立即在队列中找到等待的put线程关联的元素,取出来,返回

    LinkedBlockingQueue

      offer(): 线程把元素放入队列中(多线程并发竞争),返回,超过bound,返回失败

     1 /** Lock held by take, poll, etc */
     2     private final ReentrantLock takeLock = new ReentrantLock();
     3 
     4     /** Wait queue for waiting takes */
     5     private final Condition notEmpty = takeLock.newCondition();
     6 
     7     /** Lock held by put, offer, etc */
     8     private final ReentrantLock putLock = new ReentrantLock();
     9 
    10     /** Wait queue for waiting puts */
    11     private final Condition notFull = putLock.newCondition();
    12 
    13 
    14 public boolean offer(E e) {
    15         if (e == null) throw new NullPointerException();
    16         final AtomicInteger count = this.count;
    17         if (count.get() == capacity)
    18             return false;
    19         int c = -1;
    20         Node<E> node = new Node<E>(e);
    21         final ReentrantLock putLock = this.putLock;
    22         putLock.lock();
    23         try {
    24             if (count.get() < capacity) {
    25                 enqueue(node);
    26                 c = count.getAndIncrement();
    27                 if (c + 1 < capacity)
    28                     notFull.signal();
    29             }
    30         } finally {
    31             putLock.unlock();
    32         }
    33         if (c == 0)
    34             signalNotEmpty();
    35         return c >= 0;
    36     }
    offer方法

           take(); 队列不为空时,获取元素(多线程并发竞争),为空时,阻塞等待

     1 public E take() throws InterruptedException {
     2         E x;
     3         int c = -1;
     4         final AtomicInteger count = this.count;
     5         final ReentrantLock takeLock = this.takeLock;
     6         takeLock.lockInterruptibly();
     7         try {
     8             while (count.get() == 0) {
     9                 notEmpty.await();
    10             }
    11             x = dequeue();
    12             c = count.getAndDecrement();
    13             if (c > 1)
    14                 notEmpty.signal();
    15         } finally {
    16             takeLock.unlock();
    17         }
    18         if (c == capacity)
    19             signalNotFull();
    20         return x;
    21     }
    View Code

    设想一种场景:

      当有大量线程在offer和take时,

      1、LinkedBlockingQueue在入队和出队时,并发竞争激烈,cpu线程切换频繁,性能较低;

       2、SynchronousQueue中每一个元素对应一个put线程、一个take线程,不会存在锁竞争

      但是反过来 SynchronousQueue需要的线程数较多,如果take消费不及时,会导致put线程阻塞(如果是使用offer的话,会加入队列失败),

    在new CachedThreadPool()中,当执行execute()时

     1 public void execute(Runnable command) {
     2         if (command == null)
     3             throw new NullPointerException();
     4         /*
     5          * Proceed in 3 steps:
     6          *
     7          * 1. If fewer than corePoolSize threads are running, try to
     8          * start a new thread with the given command as its first
     9          * task.  The call to addWorker atomically checks runState and
    10          * workerCount, and so prevents false alarms that would add
    11          * threads when it shouldn't, by returning false.
    12          *
    13          * 2. If a task can be successfully queued, then we still need
    14          * to double-check whether we should have added a thread
    15          * (because existing ones died since last checking) or that
    16          * the pool shut down since entry into this method. So we
    17          * recheck state and if necessary roll back the enqueuing if
    18          * stopped, or start a new thread if there are none.
    19          *
    20          * 3. If we cannot queue task, then we try to add a new
    21          * thread.  If it fails, we know we are shut down or saturated
    22          * and so reject the task.
    23          */
    24         int c = ctl.get();
    25         if (workerCountOf(c) < corePoolSize) {
    26             if (addWorker(command, true))
    27                 return;
    28             c = ctl.get();
    29         }
           //same mode时,入队失败,返回false
    //complimentary mode时 入队成功返回true
    30 if (isRunning(c) && workQueue.offer(command)) { 31 int recheck = ctl.get(); 32 if (! isRunning(recheck) && remove(command)) 33 reject(command); 34 else if (workerCountOf(recheck) == 0) 35 addWorker(null, false); 36 }
           //当大批量任务进来,command执行时间较长,会大量入队失败
           //,并且cachedThreadPool,最大线程数为MAX,会无节制的创建大量线程
    37 else if (!addWorker(command, false)) 38 reject(command); 39 }
  • 相关阅读:
    170619、springboot编程之HelloWorld
    170616、解决 java.lang.IllegalArgumentException: No converter found for return value of type: class java.util.ArrayList
    170615、spring不同数据库数据源动态切换
    pytest文档10-命令行传参
    pytest文档9-参数化parametrize
    pytest文档8-html报告报错截图+失败重跑
    pytest文档7-pytest-html生成html报告
    定位对应关系
    ADB 无线连接
    command failed shell "ps 'uiautomator'"的解决方式
  • 原文地址:https://www.cnblogs.com/toUpdating/p/10117191.html
Copyright © 2011-2022 走看看