1 //创建一个AsynchronousSocketChannel实例, 2 //AsynchronousChannelGroup在linux中是EPollPort,在windows中是Iopc, 3 //AsynchronousChannelGroup内持有fileDescriptor到channel的映射,从epoll返回的事件可以间接的找到fileDescriptor,通过映射找到channel,从而完成io; 4 //AsynchronousChannelGroup还持有线程池,自动开启,用于处理io,执行CompletionHandler。 5 //线程池默认是一个new ThreadPool(Executors.newCachedThreadPool(threadFactory), isFixed, poolSize), 6 //并启动poolSize个线程,poolSize的大小为Runtime.getRuntime().availableProcessors(); 7 //线程池由EPollPort(AsynchronousChannelGroupImpl的实现类)管理,会详细说EPollPort 8 public static AsynchronousSocketChannel open(AsynchronousChannelGroup group) 9 throws IOException 10 { 11 AsynchronousChannelProvider provider = (group == null) ? 12 AsynchronousChannelProvider.provider() : group.provider(); 13 return provider.openAsynchronousSocketChannel(group); 14 }
1 public AsynchronousSocketChannel openAsynchronousSocketChannel(AsynchronousChannelGroup group) 2 throws IOException 3 { 4 return new UnixAsynchronousSocketChannelImpl(toPort(group)); 5 } 6 7 private Port toPort(AsynchronousChannelGroup group) throws IOException { 8 if (group == null) { 9 return defaultEventPort(); 10 } else { 11 if (!(group instanceof EPollPort)) 12 throw new IllegalChannelGroupException(); 13 return (Port)group; 14 } 15 } 16 17 private EPollPort defaultEventPort() throws IOException { 18 if (defaultPort == null) { 19 synchronized (LinuxAsynchronousChannelProvider.class) { 20 if (defaultPort == null) { 21 //默认会创建一个Executors.newCachedThreadPool(threadFactory),并启动poolSize个线程,poolSize在创建ThreadPool时会设置 22 defaultPort = new EPollPort(this, ThreadPool.getDefault()).start(); 23 } 24 } 25 } 26 return defaultPort; 27 } 28 29 static ThreadPool createDefault() { 30 int poolSize = getDefaultThreadPoolInitialSize(); 31 if (poolSize < 0) { 32 poolSize = Runtime.getRuntime().availableProcessors();//与核心数相同,双核四线程,availableProcessors返回4 33 } 34 35 ThreadFactory threadFactory = getDefaultThreadPoolThreadFactory(); 36 if (threadFactory == null) { 37 threadFactory = defaultThreadFactory(); 38 } 39 40 ExecutorService executorService = Executors.newCachedThreadPool(threadFactory); 41 boolean isFixed = false; 42 return new ThreadPool(executorService, isFixed, poolSize); 43 } 44 45 //Executors.newCachedThreadPool(threadFactory)中的threadFactory 46 static ThreadFactory defaultThreadFactory() { 47 return System.getSecurityManager() == null ? (var0) -> {//该ThreadFactory每次newThread都会返回一个Thread 48 Thread var1 = new Thread(var0); 49 var1.setDaemon(true); 50 return var1; 51 } : (var0) -> { 52 PrivilegedAction var1 = () -> { 53 InnocuousThread var1 = new InnocuousThread(var0); 54 var1.setDaemon(true); 55 return var1; 56 }; 57 return (Thread)AccessController.doPrivileged(var1); 58 }; 59 } 60 61 //close时会关闭线程池 62 void implClose() throws IOException { 63 // remove the mapping 64 port.unregister(fdVal); 65 66 // close file descriptor 67 nd.close(fd); 68 69 // All outstanding I/O operations are required to fail 70 finish(false, true, true); 71 } 72 final void unregister(int fd) { 73 boolean checkForShutdown = false; 74 75 fdToChannelLock.writeLock().lock(); 76 try { 77 fdToChannel.remove(Integer.valueOf(fd)); 78 // last key to be removed so check if group is shutdown 79 if (fdToChannel.isEmpty()) 80 checkForShutdown = true; 81 } finally { 82 fdToChannelLock.writeLock().unlock(); 83 } 84 85 // continue shutdown 86 if (checkForShutdown && isShutdown()) { 87 try { 88 //关闭线程池 89 shutdownNow(); 90 } catch (IOException ignore) { } 91 } 92 }
1 //重点类,EPollPort 2 EPollPort(AsynchronousChannelProvider provider, ThreadPool pool) 3 throws IOException 4 { 5 super(provider, pool); 6 7 // open epoll 8 //重点,创建epoll,之后所有的socket都会注册在该epoll上 9 this.epfd = epollCreate(); 10 11 // create socket pair for wakeup mechanism 12 int[] sv = new int[2]; 13 try { 14 socketpair(sv); 15 // register one end with epoll 16 //重点,将sv[0]注册在epfd上,当sv[0]有事件发生时,epollWait就会wakeup,这是为了充分利用epollWait线程,也是为了。 17 epollCtl(epfd, EPOLL_CTL_ADD, sv[0], POLLIN); 18 } catch (IOException x) { 19 close0(epfd); 20 throw x; 21 } 22 this.sp = sv; 23 24 // allocate the poll array 25 this.address = allocatePollArray(MAX_EPOLL_EVENTS); 26 27 // create the queue and offer the special event to ensure that the first threads polls 28 // epoll返回的事件都会放在该队列上 29 this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS); 30 //首个线程需要waitPoll,其余线程在queue上等待处理事件(event) 31 this.queue.offer(NEED_TO_POLL); 32 }
1 /* 2 * Task to process events from epoll and dispatch to the channel's 3 * onEvent handler. 4 * 5 * Events are retreived from epoll in batch and offered to a BlockingQueue 6 * where they are consumed by handler threads. A special "NEED_TO_POLL" 7 * event is used to signal one consumer to re-poll when all events have 8 * been consumed. 9 */ 10 //线程池中运行的就是EventHandlerTask 11 private class EventHandlerTask implements Runnable { 12 private Event poll() throws IOException { 13 try { 14 for (;;) { 15 //等待事件就绪 16 int n = epollWait(epfd, address, MAX_EPOLL_EVENTS); 17 /* 18 * 'n' events have been read. Here we map them to their 19 * corresponding channel in batch and queue n-1 so that 20 * they can be handled by other handler threads. The last 21 * event is handled by this thread (and so is not queued). 22 */ 23 fdToChannelLock.readLock().lock(); 24 try { 25 while (n-- > 0) { 26 long eventAddress = getEvent(address, n); 27 int fd = getDescriptor(eventAddress); 28 29 // wakeup 30 //用于wakeup,sp[0]是fileDescriptor,sp[1]相当于一个socket,当需要唤醒epollWait时就向sp[1]写入数据。 31 //唤醒有两个作用,1为了关闭socket时,shutdown线程池, 32 //2为了充分利用正在epollWait的线程,唤醒它去执行任务,也避免了单线程下没有可执行线程的问题 33 if (fd == sp[0]) { 34 if (wakeupCount.decrementAndGet() == 0) { 35 // no more wakeups so drain pipe 36 drain1(sp[0]); 37 } 38 39 // queue special event if there are more events 40 // to handle. 41 //如果不是单纯的wakeup,而是因为有事件发生,则入队EXECUTE_TASK_OR_SHUTDOWN, 42 //这样可以让阻塞在queue.take处的线程尽快执行handlerTask 43 if (n > 0) { 44 queue.offer(EXECUTE_TASK_OR_SHUTDOWN); 45 continue; 46 } 47 //如果只是单纯的被wakeup,则由该线程执行handlerTask,并将NEED_TO_POLL入队,表示需要有线程来epoll 48 return EXECUTE_TASK_OR_SHUTDOWN; 49 } 50 51 PollableChannel channel = fdToChannel.get(fd); 52 if (channel != null) { 53 int events = getEvents(eventAddress); 54 Event ev = new Event(channel, events); 55 56 // n-1 events are queued; This thread handles 57 // the last one except for the wakeup 58 if (n > 0) { 59 queue.offer(ev); 60 } else {//由当前线程处理最后一个事件,避免单线程下没有可执行线程,多线程下也可充分利用线程 61 return ev; 62 } 63 } 64 }//while 65 } finally { 66 fdToChannelLock.readLock().unlock(); 67 } 68 } 69 } finally { 70 // to ensure that some thread will poll when all events have 71 // been consumed 72 queue.offer(NEED_TO_POLL); 73 } 74 } 75 76 public void run() { 77 Invoker.GroupAndInvokeCount myGroupAndInvokeCount = 78 Invoker.getGroupAndInvokeCount(); 79 final boolean isPooledThread = (myGroupAndInvokeCount != null); 80 boolean replaceMe = false; 81 Event ev; 82 try { 83 for (;;) { 84 // reset invoke count 85 if (isPooledThread) 86 myGroupAndInvokeCount.resetInvokeCount(); 87 88 try { 89 replaceMe = false; 90 //线程池中多数进程会阻塞在这里,等待事件的到来 91 ev = queue.take(); 92 93 // no events and this thread has been "selected" to 94 // poll for more. 95 //从队列中得到NEED_TO_POLL元素的线程,会去epoll 96 if (ev == NEED_TO_POLL) { 97 try { 98 ev = poll(); 99 } catch (IOException x) { 100 x.printStackTrace(); 101 return; 102 } 103 } 104 } catch (InterruptedException x) { 105 continue; 106 } 107 108 // handle wakeup to execute task or shutdown 109 //从队列中(也可能是直接返回的)获取到EXECUTE_TASK_OR_SHUTDOWN元素的线程去执行handlerTask, 110 //handlerTask为null意味着关闭socket,要shutdown线程池 111 if (ev == EXECUTE_TASK_OR_SHUTDOWN) { 112 Runnable task = pollTask(); 113 if (task == null) { 114 // shutdown request 115 return; 116 } 117 // run task (may throw error/exception) 118 replaceMe = true; 119 task.run(); 120 continue; 121 } 122 123 // process event 124 try { 125 //多数情况会执行这里,即epoll到事件(非wakeup)需要处理, 126 //以读事件为例,处理逻辑大致就是读数据到ByteBuffer,然后回调CompletionHandler, 127 //我们在CompletionHandler中只需要处理已经读到数据的ByteBuffer即可。 128 ev.channel().onEvent(ev.events(), isPooledThread); 129 } catch (Error x) { 130 replaceMe = true; throw x; 131 } catch (RuntimeException x) { 132 replaceMe = true; throw x; 133 } 134 } 135 } finally { 136 // last handler to exit when shutdown releases resources 137 int remaining = threadExit(this, replaceMe); 138 if (remaining == 0 && isShutdown()) { 139 implClose(); 140 } 141 } 142 } 143 }
是不是注册一次读事件,之后每当可读时CompletionHandler都会被回调呢?不是的,java的aio框架中限定了注册一次事件,也只监听一次事件,这是通过EPOLLONESHOT限制的。
1 // invoke by clients to register a file descriptor 2 @Override 3 void startPoll(int fd, int events) { 4 5 //EPOLLONESHOT只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把事件加入到EPOLL里。 6 //EPOLLIN触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭); 7 //EPOLLOUT触发该事件,表示对应的文件描述符上可以写数据; 8 // update events (or add to epoll on first usage) 9 int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT)); 10 if (err == ENOENT) 11 err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT)); 12 if (err != 0) 13 throw new AssertionError(); // should not happen 14 }
其实nio也同样使用了epoll,只不过nio不自带线程池框架,并且select的返回只意味着事件的就绪,而aio框架中CompletionHandler的回调意味着事件的完成(比如read中得到的ByteBuffer已经被填好了数据,这也是线程池的任务)。个人感觉,aio适合简单处理,nio更适合复杂处理。