zoukankan      html  css  js  c++  java
  • SynchronousQueue类

    使用示例:

    public static void main(String[] args) {
        final SynchronousQueue<String> q = new SynchronousQueue<String>();
        //put线程
        class Putter implements Runnable {
            String title;
            public Putter(String title) {
                this.title = title;
            }
            @Override
            public void run() {
                try {
                    q.put(this.title);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }  
        new Thread(new Putter("h1")).start();
        new Thread(new Putter("h2")).start();
        
        //take线程
        class Taker implements Runnable {
            @Override
            public void run() {
                String take;
                try {
                    take = q.take();
                    System.out.printf("%s take %s
    ", Thread.currentThread().getName(), take);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }  
        
        new Thread(new Taker()).start();
        new Thread(new Taker()).start();
    }

     SynchronousQueue的put和take是阻塞的,一个线程put,然后阻塞,等待另一个线程take;或者一个线程take,然后阻塞,等待另一个线程put。

    阻塞线程:

    SNode awaitFulfill(SNode s, boolean timed, long nanos) {
        long lastTime = timed ? System.nanoTime() : 0;
        Thread w = Thread.currentThread();
        SNode h = head;
        //自旋次数
        int spins = (shouldSpin(s) ?
                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);
        for (;;) {
            if (w.isInterrupted())
                s.tryCancel();
            SNode m = s.match;
            if (m != null)
                return m;
            if (timed) {
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0) {
                    s.tryCancel();
                    continue;
                }
            }
            if (spins > 0)
                spins = shouldSpin(s) ? (spins-1) : 0;
            else if (s.waiter == null)
                //自旋结束后,设置waiter
                s.waiter = w; // establish waiter so can park next iter
            else if (!timed)
                //阻塞线程
                LockSupport.park(this);
            else if (nanos > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanos);
        }
    }

     唤醒线程:

    boolean tryMatch(SNode s) {
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                //唤醒线程
                LockSupport.unpark(w);
            }
            return true;
        }
        return match == s;
    }

    Executors.newCachedThreadPool使用的队列是SynchronousQueue,入队使用offer,出队使用poll。

    put和offer的区别:put会阻塞,而offer不会,不满足入队条件即返回。

  • 相关阅读:
    BZOJ 1191 HNOI2006 超级英雄hero
    BZOJ 2442 Usaco2011 Open 修建草坪
    BZOJ 1812 IOI 2005 riv
    OJ 1159 holiday
    BZOJ 1491 NOI 2007 社交网络
    NOIP2014 D1 T3
    BZOJ 2423 HAOI 2010 最长公共子序列
    LCA模板
    NOIP 2015 D1T2信息传递
    数据结构
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8289261.html
Copyright © 2011-2022 走看看