zoukankan      html  css  js  c++  java
  • bossGroup

    向bossGroup里注册通道

    流程图

     书接上文

    ServerBootstrap.bind(hostnameport)

     1 final ChannelFuture initAndRegister() {
     2         Channel channel = null;
     3         try {
     4             channel = channelFactory.newChannel();
     5             init(channel);
     6         } catch (Throwable t) {
     7             if (channel != null) {
     8                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
     9                 channel.unsafe().closeForcibly();
    10                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    11                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    12             }
    13             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
    14             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    15         }
    16 
    17         ChannelFuture regFuture = config().group().register(channel);
    18         if (regFuture.cause() != null) {
    19             if (channel.isRegistered()) {
    20                 channel.close();
    21             } else {
    22                 channel.unsafe().closeForcibly();
    23             }
    24         }
    25 
    26         return regFuture;
    27     }

    上篇文章说了init方法,接下去就是注册通道

    config().group().register(channel)方法

    首先调用的是MultithreadEventLoopGroup.register(Channel channel)方法

    1 public ChannelFuture register(Channel channel) {
    2         return next().register(channel);
    3     }
    4 
    5 public EventLoop next() {
    6         return (EventLoop) super.next();
    7     }

    调用MultithreadEventExecutorGroup的next()方法

    1 public EventExecutor next() {
    2     return chooser.next();
    3 }

    chooser选择器。MultithreadEventExecutorGroup构造方法中实例化。

     1 protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
     2                                             EventExecutorChooserFactory chooserFactory, Object... args) {
     3         if (nThreads <= 0) {
     4             throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
     5         }
     6 
     7         if (executor == null) {
     8             executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
     9         }
    10 
    11         children = new EventExecutor[nThreads];
    12 
    13         for (int i = 0; i < nThreads; i ++) {
    14             boolean success = false;
    15             try {
    16                 children[i] = newChild(executor, args);
    17                 success = true;
    18             } catch (Exception e) {
    19                 // TODO: Think about if this is a good exception type
    20                 throw new IllegalStateException("failed to create a child event loop", e);
    21             } finally {
    22                 if (!success) {
    23                     for (int j = 0; j < i; j ++) {
    24                         children[j].shutdownGracefully();
    25                     }
    26 
    27                     for (int j = 0; j < i; j ++) {
    28                         EventExecutor e = children[j];
    29                         try {
    30                             while (!e.isTerminated()) {
    31                                 e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
    32                             }
    33                         } catch (InterruptedException interrupted) {
    34                             // Let the caller handle the interruption.
    35                             Thread.currentThread().interrupt();
    36                             break;
    37                         }
    38                     }
    39                 }
    40             }
    41         }
    42 
    43         chooser = chooserFactory.newChooser(children);//实例化选择器
    44 
    45         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
    46             @Override
    47             public void operationComplete(Future<Object> future) throws Exception {
    48                 if (terminatedChildren.incrementAndGet() == children.length) {
    49                     terminationFuture.setSuccess(null);
    50                 }
    51             }
    52         };
    53 
    54         for (EventExecutor e: children) {
    55             e.terminationFuture().addListener(terminationListener);
    56         }
    57 
    58         Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    59         Collections.addAll(childrenSet, children);
    60         readonlyChildren = Collections.unmodifiableSet(childrenSet);
    61     }

    chooserFactory.newChooser(children);

    其实就前面讲过的选择器工厂DefaultEventExecutorChooserFactory,如何选择一个执行器。

    1 public EventExecutorChooser newChooser(EventExecutor[] executors) {
    2         if (isPowerOfTwo(executors.length)) {
    3             return new PowerOfTwoEventExecutorChooser(executors);
    4         } else {
    5             return new GenericEventExecutorChooser(executors);
    6         }
    7     }

    chooser.next();

     1 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
     2         private final AtomicInteger idx = new AtomicInteger();
     3         private final EventExecutor[] executors;
     4 
     5         PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
     6             this.executors = executors;
     7         }
     8 
     9         @Override
    10         public EventExecutor next() {
    11             return executors[idx.getAndIncrement() & executors.length - 1];
    12         }
    13     }
    14 
    15     private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    16         // Use a 'long' counter to avoid non-round-robin behaviour at the 32-bit overflow boundary.
    17         // The 64-bit long solves this by placing the overflow so far into the future, that no system
    18         // will encounter this in practice.
    19         private final AtomicLong idx = new AtomicLong();
    20         private final EventExecutor[] executors;
    21 
    22         GenericEventExecutorChooser(EventExecutor[] executors) {
    23             this.executors = executors;
    24         }
    25 
    26         @Override
    27         public EventExecutor next() {
    28             return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    29         }
    30     }

     选到一个NioEventLoop,执行注册,调用的是NioEventLoopregister。子类没有这个方法,由父类SingleThreadEventLoop实现。

    1 public ChannelFuture register(Channel channel) {
    2         return register(new DefaultChannelPromise(channel, this));
    3     }

    创建了一个DefaultChannelPromise把通道和NioEventLoop都封装进去,DefaultChannelPromise是对Future做了加强,也是异步回调,可以设置很多监听,也可以将结果写入。

    1 public ChannelFuture register(final ChannelPromise promise) {
    2         ObjectUtil.checkNotNull(promise, "promise");
    3         promise.channel().unsafe().register(this, promise);
    4         return promise;
    5     }

    Channel.Unsafe

    这里的Unsafe不是JDK的Unsafe,是netty封装的Channel的内部类,因为他是一些操作NIO底层的方法,不建议外部用的,所有也叫Unsafe。我们所说的把通道注册进bossGroup实际上就是调用他的实现类。promise.channel()方法返回NioServerSocketChannel,他的unsafe方法由它的父类AbstractNioChannel实现。

    1 public NioUnsafe unsafe() {
    2 
    3     return (NioUnsafe) super.unsafe();
    4 }
    5 //父类unsafe方法
    6 public Unsafe unsafe() {
    7 
    8     return unsafe;
    9 }

    返回值 在初始化AbstractChannel时 指定

    1 protected AbstractChannel(Channel parent) {
    2 
    3     this.parent = parent;
    4    id = newId();
    5    unsafe = newUnsafe();
    6    pipeline = newChannelPipeline();
    7 }

    newUnsafe(),又由子类AbstractNioMessageChannel实现

    1 protected AbstractNioUnsafe newUnsafe() {
    2 
    3     return new NioMessageUnsafe();
    4 }

    所以config().group().register(channel)调用的是NioMessageUnsafe.register方法,但是自己没有这个方法,所以最终调用父类AbstractUnsafe的register方法,AbstractUnsafe为AbstractChannel的内部类.

    这个就是真正的注册方法:

     1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
     2             ObjectUtil.checkNotNull(eventLoop, "eventLoop");
     3             if (isRegistered()) {
     4                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
     5                 return;
     6             }
     7             if (!isCompatible(eventLoop)) {
     8                 promise.setFailure(
     9                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
    10                 return;
    11             }
    12 
    13             AbstractChannel.this.eventLoop = eventLoop;//设置通道的事件循环,1对1,只设置一次
    14        //只能当前线程是eventLoop的线程才可以注册,防止多线程并发问题,所以即使多线程来操作,也是安全的,会按照一定顺序提交到任务队列里
    15             if (eventLoop.inEventLoop()) {
    16                 register0(promise);
    17             } else {//否则就当做任务提交给eventLoop的任务队列
    18                 try {
    19                     eventLoop.execute(new Runnable() {
    20                         @Override
    21                         public void run() {
    22                             register0(promise);
    23                         }
    24                     });
    25                 } catch (Throwable t) {
    26                     logger.warn(
    27                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
    28                             AbstractChannel.this, t);
    29                     closeForcibly();
    30                     closeFuture.setClosed();
    31                     safeSetFailure(promise, t);
    32                 }
    33             }
    34         }

    这里有个很巧妙的点,就是eventLoop.inEventLoop()这个的判断,就是判断调用这个方法的是不是eventLoop的线程,如果是,那就是同一个线程调用,直接就注册,否则属于多线程调用,可能会有问题,所以还是提交一个任务给eventLoop的线程去执行,这样就是单线程,不会有线程安全问题。

    此时当前线程是main肯定不是事件循环里的线程,事件循环里的线程还没创建呢,所以会提交到队列。eventLoop为NioEventLoop,执行父类SingleThreadEventExecutor的execute方法。

    1 public void execute(Runnable task) {
    2 
    3     ObjectUtil.checkNotNull(task, "task");
    4    execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    5 }
     1 private void execute(Runnable task, boolean immediate) {
     2         boolean inEventLoop = inEventLoop();
     3         addTask(task);//添加任务到taskQueue
     4         if (!inEventLoop) {//当前线程不是事件循环线程
     5             startThread();//让线程工厂开启线程
     6             if (isShutdown()) {
     7                 boolean reject = false;
     8                 try {
     9                     if (removeTask(task)) {
    10                         reject = true;
    11                     }
    12                 } catch (UnsupportedOperationException e) {
    13                     // The task queue does not support removal so the best thing we can do is to just move on and
    14                     // hope we will be able to pick-up the task before its completely terminated.
    15                     // In worst case we will log on termination.
    16                 }
    17                 if (reject) {
    18                     reject();
    19                 }
    20             }
    21         }
    22 
    23         if (!addTaskWakesUp && immediate) {
    24             wakeup(inEventLoop);//唤醒线程,添加了一个空的任务去唤醒
    25         }
    26     }

    task为register0(promise);方法,addTask是给taskqueue添加task,

    startThread();

     1 private static final int ST_NOT_STARTED = 1;//没启动
     2     private static final int ST_STARTED = 2;//启动了
     3     private static final int ST_SHUTTING_DOWN = 3;//正在关闭中
     4     private static final int ST_SHUTDOWN = 4;//关闭了
     5     private static final int ST_TERMINATED = 5;//终止了
     6 
     7 private void startThread() {
     8         if (state == ST_NOT_STARTED) {
     9             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
    10                 boolean success = false;
    11                 try {
    12                     doStartThread();//具体的开启线程
    13                     success = true;
    14                 } finally {
    15                     if (!success) {
    16                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
    17                     }
    18                 }
    19             }
    20         }
    21     }

    doStartThread()

    其实里面就是调用SingleThreadEventExecutor.this.run();,这里其实executor.execute已经启动了新的线程来执行new Runnable()里的任务,也就是执行NioEventLoop. run()。

     1 private void doStartThread() {
     2         assert thread == null;
     3         executor.execute(new Runnable() {
     4             @Override
     5             public void run() {
     6                 thread = Thread.currentThread();
     7                 if (interrupted) {
     8                     thread.interrupt();
     9                 }
    10 
    11                 boolean success = false;
    12                 updateLastExecutionTime();
    13                 try {
    14                     SingleThreadEventExecutor.this.run();
    15                     success = true;
    16                 } catch (Throwable t) {
    17                     logger.warn("Unexpected exception from an event executor: ", t);
    18                 } finally {
    19                     for (;;) {
    20                         int oldState = state;
    21                         if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
    22                                 SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
    23                             break;
    24                         }
    25                     }
    26 
    27                     // Check if confirmShutdown() was called at the end of the loop.
    28                     if (success && gracefulShutdownStartTime == 0) {
    29                         if (logger.isErrorEnabled()) {
    30                             logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
    31                                     SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
    32                                     "be called before run() implementation terminates.");
    33                         }
    34                     }
    35 
    36                     try {
    37                         // Run all remaining tasks and shutdown hooks. At this point the event loop
    38                         // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
    39                         // graceful shutdown with quietPeriod.
    40                         for (;;) {
    41                             if (confirmShutdown()) {
    42                                 break;
    43                             }
    44                         }
    45 
    46                         // Now we want to make sure no more tasks can be added from this point. This is
    47                         // achieved by switching the state. Any new tasks beyond this point will be rejected.
    48                         for (;;) {
    49                             int oldState = state;
    50                             if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
    51                                     SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
    52                                 break;
    53                             }
    54                         }
    55 
    56                         // We have the final set of tasks in the queue now, no more can be added, run all remaining.
    57                         // No need to loop here, this is the final pass.
    58                         confirmShutdown();
    59                     } finally {
    60                         try {
    61                             cleanup();
    62                         } finally {
    63                             // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
    64                             // the future. The user may block on the future and once it unblocks the JVM may terminate
    65                             // and start unloading classes.
    66                             // See https://github.com/netty/netty/issues/6596.
    67                             FastThreadLocal.removeAll();
    68 
    69                             STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
    70                             threadLock.countDown();
    71                             int numUserTasks = drainTasks();
    72                             if (numUserTasks > 0 && logger.isWarnEnabled()) {
    73                                 logger.warn("An event executor terminated with " +
    74                                         "non-empty task queue (" + numUserTasks + ')');
    75                             }
    76                             terminationFuture.setSuccess(null);
    77                         }
    78                     }
    79                 }
    80             }
    81         });
    82     }

    executor在SingleThreadEventExecutor构造方法中实例化,this.executor = ThreadExecutorMap.apply(executor, this);

     1 public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
     2         ObjectUtil.checkNotNull(executor, "executor");
     3         ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
     4         return new Executor() {
     5             @Override
     6             public void execute(final Runnable command) {
     7                 executor.execute(apply(command, eventExecutor));
     8             }
     9         };
    10     }

    apply方法中的executor为SingleThreadEventExecutor构造方法的入参参数,根据继承关系可知SingleThreadEventExecutor构造方法在NioEventLoop构造方法中调用了,而NioEventLoop构造方法在之前已说明,在

    MultithreadEventExecutorGroup的构造方法中调用所以apply方法中的executor为ThreadPerTaskExecutor

    ThreadPerTaskExecutor该类的execute比较简单,利用DefaultThreadFactory实话为thread实现异步执行,但是thread为FastThreadLocalThread而不是原生的thread.

    apply方法中的eventExecutor为SingleThreadEventExecutor对象eventExecutor

    apply方法内部的apply

     1 public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
     2         ObjectUtil.checkNotNull(command, "command");
     3         ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
     4         return new Runnable() {
     5             @Override
     6             public void run() {
     7                 setCurrentEventExecutor(eventExecutor);
     8                 try {
     9                     command.run();
    10                 } finally {
    11                     setCurrentEventExecutor(null);
    12                 }
    13             }
    14         };
    15     }

    setCurrentEventExecutor(eventExecutor);设置本地线程变量,利用了FastThreadLocal

    command.run()方法调用的为doStartThread()方法内部的异步执行方法体SingleThreadEventExecutor.this.run();下文详解

     

    sync()

    main线程如果完成了注册流程后,就会调用sync()尝试阻塞,调用的就是:

    ChannelFuture channelFuture = b.bind(hostname, port).sync();

    最终到了DefaultPromise的await()方法:

     1 public Promise<V> await() throws InterruptedException {
     2         if (isDone()) {//完成了就返回
     3             return this;
     4         }
     5 
     6         if (Thread.interrupted()) {
     7             throw new InterruptedException(toString());
     8         }
     9 
    10         checkDeadLock();
    11 
    12         synchronized (this) {
    13             while (!isDone()) {//判断是否完成
    14                 incWaiters();
    15                 try {
    16                     wait();//阻塞
    17                 } finally {
    18                     decWaiters();
    19                 }
    20             }
    21         }
    22         return this;
    23     }

    如果这个时候注册和绑定端口完成了,就会返回,否则就会wait();阻塞。

    safeSetSuccess(promise);

    绑定是事件循环的线程去做的,完成绑定后会调用safeSetSuccess(promise);方法,设置成功的结果,唤醒阻塞的主线程。最终是调用DefaultPromise的checkNotifyWaiters:

    1 protected final void safeSetSuccess(ChannelPromise promise) {
    2             if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
    3                 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
    4             }
    5         }

    调用DefaultChannelPromise的trySuccess()

     1 private boolean setValue0(Object objResult) {
     2         if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
     3             RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
     4             if (checkNotifyWaiters()) {
     5                 notifyListeners();
     6             }
     7             return true;
     8         }
     9         return false;
    10     }

    方法checkNotifyWaiters()

    1 private synchronized boolean checkNotifyWaiters() {
    2         if (waiters > 0) {
    3             notifyAll();
    4         }
    5         return listeners != null;
    6     }

    方法notifyListeners()

     1 private void notifyListeners() {
     2         EventExecutor executor = executor();
     3         if (executor.inEventLoop()) {
     4             final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
     5             final int stackDepth = threadLocals.futureListenerStackDepth();
     6             if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
     7                 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
     8                 try {
     9                     notifyListenersNow();
    10                 } finally {
    11                     threadLocals.setFutureListenerStackDepth(stackDepth);
    12                 }
    13                 return;
    14             }
    15         }
    16 
    17         safeExecute(executor, new Runnable() {
    18             @Override
    19             public void run() {
    20                 notifyListenersNow();
    21             }
    22         });
    23     }

    注册绑定监听并阻塞

    阻塞被唤醒后,就注册一个监听的回调,然后阻塞关闭事件,其实就意味着不关闭就永远运行着了:

     1 ChannelFuture cf = bootstrap.bind(8888).sync();
     2             cf.addListener((ChannelFutureListener) future -> {
     3                 if (cf.isSuccess()) {
     4                     System.out.println("监听端口 8888 成功");
     5                 } else {
     6                     System.out.println("监听端口 8888 失败");
     7                 }
     8             });
     9             System.out.println("服务器开始提供服务");
    10             cf.channel().closeFuture().sync();

    当然也可以这样写,先添加监听,然后阻塞,反正结果都是一样的:

     1 ChannelFuture cf = bootstrap.bind(8888);
     2             cf.addListener((ChannelFutureListener) future -> {
     3                 if (cf.isSuccess()) {
     4                     System.out.println("监听端口 8888 成功");
     5                 } else {
     6                     System.out.println("监听端口 8888 失败");
     7                 }
     8             });
     9             System.out.println("服务器开始提供服务");
    10             cf.sync();

    如果addListener是添加在同步后面的,此时可能事件循环线程已经完成了注册和绑定,addListener里面就会提交一个任务,然后事件循环会去执行这个任务。

    控制台打印结果

    服务器开始提供服务
    监听端口 8888 成功

    至此注册流程基本结束,后续就看事件循环中的新线程如何运行了。

     



  • 相关阅读:
    触达项目涉及到的功能点
    NodeJS编程基础
    C#Socket通讯
    HTML转义字符大全
    C# 二进制,十进制,十六进制 互转
    浏览器的分类
    Prometheus设置systemctl管理
    第十五讲:Pagerduty的联用
    第十四讲:Prometheus 企业级实际使⽤二
    第十三讲:Prometheus 企业级实际使⽤
  • 原文地址:https://www.cnblogs.com/xiaojiesir/p/15321127.html
Copyright © 2011-2022 走看看