  • A detailed source-code analysis of event flow in Netty's ChannelPipeline

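    When the official Netty API documentation describes how events flow through a pipeline, it gives some examples and a diagram

    to show the order in which upstream handlers and downstream handlers process events.

    Honestly, the examples and conclusions alone are hard to follow, so I decided to get my hands dirty and debug it myself.

    As shown in the figure.

    Here is the example: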

    // Netty 3.x (org.jboss.netty). Each public class below lives in its own file.
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.*;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

    public class Server {
        public static void main(String args[]) {
            ServerBootstrap bootstrap = new ServerBootstrap(
                    new NioServerSocketChannelFactory(
                            Executors.newCachedThreadPool(),
                            Executors.newCachedThreadPool()));
            bootstrap.setPipelineFactory(new PipelineFactoryTest());
            bootstrap.bind(new InetSocketAddress(8888));
        }
    }

    public class PipelineFactoryTest implements ChannelPipelineFactory {

        @Override
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("1", new UpstreamHandlerA());
            pipeline.addLast("2", new UpstreamHandlerB());
            pipeline.addLast("3", new DownstreamHandlerA());
            pipeline.addLast("4", new DownstreamHandlerB());
            pipeline.addLast("5", new UpstreamHandlerX());
            return pipeline;
        }
    }

    public class UpstreamHandlerA extends SimpleChannelUpstreamHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            Channel ctxchannel = ctx.getChannel();
            Channel echannel = e.getChannel();

            // The handler context and the event share the same channel
            System.out.println(ctxchannel.equals(echannel));
            System.out.println("UpstreamHandlerA.messageReceived:" + e.getMessage());
            ctx.sendUpstream(e);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            System.out.println("UpstreamHandlerA.exceptionCaught:" + e.toString());
            e.getChannel().close();
        }
    }

    public class UpstreamHandlerB extends SimpleChannelUpstreamHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            System.out.println("UpstreamHandlerB.messageReceived:" + e.getMessage());
            ctx.sendUpstream(e);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            System.out.println("UpstreamHandlerB.exceptionCaught:" + e.toString());
            e.getChannel().close();
        }
    }

    public class UpstreamHandlerX extends SimpleChannelUpstreamHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
                throws Exception {
            System.out.println("UpstreamHandlerX.messageReceived:" + e.getMessage());
            // Writing to the channel starts a downstream event
            e.getChannel().write(e.getMessage());
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
            System.out.println("UpstreamHandlerX.exceptionCaught");
            e.getChannel().close();
        }
    }

    public class DownstreamHandlerA extends SimpleChannelDownstreamHandler {
        @Override
        public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
                throws Exception {
            System.out.println("DownstreamHandlerA.handleDownstream");
            super.handleDownstream(ctx, e);
        }
    }

    public class DownstreamHandlerB extends SimpleChannelDownstreamHandler {
        @Override
        public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
                throws Exception {
            System.out.println("DownstreamHandlerB.handleDownstream:");
            super.handleDownstream(ctx, e);
        }
    }

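    Client:

    import java.net.InetSocketAddress;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.jboss.netty.bootstrap.ClientBootstrap;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.ChannelFactory;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

    public class AppStoreClinetBootstrap {
        public static void main(String args[]) {
            ExecutorService bossExecutor = Executors.newCachedThreadPool();
            ExecutorService workerExecutor = Executors.newCachedThreadPool();
            ChannelFactory channelFactory =
                    new NioClientSocketChannelFactory(bossExecutor, workerExecutor);
            ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
            bootstrap.setPipelineFactory(new AppClientChannelPipelineFactory());
            ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8888));
            future.awaitUninterruptibly();
            if (future.isSuccess()) {
                // Send one message once the connection is established
                String msg = "hello word";
                ChannelBuffer buffer = ChannelBuffers.buffer(msg.length());
                buffer.writeBytes(msg.getBytes());
                future.getChannel().write(buffer);
            }
        }
    }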

    public class AppClientChannelPipelineFactory implements ChannelPipelineFactory {
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = Channels.pipeline();
            //pipeline.addLast("encode", new StringEncoder());
            pipeline.addLast("handler", new AppStoreClientHandler());
            return pipeline;
        }
    }

    public class AppStoreClientHandler extends SimpleChannelUpstreamHandler {
        private static Logger log = Logger.getLogger(AppStoreClientHandler.class);

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            // TODO Auto-generated method stub
            super.exceptionCaught(ctx, e);
        }
    }

      

    The example above demonstrates the propagation order of upstream and downstream events.

    Upstream: 1 -> 2 -> 5 (handled in insertion order)
    Downstream: 4 -> 3 (handled in reverse order)

    ========================================================

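    Enough of the example. Now let's look at the source code to see why it behaves this way.

    On the server side, once bind() has been called,

    the server keeps looping for as long as no client connects. A new client connection is what triggers the rest of the processing.

    The code: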

     
    NioServerSocketPipelineSink class
    ......................
    public void run() {
                final Thread currentThread = Thread.currentThread();

                channel.shutdownLock.lock();
                try {
                    for (;;) {
                        try {
                            if (selector.select(1000) > 0) {
                                selector.selectedKeys().clear();
                            }
                            // After the server starts, this loop keeps spinning until a client connects
                            SocketChannel acceptedSocket = channel.socket.accept();
                            if (acceptedSocket != null) {
                                registerAcceptedChannel(acceptedSocket, currentThread);
                            }
    ......................................
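    The polling pattern in that loop can be tried out with plain java.nio, without Netty. The following is only a rough sketch: AcceptLoopSketch, pollOnce and demo are names I made up for illustration, and it assumes the loopback interface is available. It shows that a non-blocking accept() returns null while no client is pending and returns a channel once one has connected.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class AcceptLoopSketch {
    /** One loop iteration: wait up to timeoutMillis, then try a non-blocking accept. */
    static SocketChannel pollOnce(Selector selector, ServerSocketChannel server,
                                  long timeoutMillis) throws IOException {
        if (selector.select(timeoutMillis) > 0) {
            selector.selectedKeys().clear();
        }
        // On a non-blocking server channel, accept() returns null when nothing is pending.
        return server.accept();
    }

    /** Returns {acceptWasNullWhileIdle, acceptSucceededAfterConnect}. */
    static boolean[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Port 0 asks the OS for any free port (an assumption made for this sketch).
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_ACCEPT);

            boolean idleReturnedNull = pollOnce(selector, server, 100) == null;

            boolean accepted = false;
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                // Poll a bounded number of times instead of looping forever.
                for (int i = 0; i < 10 && !accepted; i++) {
                    SocketChannel socket = pollOnce(selector, server, 1000);
                    if (socket != null) {
                        accepted = true;
                        socket.close();
                    }
                }
            }
            return new boolean[] {idleReturnedNull, accepted};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("accept() was null while idle: " + result[0]);
        System.out.println("accept() succeeded after connect: " + result[1]);
    }
}
```

    In the Netty code, the non-null branch is where the accepted socket is handed on for registration.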

    Once a client connects, the accepted channel is registered, as in the following code:

     private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
                try {
                    ChannelPipeline pipeline =channel.getConfig().getPipelineFactory().getPipeline();
                    NioWorker worker = nextWorker();
                    worker.register(new NioAcceptedSocketChannel(
                            channel.getFactory(), pipeline, channel,
                            NioServerSocketPipelineSink.this, acceptedSocket,
                            worker, currentThread), null);
                } catch (Exception e) {
                    logger.warn(
                            "Failed to initialize an accepted socket.", e);
                    try {
                        acceptedSocket.close();
                    } catch (IOException e2) {
                        logger.warn(
                                "Failed to close a partially accepted socket.",
                                e2);
                    }
                }
            }
    

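    The getPipeline() call (highlighted in red in the original post) obtains the pipeline with all of its handlers, i.e. the ChannelPipeline built in PipelineFactoryTest. The I/O itself is handled by a NioWorker.

    The key is the pipeline.addLast method: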

        public synchronized void addLast(String name, ChannelHandler handler) {
            if (name2ctx.isEmpty()) {
                init(name, handler);
            } else {
                checkDuplicateName(name);
                DefaultChannelHandlerContext oldTail = tail;
                DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
    
                callBeforeAdd(newTail);
    
                oldTail.next = newTail;
                tail = newTail;
                name2ctx.put(name, newTail);
    
                callAfterAdd(newTail);
            }
        }
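    To make the linking in addLast easier to see, here is a stripped-down sketch of the same idea in plain Java. AddLastSketch, Node, forward and backward are hypothetical names for illustration only; the real DefaultChannelPipeline also runs the beforeAdd/afterAdd callbacks, which are left out here.

```java
import java.util.HashMap;
import java.util.Map;

class AddLastSketch {
    /** Hypothetical stand-in for DefaultChannelHandlerContext: one list node. */
    static final class Node {
        final String name;
        final Object handler;
        Node prev;
        Node next;

        Node(Node prev, Node next, String name, Object handler) {
            this.prev = prev;
            this.next = next;
            this.name = name;
            this.handler = handler;
        }
    }

    private final Map<String, Node> name2ctx = new HashMap<>();
    private Node head;
    private Node tail;

    synchronized void addLast(String name, Object handler) {
        if (name2ctx.containsKey(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        Node newTail = new Node(tail, null, name, handler);
        if (tail == null) {
            head = newTail;        // first handler: it is both head and tail
        } else {
            tail.next = newTail;   // link the old tail forward to the new node
        }
        tail = newTail;
        name2ctx.put(name, newTail);
    }

    /** Names from head to tail, following the next links. */
    String forward() {
        StringBuilder sb = new StringBuilder();
        for (Node n = head; n != null; n = n.next) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }

    /** Names from tail to head, following the prev links. */
    String backward() {
        StringBuilder sb = new StringBuilder();
        for (Node n = tail; n != null; n = n.prev) {
            if (sb.length() > 0) sb.append(" -> ");
            sb.append(n.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        AddLastSketch pipeline = new AddLastSketch();
        for (String name : new String[] {"1", "2", "3", "4", "5"}) {
            pipeline.addLast(name, new Object());
        }
        System.out.println(pipeline.forward());   // 1 -> 2 -> 3 -> 4 -> 5
        System.out.println(pipeline.backward());  // 5 -> 4 -> 3 -> 2 -> 1
    }
}
```

    Because every node keeps both links, the same list can be walked head to tail or tail to head.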
    

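    DefaultChannelHandlerContext is a node in a linked list. Its next and prev links chain together all the upstream handlers and downstream handlers (this is the key point).

    Upstream handlers are responsible for receiving data, so when the client sends data, the upstream handlers in PipelineFactoryTest are invoked one after another in order.

    The code below shows why they run in order. If you look closely, UpstreamHandlerA and UpstreamHandlerB in PipelineFactoryTest each call

    ctx.sendUpstream(e); (ChannelHandlerContext is the context of each handler)

    With this method, one upstream handler passes the event on to the next upstream handler (a typical chain of responsibility). UpstreamHandlerX, the last one, writes the message back instead of forwarding it.

    The code: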

     
         public void sendUpstream(ChannelEvent e) {
                DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
                if (next != null) {
                    // the next upstream handler is invoked immediately
                    DefaultChannelPipeline.this.sendUpstream(next, e);
                }
            }

         DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
                if (ctx == null) {
                    return null;
                }

                DefaultChannelHandlerContext realCtx = ctx;
                while (!realCtx.canHandleUpstream()) {
                    realCtx = realCtx.next;
                    if (realCtx == null) {
                        return null;
                    }
                }

                return realCtx;
            }

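    As mentioned above, DefaultChannelHandlerContext is a linked list holding all the handlers, so every upstream handler is looked up there and the event is then passed along.

    Because all the upstream handlers share the same event, they also share the same ChannelBuffer. This closely resembles the chain-of-responsibility pattern and can also be used for filter-style processing.

    With that in mind, Netty's various encoders (downstream handlers) and decoders (upstream handlers) become easy to understand.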
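    The upstream hand-off can be reproduced in a few lines of plain Java. This is only a sketch of the idea, not Netty's code: UpstreamChainSketch, Context, actualUpstreamContext and fire are names I invented, and the event is modeled as a List<String> that every handler appends its own name to, which also shows that all handlers see the same event object.

```java
import java.util.ArrayList;
import java.util.List;

class UpstreamChainSketch {
    interface Handler {}

    interface UpstreamHandler extends Handler {
        void handleUpstream(Context ctx, List<String> event);
    }

    /** Marker only: downstream handlers are skipped on the upstream path. */
    interface DownstreamHandler extends Handler {}

    static final class Context {
        final String name;
        final Handler handler;
        Context next;

        Context(String name, Handler handler) {
            this.name = name;
            this.handler = handler;
        }

        boolean canHandleUpstream() {
            return handler instanceof UpstreamHandler;
        }

        /** Forward the event to the next context that can handle upstream events. */
        void sendUpstream(List<String> event) {
            Context target = actualUpstreamContext(next);
            if (target != null) {
                ((UpstreamHandler) target.handler).handleUpstream(target, event);
            }
        }
    }

    /** Skip contexts that cannot handle upstream events; null if none is left. */
    static Context actualUpstreamContext(Context ctx) {
        while (ctx != null && !ctx.canHandleUpstream()) {
            ctx = ctx.next;
        }
        return ctx;
    }

    static Context link(Context... contexts) {
        for (int i = 0; i + 1 < contexts.length; i++) {
            contexts[i].next = contexts[i + 1];
        }
        return contexts[0];
    }

    /** Builds the 1..5 pipeline from the example and fires one upstream event. */
    static List<String> fire() {
        UpstreamHandler forwarding = (ctx, event) -> {
            event.add(ctx.name);
            ctx.sendUpstream(event);   // pass the same event on
        };
        UpstreamHandler terminal = (ctx, event) -> event.add(ctx.name); // does not forward
        DownstreamHandler down = new DownstreamHandler() {};

        Context head = link(
                new Context("1", forwarding),
                new Context("2", forwarding),
                new Context("3", down),
                new Context("4", down),
                new Context("5", terminal));

        List<String> event = new ArrayList<>();
        Context first = actualUpstreamContext(head);
        ((UpstreamHandler) first.handler).handleUpstream(first, event);
        return event;
    }

    public static void main(String[] args) {
        System.out.println(fire()); // [1, 2, 5]
    }
}
```

    If a forwarding handler left out its ctx.sendUpstream(event) call, the handlers after it would never see the event.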

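    The downstream handlers can be analyzed in the same way.

    UpstreamHandlerX calls e.getChannel().write(e.getMessage()). This creates a DownstreamMessageEvent and sends it down the pipeline, which walks back from the tail and first reaches DownstreamHandlerB ("4").

    The hand-off from DownstreamHandlerB to DownstreamHandlerA ("3") is done by super.handleDownstream(ctx, e);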

     public ChannelFuture write(Object message) {
            return Channels.write(this, message);
        }
    
     public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
            ChannelFuture future = future(channel);
            channel.getPipeline().sendDownstream(
                    new DownstreamMessageEvent(channel, future, message, remoteAddress));
            return future;
        }
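    The downstream direction is the mirror image: the event enters at the tail and follows the prev links. Here is a small sketch under that assumption, using hypothetical names (DownstreamWalkSketch, Node, actualDownstream, write) and assuming every downstream handler forwards the event, as super.handleDownstream does in the example.

```java
import java.util.ArrayList;
import java.util.List;

class DownstreamWalkSketch {
    /** Hypothetical node: only the name, a capability flag, and the prev link. */
    static final class Node {
        final String name;
        final boolean handlesDownstream;
        final Node prev;

        Node(String name, boolean handlesDownstream, Node prev) {
            this.name = name;
            this.handlesDownstream = handlesDownstream;
            this.prev = prev;
        }
    }

    /** Nearest node at or before ctx that handles downstream events, or null. */
    static Node actualDownstream(Node ctx) {
        while (ctx != null && !ctx.handlesDownstream) {
            ctx = ctx.prev;
        }
        return ctx;
    }

    /** Models a write: start at the tail and visit each downstream handler in turn. */
    static List<String> write(Node tail) {
        List<String> visited = new ArrayList<>();
        for (Node n = actualDownstream(tail); n != null; n = actualDownstream(n.prev)) {
            visited.add(n.name); // each handler is assumed to forward the event
        }
        return visited;
    }

    /** Same layout as PipelineFactoryTest: 1, 2, 5 upstream; 3, 4 downstream. */
    static Node examplePipeline() {
        Node n1 = new Node("1", false, null);
        Node n2 = new Node("2", false, n1);
        Node n3 = new Node("3", true, n2);
        Node n4 = new Node("4", true, n3);
        return new Node("5", false, n4); // the tail
    }

    public static void main(String[] args) {
        System.out.println(write(examplePipeline())); // [4, 3]
    }
}
```

    This matches the 4 -> 3 order observed in the example.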
    

      

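    My train of thought jumps around quite a bit in this post. I spent a day exploring this and learned a lot, so I am writing it down.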

      

      

      

  • Original article: https://www.cnblogs.com/montya/p/2834279.html