zoukankan      html  css  js  c++  java
  • aio-java

     1 //创建一个AsynchronousSocketChannel实例,
     2 //AsynchronousChannelGroup在linux中是EPollPort,在windows中是Iopc,
     3 //AsynchronousChannelGroup内持有fileDescriptor到channel的映射,从epoll返回的事件可以间接的找到fileDescriptor,通过映射找到channel,从而完成io;
     4 //AsynchronousChannelGroup还持有线程池,自动开启,用于处理io,执行CompletionHandler。
     5 //线程池默认是一个new ThreadPool(Executors.newCachedThreadPool(threadFactory), isFixed, poolSize),
     6 //并启动poolSize个线程,poolSize的大小为Runtime.getRuntime().availableProcessors();
     7 //线程池由EPollPort(AsynchronousChannelGroupImpl的实现类)管理,会详细说EPollPort
     8 public static AsynchronousSocketChannel open(AsynchronousChannelGroup group)
     9     throws IOException
    10 {
    11     AsynchronousChannelProvider provider = (group == null) ?
    12       AsynchronousChannelProvider.provider() : group.provider();
    13     return provider.openAsynchronousSocketChannel(group);
    14 }
     1 public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group)
     2   throws IOException
     3 {
     4   return new UnixAsynchronousSocketChannelImpl(toPort(group));
     5 }
     6 
     7 private Port toPort(AsynchronousChannelGroup group) throws IOException {
     8   if (group == null) {
     9     return defaultEventPort();
    10   } else {
    11     if (!(group instanceof EPollPort))
    12       throw new IllegalChannelGroupException();
    13     return (Port)group;
    14   }
    15 }
    16     
    17 private EPollPort defaultEventPort() throws IOException {
    18   if (defaultPort == null) {
    19     synchronized (LinuxAsynchronousChannelProvider.class) {
    20       if (defaultPort == null) {
    21             //默认会创建一个Executors.newCachedThreadPool(threadFactory),并启动poolSize个线程,poolSize在创建ThreadPool时会设置
    22         defaultPort = new EPollPort(this, ThreadPool.getDefault()).start();
    23       }
    24     }
    25   }
    26   return defaultPort;
    27 }
    28     
    29 static ThreadPool createDefault() {
    30   int poolSize = getDefaultThreadPoolInitialSize();
    31   if (poolSize < 0) {
    32     poolSize = Runtime.getRuntime().availableProcessors();//与核心数相同,双核四线程,availableProcessors返回4
    33   }
    34 
    35   ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory();
    36   if (threadFactory == null) {
    37     threadFactory = defaultThreadFactory();
    38   }
    39 
    40   ExecutorService executorService = Executors.newCachedThreadPool(threadFactory);
    41   boolean isFixed = false;
    42   return new ThreadPool(executorService, isFixed, poolSize);
    43 }
    44     
    45 //Executors.newCachedThreadPool(threadFactory)中的threadFactory
    46 static ThreadFactory defaultThreadFactory() {
    47   return System.getSecurityManager() == null ? (var0) -> {//该ThreadFactory每次newThread都会返回一个Thread
    48     Thread var1 = new Thread(var0);
    49     var1.setDaemon(true);
    50       return var1;
    51   } : (var0) -> {
    52     PrivilegedAction var1 = () -> {
    53       InnocuousThread var1 = new InnocuousThread(var0);
    54       var1.setDaemon(true);
    55       return var1;
    56     };
    57     return (Thread)AccessController.doPrivileged(var1);
    58   };
    59 }
    60 
    61 //close时会关闭线程池
    62 void implClose() throws IOException {
    63   // remove the mapping
    64   port.unregister(fdVal);
    65 
    66   // close file descriptor
    67   nd.close(fd);
    68 
    69   // All outstanding I/O operations are required to fail
    70   finish(false, true, true);
    71 }
    72 final void unregister(int fd) {
    73   boolean checkForShutdown = false;
    74 
    75   fdToChannelLock.writeLock().lock();
    76   try {
    77     fdToChannel.remove(Integer.valueOf(fd));
    78     // last key to be removed so check if group is shutdown
    79     if (fdToChannel.isEmpty())
    80       checkForShutdown = true;
    81   } finally {
    82     fdToChannelLock.writeLock().unlock();
    83   }
    84 
    85   // continue shutdown
    86   if (checkForShutdown && isShutdown()) {
    87     try {
    88           //关闭线程池
    89       shutdownNow();
    90     } catch (IOException ignore) { }
    91   }
    92 }
    View Code
     1 //重点类,EPollPort
     2 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
     3 throws IOException
     4 {
     5   super(provider, pool);
     6 
     7   // open epoll
     8   //重点,创建epoll,之后所有的socket都会注册在该epoll上
     9   this.epfd = epollCreate();
    10 
    11   // create socket pair for wakeup mechanism
    12   int[] sv = new int[2];
    13   try {
    14     socketpair(sv);
    15     // register one end with epoll
    16     //重点,将sv[0]注册在epfd上,当sv[0]有事件发生时,epollWait就会wakeup,这是为了充分利用epollWait线程,也是为了。
    17     epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN);
    18   } catch (IOException x) {
    19     close0(epfd);
    20     throw x;
    21   }
    22   this.sp = sv;
    23 
    24   // allocate the poll array
    25   this.address = allocatePollArray(MAX_EPOLL_EVENTS);
    26 
    27   // create the queue and offer the special event to ensure that the first threads polls
    28   // epoll返回的事件都会放在该队列上
    29   this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
    30   //首个线程需要waitPoll,其余线程在queue上等待处理事件(event)
    31   this.queue.offer(NEED_TO_POLL);
    32 }
      1  /*
      2  * Task to process events from epoll and dispatch to the channel's
      3  * onEvent handler.
      4  *
      5  * Events are retreived from epoll in batch and offered to a BlockingQueue
      6  * where they are consumed by handler threads. A special "NEED_TO_POLL"
      7  * event is used to signal one consumer to re-poll when all events have
      8  * been consumed.
      9  */
     10 //线程池中运行的就是EventHandlerTask
     11 private class EventHandlerTask implements Runnable {
     12   private Event poll() throws IOException {
     13     try {
     14       for (;;) {
     15             //等待事件就绪
     16         int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
     17         /*
     18          * 'n' events have been read. Here we map them to their
     19          * corresponding channel in batch and queue n-1 so that
     20          * they can be handled by other handler threads. The last
     21          * event is handled by this thread (and so is not queued).
     22          */
     23         fdToChannelLock.readLock().lock();
     24         try {
     25           while (n-- > 0) {
     26             long eventAddress = getEvent(address, n);
     27             int fd = getDescriptor(eventAddress);
     28 
     29             // wakeup
     30             //用于wakeup,sp[0]是fileDescriptor,sp[1]相当于一个socket,当需要唤醒epollWait时就向sp[1]写入数据。
     31             //唤醒有两个作用,1为了关闭socket时,shutdown线程池,
     32             //2为了充分利用正在epollWait的线程,唤醒它去执行任务,也避免了单线程下没有可执行线程的问题
     33             if (fd == sp[0]) {
     34               if (wakeupCount.decrementAndGet() == 0) {
     35                 // no more wakeups so drain pipe
     36                 drain1(sp[0]);
     37               }
     38 
     39               // queue special event if there are more events
     40               // to handle.
     41               //如果不是单纯的wakeup,而是因为有事件发生,则入队EXECUTE_TASK_OR_SHUTDOWN,
     42               //这样可以让阻塞在queue.take处的线程尽快执行handlerTask
     43               if (n > 0) {
     44                 queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
     45                 continue;
     46               }
     47               //如果只是单纯的被wakeup,则由该线程执行handlerTask,并将NEED_TO_POLL入队,表示需要有线程来epoll
     48               return EXECUTE_TASK_OR_SHUTDOWN;
     49           }
     50 
     51           PollableChannel channel = fdToChannel.get(fd);
     52           if (channel != null) {
     53               int events = getEvents(eventAddress);
     54               Event ev = new Event(channel, events);
     55 
     56               // n-1 events are queued; This thread handles
     57               // the last one except for the wakeup
     58               if (n > 0) {
     59                   queue.offer(ev);
     60               } else {//由当前线程处理最后一个事件,避免单线程下没有可执行线程,多线程下也可充分利用线程
     61                   return ev;
     62               }
     63             }
     64           }//while
     65         } finally {
     66           fdToChannelLock.readLock().unlock();
     67         }
     68       }
     69     } finally {
     70       // to ensure that some thread will poll when all events have
     71       // been consumed
     72       queue.offer(NEED_TO_POLL);
     73     }
     74   }
     75 
     76     public void run() {
     77       Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
     78           Invoker.getGroupAndInvokeCount();
     79       final boolean isPooledThread = (myGroupAndInvokeCount != null);
     80       boolean replaceMe = false;
     81       Event ev;
     82       try {
     83         for (;;) {
     84           // reset invoke count
     85           if (isPooledThread)
     86             myGroupAndInvokeCount.resetInvokeCount();
     87 
     88           try {
     89               replaceMe = false;
     90               //线程池中多数进程会阻塞在这里,等待事件的到来
     91               ev = queue.take();
     92 
     93               // no events and this thread has been "selected" to
     94               // poll for more.
     95               //从队列中得到NEED_TO_POLL元素的线程,会去epoll
     96               if (ev == NEED_TO_POLL) {
     97               try {
     98                 ev = poll();
     99               } catch (IOException x) {
    100                 x.printStackTrace();
    101                 return;
    102               }
    103               }
    104           } catch (InterruptedException x) {
    105             continue;
    106           }
    107 
    108           // handle wakeup to execute task or shutdown
    109           //从队列中(也可能是直接返回的)获取到EXECUTE_TASK_OR_SHUTDOWN元素的线程去执行handlerTask,
    110           //handlerTask为null意味着关闭socket,要shutdown线程池
    111           if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
    112             Runnable task = pollTask();
    113             if (task == null) {
    114                 // shutdown request
    115                 return;
    116             }
    117             // run task (may throw error/exception)
    118             replaceMe = true;
    119             task.run();
    120             continue;
    121           }
    122 
    123           // process event
    124           try {
    125                 //多数情况会执行这里,即epoll到事件(非wakeup)需要处理,
    126                 //以读事件为例,处理逻辑大致就是读数据到ByteBuffer,然后回调CompletionHandler,
    127                 //我们在CompletionHandler中只需要处理已经读到数据的ByteBuffer即可。
    128             ev.channel().onEvent(ev.events(), isPooledThread);
    129           } catch (Error x) {
    130             replaceMe = true; throw x;
    131           } catch (RuntimeException x) {
    132             replaceMe = true; throw x;
    133           }
    134         }
    135       } finally {
    136         // last handler to exit when shutdown releases resources
    137         int remaining = threadExit(this, replaceMe);
    138         if (remaining == 0 && isShutdown()) {
    139           implClose();
    140         }
    141       }
    142     }
    143 }

    是不是注册一次读事件,之后每当可读时CompletionHandler都会被回调呢?不是的,java的aio框架中限定了注册一次事件,也只监听一次事件,这是通过EPOLLONESHOT限制的。

     1 // invoke by clients to register a file descriptor
     2 @Override
     3 void startPoll(int fd, int events) {
     4   
     5   //EPOLLONESHOT只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把事件加入到EPOLL里。
     6   //EPOLLIN触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
     7   //EPOLLOUT触发该事件,表示对应的文件描述符上可以写数据;
     8   // update events (or add to epoll on first usage)
     9   int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
    10   if (err == ENOENT)
    11     err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
    12   if (err != 0)
    13     throw new AssertionError();     // should not happen
    14 }

    其实nio也同样使用了epoll,只不过nio不自带线程池框架,并且select的返回只意味着事件的就绪,而aio框架中CompletionHandler的回调意味着事件的完成(比如read中得到的ByteBuffer已经被填好了数据,这也是线程池的任务)。个人感觉,aio适合简单处理,nio更适合复杂处理。

  • 相关阅读:
    java四种线程池的使用
    @Autowired@Resource@Qualifier的区别
    Unsupported major.minor version 52.0解决办法
    CentOS7配置防火墙
    redis 集群搭建
    excludepathpatterns 无效
    解决 SpringBoot 没有主清单属性
    Java Web应用中调优线程池的重要性
    spring boot application properties配置详解
    Class path contains multiple SLF4J bindings.
  • 原文地址:https://www.cnblogs.com/holoyong/p/7353716.html
Copyright © 2011-2022 走看看