zoukankan      html  css  js  c++  java
  • SynchronousQueue------TransferStack源码分析

    s,e在线程栈里面,TransferStack在堆里面,方法只是线程的执行逻辑。线程过来调用transfer方法,线程在堆里面创建一个节点,加到Stack里面去,然后这个线程归属节点的waiter,阻塞(方法局部变量保留)。配对的线程过来,在堆里创建一个节点加入stack,

    配对后移除2个节点,正在配对时候,有线程带着局部变量e入队或者来交易,什么都不做只是帮助匹配(同时只能一个节点在配对),帮助配对完成之后,该入队就入队该交易就交易。

    package com.itmayiedu;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.LockSupport;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.*;
    import java.util.Spliterator;
    import java.util.Spliterators;
    
    public class SynchronousStack<E>   {
        abstract static class Transferer<E> {
            abstract E transfer(E e, boolean timed, long nanos);
        }
        static final int NCPUS = Runtime.getRuntime().availableProcessors();
        static final int maxTimedSpins = (NCPUS < 2) ? 0 : 32;
        static final int maxUntimedSpins = maxTimedSpins * 16;
        static final long spinForTimeoutThreshold = 1000L;
    
        static final class TransferStack<E> extends Transferer<E> {
            static final int REQUEST    = 0;//消费者 
            static final int DATA       = 1;//生产者 
            static final int FULFILLING = 2;//表示正在进行交易的节点。
            static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }//FULFILLING返回true,是否是正在进行交易的生产者或者消费者。
            static final class SNode {
                volatile SNode next;         
                volatile SNode match;       // 相匹配的节点
                volatile Thread waiter;      // 等待的线程
                //item域和mode域不需要使用volatile修饰,因为它们在volatile/atomic操作之前写,之后读
                Object item;                // item 域
                int mode;//  REQUEST,DATA,FULFILLING
                SNode(Object item) {
                    this.item = item;
                }
                boolean casNext(SNode cmp, SNode val) {
                    return cmp == next &&
                        UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
                }
                boolean tryMatch(SNode s) {//匹配成功,则unpark等待线程
                    if (match == null &&//设置本结点的匹配为s节点
                        UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
                        Thread w = waiter;
                        if (w != null) {    // waiters need at most one unpark
                            waiter = null;
                            LockSupport.unpark(w);
                        }
                        return true;
                    }
                    return match == s;
                }
                void tryCancel() {//取消这个节点,match从原来的null变为this
                    UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
                }
                boolean isCancelled() {
                    return match == this;
                }
                private static final sun.misc.Unsafe UNSAFE;
                private static final long matchOffset;
                private static final long nextOffset;
                static {
                    try {
                        UNSAFE = getUnsafe();//sun.misc.Unsafe.getUnsafe();
                        Class<?> k = SNode.class;
                        matchOffset = UNSAFE.objectFieldOffset
                            (k.getDeclaredField("match"));
                        nextOffset = UNSAFE.objectFieldOffset
                            (k.getDeclaredField("next"));
                    } catch (Exception e) {
                        throw new Error(e);
                    }
                }
            }
            volatile SNode head;//栈的头结点 
            boolean casHead(SNode h, SNode nh) {
                return h == head &&
                    UNSAFE.compareAndSwapObject(this, headOffset, h, nh);//改变头节点需要cas只有一个线程成功
            }
            static SNode snode(SNode s, Object e, SNode next, int mode) {//入账,新进来节点下一个节点是head节点。
                if (s == null) s = new SNode(e);
                s.mode = mode;
                s.next = next;
                return s;
            }
    
            E transfer(E e, boolean timed, long nanos) {
                SNode s = null;
                int mode = (e == null) ? REQUEST : DATA;//消费者是0生产者是1
                for (;;) {
                    SNode h = head;//刚开始头节点为null,第一个进来的节点就是头节点。
                    //入队2步:构建新节点新节点.next=原来头节点原来头节点变为新节点
                    if (h == null || h.mode == mode) {  // 栈为空或者当前节点模式与头节点模式一样,将节点压入栈内,等待匹配
                        if (timed && nanos <= 0) {      // 新进来的入队节点已经超时,还要看一下头节点是否取消了?只有一个头节点,看下头节点。
                            if (h != null && h.isCancelled())// 节点被取消了,向前推进
                                casHead(h, h.next);    
                            else
                                return null;// 头节点没有被取消,但是这个节点已经超时,什么都不做,直接返回
                        } else if (casHead(h, s = snode(s, e, h, mode))) {//线程栈里面构建节点s,头节点指向最新进来的节点s,s.next=原来头节点
                            SNode m = awaitFulfill(s, timed, nanos);//  等待 匹配,线程阻塞时候局部变量保留,唤醒时候再次使用不变。
                            if (m == s) {  // 返回match到的节点m == s节点自己, 表示该节点被取消了或者超时、中断了
                                clean(s);
                                return null;
                            }
                            // 先唤醒在移除节点,唤醒之后这里执行唤醒这里执行时候有可能还没有移除,这里就帮助移除唤醒的是第一个生产节点s=62,此时head是交易节点81, h.next == s
                            if ((h = head) != null && h.next == s) 
                                casHead(h, s.next);// 将s.next节点设置为head,相当于取消节点h、s帮助移除。
                            return (E) ((mode == REQUEST) ? m.item : s.item);
                        }
                    // 取节点,头节点不是正在取节点的节点,头节点没有配对//取节点:线程过来调用transfer方法,线程在堆里面创建一个节点改变模式为FULFILLING,加到Stack里面去,然后配对,移除2个节点,返回值。
                    } else if (!isFulfilling(h.mode)) { 
                        if (h.isCancelled())            // 取节点看头节点有没有取消
                            casHead(h, h.next);         // pop and retry
                        else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//消费者s进来也要入队成为头节点,将这个节点s模式变为FULFILLINGs是交易节点
                            for (;;) { // 比while效率高
                                SNode m = s.next;       //s是交易节点81,m是被匹配的节点62
                                if (m == null) {         // m == null,其他帮助配对的线程移除了
                                    casHead(s, null);    // 将s弹出
                                    s = null;           // 将s置空,下轮循环的时候还会新建,帮助GC
                                    break;              // 退出该循环,继续主循环
                                }
                                SNode mn = m.next;
                                if (m.tryMatch(s)) {//s节点和m节点匹配配对时候唤醒节点
                                    casHead(s, mn);     // 匹配成功,将s 、 m弹出,完成交易之后将两个节点一起弹出,并且返回交易的数据。
                                    return (E) ((mode == REQUEST) ? m.item : s.item);
                                } else                  // lost match
                                    s.casNext(m, mn);   // 如果没有匹配成功m有可能取消了,那么就需要把m从栈中移除。s继续跟mn做交易  没有人指向m就会被回收。
                            }
                        }
    //如果栈顶已经存在一个模式为FULFILLING的节点,说明栈顶的节点正在进行匹配,那么就帮助这个栈顶节点快速完成交易,然后继续交易。
                    } else { 
                        SNode m = h.next;    // h=81,m=62,h是交易节点,m是被配对节点,
                        if (m == null) // m == null ,执行到这一行时候,配对已经被别的帮助线程或者交易节点线程自己执行完了,
                            casHead(h, null);            
                        else {
                            SNode mn = m.next;//跟取节点差不多mn=56
                            if (m.tryMatch(h))    //62和81配对       
                                casHead(h, mn);         //帮助移除。h和m
                            else                       
                                h.casNext(m, mn);//帮助匹配失败,h的下一个节点变为下一个的下一个节点。  没有人指向m就会被回收。
                        }
                    }
                }
            }
            SNode awaitFulfill(SNode s, boolean timed, long nanos) {
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Thread w = Thread.currentThread();
                int spins = (shouldSpin(s) ?//第一个节点要自旋
                             (timed ? maxTimedSpins : maxUntimedSpins) : 0);
                for (;;) {
                    if (w.isInterrupted())
                        s.tryCancel();
                    SNode m = s.match; 
                    if (m != null)
                        return m;//返回这个节点match到的节点
                    if (timed) {
                        nanos = deadline - System.nanoTime();
                        if (nanos <= 0L) {
                            s.tryCancel();
                            continue;
                        }
                    }
                    if (spins > 0)
                        spins = shouldSpin(s) ? (spins-1) : 0;
                    else if (s.waiter == null)
                        s.waiter = w; // establish waiter so can park next iter
                    else if (!timed)
                        LockSupport.park(this);
                    else if (nanos > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanos);
                }
            }
            boolean shouldSpin(SNode s) {//如果当前节点在栈顶,或者正在请求交易,那么就应该自旋//因为很可能立刻就会有新的线程到来,那么就会立刻进行交易而不需要进行阻塞,然后被唤醒,这是需要过程的,所以这样的自旋等待是值得的。
                SNode h = head;
                boolean b = isFulfilling(h.mode);
                return (h == s || h == null || isFulfilling(h.mode));
            }
            void clean(SNode s) {//clean()方法就是将head节点到S节点之间所有已经取消的节点全部移出。
                s.item = null;   // forget item
                s.waiter = null; // forget thread
                SNode past = s.next;//s=39,past=38
                //这个方法首先找到接下来的第一个不为null并且没有被取消交易的节点past,然后设置新的head节点,
                if (past != null && past.isCancelled())
                    past = past.next;
                SNode p;
                while ((p = head) != null && p != past && p.isCancelled())
                    casHead(p, p.next);//p是从头节点开始第一个不移除的节点
                while (p != null && p != past) {//p到past之间的都要移除
                    SNode n = p.next;
                    if (n != null && n.isCancelled())
                        p.casNext(n, n.next);//移除节点n
                    else
                        p = n;//修改p指针
                }
            }
            private static final sun.misc.Unsafe UNSAFE;
            private static final long headOffset;
            static {
                try {
                    UNSAFE = getUnsafe();//sun.misc.Unsafe.getUnsafe();
                    Class<?> k = TransferStack.class;
                    headOffset = UNSAFE.objectFieldOffset
                        (k.getDeclaredField("head"));
                } catch (Exception e) {
                    throw new Error(e);
                }
            }
        }
        private transient volatile Transferer<E> transferer;
        public SynchronousStack() {
            this(false);
        }
        public SynchronousStack(boolean fair) {
            transferer = fair ? new TransferStack<E>() : new TransferStack<E>();
        }
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, false, 0) == null) {
                Thread.interrupted();
                throw new InterruptedException();
            }
        }
        public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
                return true;
            if (!Thread.interrupted())
                return false;
            throw new InterruptedException();
        }
    
        public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            return transferer.transfer(e, true, 0) != null;
        }
    
        public E take() throws InterruptedException {
            E e = transferer.transfer(null, false, 0);
            if (e != null)
                return e;
            Thread.interrupted();
            throw new InterruptedException();
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            E e = transferer.transfer(null, true, unit.toNanos(timeout));
            if (e != null || !Thread.interrupted())
                return e;
            throw new InterruptedException();
        }
    
        public E poll() {
            return transferer.transfer(null, true, 0);
        } 
        
        private static sun.misc.Unsafe getUnsafe() {
            try {
                return sun.misc.Unsafe.getUnsafe();
            } catch (SecurityException tryReflectionInstead) {}
            try {
                return java.security.AccessController.doPrivileged
                        (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
                            public sun.misc.Unsafe run() throws Exception {
                                Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
                                for (java.lang.reflect.Field f : k.getDeclaredFields()) {
                                    f.setAccessible(true);
                                    Object x = f.get(null);
                                    if (k.isInstance(x))
                                        return k.cast(x);
                                }
                                throw new NoSuchFieldError("the Unsafe");
                            }});
            } catch (java.security.PrivilegedActionException e) {
                throw new RuntimeException("Could not initialize intrinsics",
                        e.getCause());
            }
        }
    }

  • 相关阅读:
    Linux程序的执行
    Linux图形操作与命令行
    Linux网络配置
    Zip文件中文乱码问题解决方法(MAC->Windows)
    我只是一直很努力
    Android抓包方法(三)之Win7笔记本Wifi热点+WireShark工具
    Android抓包方法(二)之Tcpdump命令+Wireshark
    Android抓包方法(一)之Fiddler代理
    Android反编译(二)之反编译XML资源文件
    Android反编译(一)之反编译JAVA源码
  • 原文地址:https://www.cnblogs.com/yaowen/p/10773705.html
Copyright © 2011-2022 走看看