zoukankan      html  css  js  c++  java
  • 【Java】NIO中Selector的创建源码分析

    在使用Selector时首先需要通过静态方法open创建Selector对象

    1 public static Selector open() throws IOException {
    2         return SelectorProvider.provider().openSelector();
    3 }

    可以看到首先是调用SelectorProvider的静态方法provider,得到一个Selector的提供者

     1 public static SelectorProvider provider() {
     2     synchronized (lock) {
     3         if (provider != null)
     4             return provider;
     5         return AccessController.doPrivileged(
     6             new PrivilegedAction<SelectorProvider>() {
     7                 public SelectorProvider run() {
     8                         if (loadProviderFromProperty())
     9                             return provider;
    10                         if (loadProviderAsService())
    11                             return provider;
    12                         provider = sun.nio.ch.DefaultSelectorProvider.create();
    13                         return provider;
    14                     }
    15                 });
    16     }
    17 }

    这段代码的逻辑也比较简单,首先判断provider是否已经产生,若已经产生,则直接返回现有的;若没有,则需要调用AccessController的静态方法doPrivileged,该方法是一个native方法,就不说了;可以看到在实现的PrivilegedAction接口中的run方法,做了三次判断:

    第一次是根据是系统属性,使用ClassLoader类加载:

     1 private static boolean loadProviderFromProperty() {
     2     String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
     3     if (cn == null)
     4         return false;
     5     try {
     6         Class<?> c = Class.forName(cn, true,
     7                                    ClassLoader.getSystemClassLoader());
     8         provider = (SelectorProvider)c.newInstance();
     9         return true;
    10     } catch (ClassNotFoundException x) {
    11         throw new ServiceConfigurationError(null, x);
    12     } catch (IllegalAccessException x) {
    13         throw new ServiceConfigurationError(null, x);
    14     } catch (InstantiationException x) {
    15         throw new ServiceConfigurationError(null, x);
    16     } catch (SecurityException x) {
    17         throw new ServiceConfigurationError(null, x);
    18     }
    19 }

    先获取键值为"java.nio.channels.spi.SelectorProvider"的属性,若没有,则直接返回false;若设置了,则需要使用加载器直接加载系统属性设置的java.nio.channels.spi.SelectorProvider的实现类,再通过反射机制直接产生实例对象并赋值给静态成员provider,最后返回true。

    第二次使用ServiceLoader加载:

     1 private static boolean loadProviderAsService() {
     2     ServiceLoader<SelectorProvider> sl =
     3         ServiceLoader.load(SelectorProvider.class,
     4                            ClassLoader.getSystemClassLoader());
     5     Iterator<SelectorProvider> i = sl.iterator();
     6     for (;;) {
     7         try {
     8             if (!i.hasNext())
     9                 return false;
    10             provider = i.next();
    11             return true;
    12         } catch (ServiceConfigurationError sce) {
    13             if (sce.getCause() instanceof SecurityException) {
    14                 // Ignore the security exception, try the next provider
    15                 continue;
    16             }
    17             throw sce;
    18         }
    19     }
    20 }

    有关ServiceLoader的加载过程可以看我的上一篇博客【Java】ServiceLoader源码分析,在这里我就不累赘了。
    该方法调用ServiceLoader的load加载在"META-INF/services/"路径下指明的SelectorProvider.class的实现类(其实是懒加载,在迭代时才真正加载)得到ServiceLoader对象,通过该对象的带迭代器,遍历这个迭代器;可以看到若是迭代器不为空,则直接返回迭代器保存的第一个元素,即第一个被加载的类的对象,并赋值给provider,返回true;否则返回false;

    第三次是使用的默认的SelectorProvider(windows环境为例):

    1 public class DefaultSelectorProvider {
    2     private DefaultSelectorProvider() {
    3     }
    4 
    5     public static SelectorProvider create() {
    6         return new WindowsSelectorProvider();
    7     }
    8 }

    可以看到直接返回了WindowsSelectorProvider赋值给provider ;

    此时provider无论如何都已经有了,接下来就是调用provider的openSelector方法。

    WindowsSelectorProvider的openSelector方法:

    1 public class WindowsSelectorProvider extends SelectorProviderImpl {
    2     public WindowsSelectorProvider() {
    3     }
    4 
    5     public AbstractSelector openSelector() throws IOException {
    6         return new WindowsSelectorImpl(this);
    7     }
    8 }

    可以看到仅仅是产生了WindowsSelectorImpl:

    1 WindowsSelectorImpl(SelectorProvider var1) throws IOException {
    2     super(var1);
    3     this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal();
    4     SinkChannelImpl var2 = (SinkChannelImpl)this.wakeupPipe.sink();
    5     var2.sc.socket().setTcpNoDelay(true);
    6     this.wakeupSinkFd = var2.getFDVal();
    7     this.pollWrapper.addWakeupSocket(this.wakeupSourceFd, 0);
    8 }

    WindowsSelectorImpl首先调用父类SelectorImpl的构造方法:

     1 protected Set<SelectionKey> selectedKeys = new HashSet();
     2 protected HashSet<SelectionKey> keys = new HashSet();
     3 private Set<SelectionKey> publicKeys;
     4 private Set<SelectionKey> publicSelectedKeys;
     5 
     6 protected SelectorImpl(SelectorProvider var1) {
     7     super(var1);
     8     if (Util.atBugLevel("1.4")) {
     9         this.publicKeys = this.keys;
    10         this.publicSelectedKeys = this.selectedKeys;
    11     } else {
    12         this.publicKeys = Collections.unmodifiableSet(this.keys);
    13         this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
    14     }
    15 
    16 }

    SelectorImpl同样调用父类AbstractSelector的构造:

    1 protected AbstractSelector(SelectorProvider provider) {
    2         this.provider = provider;
    3 }

    此时的provider就是刚才产生的WindowsSelectorProvider对象;
    在SelectorImpl中还会对其成员有一系列的赋值操作;
    上述都完成后才继续完成WindowsSelectorImpl的构造。

    WindowsSelectorImpl在进行this.wakeupSourceFd = ((SelChImpl)this.wakeupPipe.source()).getFDVal()之前,其wakeupPipe成员如下:

    1 private final Pipe wakeupPipe = Pipe.open();

    wakeupPipe管道通过Pipe.open()赋值:

    1 public static Pipe open() throws IOException {
    2     return SelectorProvider.provider().openPipe();
    3 }

    可以看到实际上 SelectorProvider.provider()的provider的openPipe方法,而这个provider就是WindowsSelectorProvider,而WindowsSelectorProvider继承自SelectorProviderImpl,openPipe方法是在SelectorProviderImpl里实现的:

    1 public Pipe openPipe() throws IOException {
    2     return new PipeImpl(this);
    3 }

    该方法直接产生了PipeImpl对象,并将WindowsSelectorProvider对象传入进去:

    1 PipeImpl(SelectorProvider var1) throws IOException {
    2     try {
    3         AccessController.doPrivileged(new PipeImpl.Initializer(var1));
    4     } catch (PrivilegedActionException var3) {
    5         throw (IOException)var3.getCause();
    6     }
    7 }

    可以看到这个构造方法实际上是以特权模式运行的PipeImpl的内部类Initializer的run方法(doPrivileged需要的参数是PrivilegedExceptionAction接口的实现类,该接口只有run方法):
    Initializer 的初始化:

     1 private class Initializer implements PrivilegedExceptionAction<Void> {
     2     private final SelectorProvider sp;
     3     private IOException ioe;
     4     
     5     private Initializer(SelectorProvider var2) {
     6         this.ioe = null;
     7         this.sp = var2;
     8     }
     9     ......
    10 }

    该构造方法给sp赋值为传入进来的WindowsSelectorProvider对象,令ioe=null;
    其所实现的run方法如下:

     1 public Void run() throws IOException {
     2     PipeImpl.Initializer.LoopbackConnector var1 = new PipeImpl.Initializer.LoopbackConnector();
     3     var1.run();
     4     if (this.ioe instanceof ClosedByInterruptException) {
     5         this.ioe = null;
     6         Thread var2 = new Thread(var1) {
     7             public void interrupt() {
     8             }
     9         };
    10         var2.start();
    11 
    12         while(true) {
    13             try {
    14                 var2.join();
    15                 break;
    16             } catch (InterruptedException var4) {
    17                 ;
    18             }
    19         }
    20 
    21         Thread.currentThread().interrupt();
    22     }
    23 
    24     if (this.ioe != null) {
    25         throw new IOException("Unable to establish loopback connection", this.ioe);
    26     } else {
    27         return null;
    28     }
    29 }

    首先产生LoopbackConnector 对象,是Initializer的内部类,而且实现了Runnable接口:

    1 private class LoopbackConnector implements Runnable {
    2     private LoopbackConnector() {
    3     }
    4 }

    其实现的run方法如下:

     1 public void run() {
     2     ServerSocketChannel var1 = null;
     3     SocketChannel var2 = null;
     4     SocketChannel var3 = null;
     5 
     6     try {
     7         ByteBuffer var4 = ByteBuffer.allocate(16);
     8         ByteBuffer var5 = ByteBuffer.allocate(16);
     9         InetAddress var6 = InetAddress.getByName("127.0.0.1");
    10 
    11         assert var6.isLoopbackAddress();
    12 
    13         InetSocketAddress var7 = null;
    14 
    15         while(true) {
    16             if (var1 == null || !var1.isOpen()) {
    17                 var1 = ServerSocketChannel.open();
    18                 var1.socket().bind(new InetSocketAddress(var6, 0));
    19                 var7 = new InetSocketAddress(var6, var1.socket().getLocalPort());
    20             }
    21 
    22             var2 = SocketChannel.open(var7);
    23             PipeImpl.RANDOM_NUMBER_GENERATOR.nextBytes(var4.array());
    24 
    25             do {
    26                 var2.write(var4);
    27             } while(var4.hasRemaining());
    28 
    29             var4.rewind();
    30             var3 = var1.accept();
    31 
    32             do {
    33                 var3.read(var5);
    34             } while(var5.hasRemaining());
    35 
    36             var5.rewind();
    37             if (var5.equals(var4)) {
    38                 PipeImpl.this.source = new SourceChannelImpl(Initializer.this.sp, var2);
    39                 PipeImpl.this.sink = new SinkChannelImpl(Initializer.this.sp, var3);
    40                 break;
    41             }
    42 
    43             var3.close();
    44             var2.close();
    45         }
    46     } catch (IOException var18) {
    47         try {
    48             if (var2 != null) {
    49                 var2.close();
    50             }
    51 
    52             if (var3 != null) {
    53                 var3.close();
    54             }
    55         } catch (IOException var17) {
    56             ;
    57         }
    58 
    59         Initializer.this.ioe = var18;
    60     } finally {
    61         try {
    62             if (var1 != null) {
    63                 var1.close();
    64             }
    65         } catch (IOException var16) {
    66             ;
    67         }
    68 
    69     }
    70 
    71 }

    在这个run方法中首先定义了三个Channel一个ServerSocketChannel和两个SocketChannel,然后申请了两个十六字节的ByteBuffer缓冲区,定义了一个回送地址var6;在while循环中先检查ServerSocketChannel是否开启了,若没有则需要调用open方法开启并赋值给var1,绑定地址为var6即回送地址,端口为0,令var7这个InetSocketAddress对象的地址是var6,端口是ServerSocketChannel的端口;ServerSocketChannel初始化完毕,初始化一个SocketChannel即var2,通过刚才的var7这个InetSocketAddress对象和ServerSocketChannel建立连接;

    在PipeImpl里有一个静态成员:

    1 private static final Random RANDOM_NUMBER_GENERATOR = new SecureRandom();

    RANDOM_NUMBER_GENERATOR 听名字就知道它是用来生成随机数;
    通过RANDOM_NUMBER_GENERATOR将从生成的随机数存放在其中一个缓冲区ByteBuffer(var4)中,然后通过刚才连接好的SocketChannel即var2的write方法写入缓冲区中的所有可用数据发送给ServerSocketChannel;令var4缓冲区标志置0;接着ServerSocketChannel调用accept方法侦听刚才的连接产生一个SocketChannel对象var3,从var3中读取数据存放在缓冲区var5中,令var5缓冲区标志置0;然后比较var4和var5中的内容是否一致,若是一致则给PipeImpl的成员source和sink分别初始化保存起来,若不一致就继续循环,不断地重复上述过程,直至Pipe通道成功建立;至此结束LoopbackConnector的run方法。
    其在连接建立的过程中若是出现了异常会通过Initializer的ioe成员保存异常。

    再回到Initializer的run方法,在完成LoopbackConnector的run方法后,再根据ioe判读是否在刚才的连接建立中出现了ClosedByInterruptException异常,若是出现还需要通过线程启动LoopbackConnector的run方法直至其结束;若不是ClosedByInterruptException异常则直接抛出IOException。

    至此PipeImpl的构造结束,再回到WindowsSelectorImpl的构造,通过上述的操作产生的PipeImpl对象就赋值给了wakeupPipe成员;wakeupPipe的source就是刚才产生的SourceChannelImpl对象,wakeupPipe的sink就是刚才产生的SinkChannelImpl对象,再使用wakeupSourceFd保存source的fdVal值和wakeupSinkFd保存sink的fdVal值;并且禁用Nagle算法,最后使用pollWrpper成员保存source的fdVal值。

    上述建立的这个连接通道的主要目的不是为了确保能建立连接,而是为了解决Selector的select方法的阻塞问题,调用select方法时只有注册在Selector上的channel有事件就绪时才会被唤醒,而Selector提供的wakeup方法就利用了上述建立好的通道,通过SinkChannel给SourceChannel发送信号量,使得select被唤醒,具体实现会在后续的博客给出。

    Selector到此创建完毕。

  • 相关阅读:
    损失函数
    numpy中的broadcast
    混合模型
    贝叶斯学习
    python3中输出不换行
    C++11 实现 argsort
    Python中的闭包
    C语言 fread()与fwrite()函数说明与示例
    DFT与傅里叶变换的理解
    MISRA C:2012 Dir-1.1(只记录常犯的错误和常用的规则)Bit-fields inlineC99,NOT support in C90 #pragma
  • 原文地址:https://www.cnblogs.com/a526583280/p/10873176.html
Copyright © 2011-2022 走看看