zoukankan      html  css  js  c++  java
  • Netty4 SEDA 事件驱动原理分析

    Netty4 SEDA 事件驱动原理分析

     

     

        AioServerSocketChannel的事件处理线程(AioEventLoop)建立过程分析

    (Bootstrap启动ServerSocketChannel处理线程的过程分析)

     

    1:AbstractBootstrap.initAndRegister(): 向BossGroup(AioEventLoopGroup)

    注册初始化后的AioServerSocketChannel,

     

    2,调用MultithreadEventExecutorGroup.next()方法,在BossGroup中取出一个可用的EventLoop

     

    3, 返回到MultithreadEventLoopGroup.register(Channel channel, ChannelPromise promise)方法,
    将AioServerSocketChannel注册到(2)从BossGroup中取出的那个EventLoop.

     

     

    4,进入到AioEventLoop.register(Channel channel, ChannelPromise future)方法

    AioEventLoop实现了JDK的ScheduledExecutorService,持有一条线程,具备任务执行能力,

    将AioServerSocketChannel测试到AioEventLoop后,由AioEventLoop执行ServerSocketChannel的accept新的socket的操作

     

    5,进入到AbstractChannel.AbstractUnsafe.register(EventLoop eventLoop, ChannelPromise promise)方法,

     

    6)进入到SingleThreadEventExecutor.execute(Runnable task)方法,执行startThread()后,AioServerSocketChannel的事件处理线程(AioEventLoop)启动,将由它来接管AioServerSocketChanel的事件处理(如Accept新的Socket),然后ServerBootstrap线程就进入等待状态,等待ServerSocketChannel绑定端口,然后等待ServerSocketChannel关闭.

     

    7)先看下面

     

     

     

    8)AioServerSocketChannel的事件处理线程AioEventLoop从一个BlockingDeque里取出新任务,取出的第一个任务是给AioServerSocketChannel注册JDK的AsynchronousServerSocketChannel(这才是真正和底层操作系统通信的ServerSocketChannel)

    注册了:

     

    9)Channel成功注册后,调用其pipeline.fireChannelRegistered()方法Fire这个channel注册的事件

     

    10)ChannelPipeline的fireChannelRegistered方法先初始化 其处理器链的第一个元素HeadHandler,然后再执行HeadHandler的fireChannelRegistered()方法

     

    DefaultChannelHandlerContext.fireChannelRegistered(),Netty4 的DefaultChannelHandlerContext对ChannelInboundInvoker接口的的事件处理方法fireXxxx(),具有链式传递效应,顺序将同一方向上的所有事件

     

     

     

     

    11)然后, AioServerSocketChanel的pipeline的处理器链上的ChannelInitializer响应channelRegistered事件: pipeline安装上Acceptor,使得AioServerSocketChanel得以具备处理socket Accept事件的能力.

     

    12) AioServerSocketChanel将accept委托给JDK的异步ServerSocketChannel来执行异步accept,并指定了事件处理器,当ACCEPT事件发生的时候, AcceptHandler就会被调用

     

    然后AioServerSocketChanel的AioEventLoop就进入等待状态,等待taskQueue有任务到来

     

    13)现在在浏览器输入http://127.0.0.1:8080/,向服务器发出请求

    当相应事件发生时,JDK的CompletionHandler.complted(..)方法会被一条后台Daemon的线程调用 :

     

    将接收到的SocketChannel包装成AioSocketChannel作为一个Message写进AioServerSocketChannel的ChannelPipeline的inboundMessageBuffer,然后Fire 相关事件inboundBufferUpdated事件

     

    14)AioServerSocketChannel的ChannelPipeline Fire inboundBufferUpdated事件

     

    15)沿inbound方向找到了SererBootstrapAcceptor的ChannelHandlerContext

     

    16)inboundBufferUpdate事件传递到ServerBootstrapAcceptor

     

    17) DefaultChannelHandlerContext.invokeInboundBufferUpdated(),调用Acceptor的inboundBufferUpdated方法

     

    18)Acceptor取得AioSocketChannel,并向其添加ChannelInitializer

     

    19)向ChildGroup注册AioSocketChannel

     

    20)将AioSocketChannel注册到AioEventLoopGroup分配的AioEventLoop上

     

    21)BOSS线程(AioServerSocketChannel所属的线程)启动AioSocketChannel所属的AioEventLoop线程,BOSS线程的Accept工作也至此结束,然后就去回去处理其taskQueue中的任务,或进入wait状态,等待下一个Accept事件的发生.

     

     

     

        Worker线程线程的初始化

     

     

    22)Worker线程(AioSocketChannel所属的AioEventLoop)独立运行后,取出其taskQueue中的第一个任务: 通过AioSocketChannel的ChannelPipeline触发pipeline的ChannelHandlerContext处理器链的channelRegistered注册事件

    23)初始化ByteHeadHandler(ChannelOutboundHandler的子类),给它分配outByteBuf和outMsgBuf

     

    24)触发处理器链的下一节点的channelRegistered事件


     

    25)沿ChannelHandlerContext处理器链找到了自己定义 的用户AioSocketChannel初始化的HttpSnoopServerInitializer

     

    26)ChannelInitializer是利用channelRegistered事件来初始化pipeline,向其安装相关channelHandler

     

    27)从pipeline的处理器链删除初始化器,并触发下一处理器的channelRegistered事件

     

    28) channelRegistered事件通过一个一个ChannelHandlerContext的fireChannelRegistered方法沿ByteHeadHandler传递到TailHandler,然后事件就停止传播

     

     

    29)ChannelPipeline完成channelRegistered的事件触发后,就触发channelActive事件,

    然后channelActive事件就随着ChannelHandlerContext和链式调用而在处理器链中传递,原理上上面差不多,故不详细说.

     

    30)AioSocketChannel读取数据

    由pipeline来读取数据

     

    到TailHandler读取数据

     

    31)向outbound方向上寻找最近的ChannelOperationHandler,委托它进行数据读取

     

    首先委托给HttpResponseEncoder读取

    HttpRequestDecoder委托给下一个ChannelOperationHandler读取

     

    最终委托给ByteHeadHandler读取

     

     

    32)最终由AioSocketChannel通过JDK的AsynchronousSocketChannel从底层读取

     

     

    33)AioSocketChannel将读取数据操作委托给JDKAsynchronousSocketChannel来完成,AioSocketChannel的当前线程不会被阻塞,立马返回处理taskQueue中下一任务或没任务时进入wait等待BlockingQueue的状态.

    AsynchronousSocketChannelread操作完成后,ReadHandler会被一条后台Daemon线程调用

     

    34) ReadHandler的执行从Daemon线程转移回AioSocketChannle所属的AioEventLoop线程来执行

    35)由AioEventLoop执行ReadHandler

     

     

     

    36)数据已准备好,pipeline触发inboundBufferUpdated方法,然后pipeline中处理器链的inbound方向上的ChannelStateHandler的inboundBufferUpdated方法会被顺度调用

     

    37)head 触发处理器链上inbound方向上的stateHandler的inboudBufferUpdated方法的顺序调用

     

     

    38) 处理器链上inbound方向上的stateHandler的inboudBufferUpdated方法的顺序调用

    -----这里省略,参考:http://blog.csdn.net/irelandken/article/details/8760302

     

     

    39)输入数据: DefaultChannelHandlerContext.flush(ChannelPromise promise)

     

    40)由HttpServerCodec中的HttpResponseEncoder将HttpMessage 转换为ByteBuf

    DefaultChannelHandlerContext.invokeFlush0(ChannelPromise promise)

     

    41)由Pipeline的ByteHeadHandler将ByteBuf  flush出去

     

    42)AioSocketChannel将Flush操作委托给JDK AsyncounsSocketChannel异步进行,最终结果等写操作完成后由后台Deamon线程通知并执行WriteHandler.

    而AioEventLoop线程是马上返回的,不会被write操作阻塞,然后继续执行taskQueue中的任务或进入wait状态等待BlockingQueue.

     

     

     

    43)Write操作完成后,WriteHandler被后台Daemon线程调用

     

     

    44)再次WriteHandler转交由关联的AioSocketChannel所属的AioEventLoop执行

     

    45) AioEventLoop执行完WriteHandler后,整个接收和响应过程就完成了.

  • 相关阅读:
    安装RabbitMQ说明
    死锁
    管程
    MybatisPlus快速开发
    了解Mybatis-Plus
    查看监听器状态
    The command supports no service 解决办法
    任务11 Arduino光照报警器
    任务10 测试光的强度实验
    任务9 Arduino光敏实验
  • 原文地址:https://www.cnblogs.com/javawebsoa/p/3002727.html
Copyright © 2011-2022 走看看