zoukankan      html  css  js  c++  java
  • Netty系列-netty的初体验

    一、前言

       最近看了netty源码,打算写个博客记下来,方便后面再复习,同时希望也能方便看到的人,在研究netty的时候,多少能方便点。

    二、环境搭建

       git clone netty的代码下来,或者可以fork到自己的git 仓库,然后git clone下来。

      后面的版本统一用

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.6.Final</version>
    </dependency>
    

    三、例子研究

    如下是服务端标准的代码案例,bossgroup主要是用来接收连接请求的,workergroup主要是用来处理读写请求的

     1    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
     2         EventLoopGroup workerGroup = new NioEventLoopGroup();
     3         final EchoServerHandler serverHandler = new EchoServerHandler();
     4         try {
     5             ServerBootstrap b = new ServerBootstrap();
     6             b.group(bossGroup, workerGroup)
     7              .channel(NioServerSocketChannel.class)
     8              .option(ChannelOption.SO_BACKLOG, 100)
     9              .handler(new LoggingHandler(LogLevel.INFO))
    10              .childHandler(new ChannelInitializer<SocketChannel>() {
    11                  @Override
    12                  public void initChannel(SocketChannel ch) throws Exception {
    13                      ChannelPipeline p = ch.pipeline();
    14                      if (sslCtx != null) {
    15                          p.addLast(sslCtx.newHandler(ch.alloc()));
    16                      }
    17                      //p.addLast(new LoggingHandler(LogLevel.INFO));
    18                      p.addLast(serverHandler);
    19                  }
    20              });
    21 
    22             // Start the server.
    23             ChannelFuture f = b.bind(PORT).sync();

    前面5-20都是初始化,我们先看23行,bind方法,一路跟下去,分为三部分

     1 private ChannelFuture doBind(final SocketAddress localAddress) {
     2         final ChannelFuture regFuture = initAndRegister();
     3         final Channel channel = regFuture.channel();
     4         if (regFuture.cause() != null) {
     5             return regFuture;
     6         }
     7 
     8         if (regFuture.isDone()) {
     9             // At this point we know that the registration was complete and successful.
    10             ChannelPromise promise = channel.newPromise();
    11             doBind0(regFuture, channel, localAddress, promise);
    12             return promise;
    13         } else {
    14             // Registration future is almost always fulfilled already, but just in case it's not.
    15             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    16             regFuture.addListener(new ChannelFutureListener() {
    17                 @Override
    18                 public void operationComplete(ChannelFuture future) throws Exception {
    19                     Throwable cause = future.cause();
    20                     if (cause != null) {
    21                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
    22                         // IllegalStateException once we try to access the EventLoop of the Channel.
    23                         promise.setFailure(cause);
    24                     } else {
    25                         // Registration was successful, so set the correct executor to use.
    26                         // See https://github.com/netty/netty/issues/2586
    27                         promise.registered();
    28 
    29                         doBind0(regFuture, channel, localAddress, promise);
    30                     }
    31                 }
    32             });
    33             return promise;
    34         }
    35     }

    第一是 initAndRegister方法

     在这里,channelFactory啥时候初始化的?我们回到标准案例那里

     跟进去看看

    public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }

    跟到底会发现下面这段

    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
            if (channelFactory == null) {
                throw new NullPointerException("channelFactory");
            }
            if (this.channelFactory != null) {
                throw new IllegalStateException("channelFactory set already");
            }
            //初始化cannelFactory
            this.channelFactory = channelFactory;
            return self();
        }

    所以很明显,cannelFactory是ReflectiveChannelFactory,我们继续看ReflectiveChannelFactory的newChannel方法

     public T newChannel() {
            try {
                return clazz.getConstructor().newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }

    这就可以看出,channel的创建是通过工厂模式,反射创建无参构造函数的,实例就是我们初始化传进去的 NioServerSocketChannel,我们把channel的创建看完,继续跟它的构造函数

    
    
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }

    DEFAULT_SELECTOR_PROVIDER 是根据操作系统选择的provider,而newSocket其实就是根据provider到jdk底层去获取对应的serversockerchannel,我们继续this,

    public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }

    调用父类的构造方法,注意参数 SelectionKey.OP_ACCEPT,继续

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }

    这个构造函数就是把前面生成的channel和accept事件保存起来,并设置该channel为非阻塞模式,是不是就是nio的代码方式,我们继续看super

    protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }

    id我们暂时不管,这里会生成一个unsafe 来操作bytebuffer的,还生成了pipeline,这个主要是为了执行我们初始化设定的一些handdler,我们后面分析;到这里把channel的初始化分析完了,回到之前的initAndRegister方法,我们继续往下看有个init方法,它有两个实现,一个是客户端的BootStrap,一个是服务端的ServerBootStrap,做的事情都差不多,我们看下ServerBootStrap的,

    @Override
        void init(Channel channel) throws Exception {
            // 把代码启动的时候设置的参数放到它该有的位置上
            final Map<ChannelOption<?>, Object> options = options0();
            synchronized (options) {
                 // options设置到channel上
                setChannelOptions(channel, options, logger);
            }
    
            final Map<AttributeKey<?>, Object> attrs = attrs0();
            synchronized (attrs) {
                // 遍历attr事件,设置到channel上
                for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                    @SuppressWarnings("unchecked")
                    AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                    channel.attr(key).set(e.getValue());
                }
            }
    
            ChannelPipeline p = channel.pipeline();
                ......
    
            // 把所有handler组装成pipeline
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    final ChannelPipeline pipeline = ch.pipeline();
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }            

    省略了些代码,主要还是把之前一开始初始化保存的对象绑到对应的channel上,然后放到一个 inboundhandler类型-ServerBootstrapAcceptor对象上,并给放到pipeline 链上。

    我们继续看initAndRegister的另一行代码

    ChannelFuture regFuture = config().group().register(channel);

     config().group(),在这里是 NioEventLoopGroup,register执行的是它的父类 MultithreadEventLoopGroup

    public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }

    next方法有两种实现,在NioEventLoopGroup初始化的时候会调用它的父类构造函数,如果线程数是2的次方就实例化 PowerOfTwoEventExecutorChooser,否则就是GenericEventExecutorChooser,我们看看两个实现的有啥区别

    PowerOfTwoEventExecutorChooser:
    public EventExecutor next() {
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
    
    
    GenericEventExecutorChooser:
    public EventExecutor next() {
                return executors[Math.abs(idx.getAndIncrement() % executors.length)];
            }

    一个按位与,一个是取模运算,明显按位与快一点,所以推荐设置2的n次方

    讲完chooser选择后,继续看register,因为我们是 NioEventLoop,它继承于 SingleThreadEventLoop,所以我们看它的register

    public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            promise.channel().unsafe().register(this, promise);
            return promise;
        }

    从DefaultChannelPromise 拿到niosocketchannel,拿到对应的unsafe,而 AbstractUnsafe 是 AbstractChannel的内部类,

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
               ......
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } 
                }
            }

    继续看 register0

    private void register0(ChannelPromise promise) {
                   .......
                    doRegister();
                   .......
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                   .......
            }

    继续 doRegister

    protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                }
            }
        }

    调用底层的jdk channel来注册selector,拿到一个selectionKey,initAndRegister方法算是结束了,主要是初始化channel和注册selector的,接下来看看 doBind0 方法,

    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        // 通过pipeline 从tail到head节点执行,最终在对应的NioServerSocketChannel执行bind方法
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }

    我们继续往下跟会发现,扔给线程池的任务异步执行,从AbstractChannel开始做bind操作,通过每个channel对应的 DefaultChannelPipeline 来执行bind,最终会通过 pipeline 从tail执行到head节点,最终跑到NioServerSocketChannel 类

    protected void doBind(SocketAddress localAddress) throws Exception {
            // 最终执行到jdk底层的 bind方法
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }

    最终bind 方法执行完毕。

  • 相关阅读:
    codeforces 1438D,思路非常非常巧妙的构造题
    【Azure DevOps系列】开始第一个Azure DevOps应用
    .NET Core SameSite cookie问题
    解决Caused by: java.lang.IllegalArgumentException: Property 'sqlSessionFactory' or 'sqlSessionTemplate' are required
    feign.FeignException$NotFound: status 404 reading OrdersClient#isBuyCourse(String,String)
    feign.FeignException$NotFound: status 404 reading EduClient#getCourseInfoOrder
    谷粒学院查询全部课程不显示问题
    解决java.sql.SQLException: Zero date value prohibited
    使用Visual Studio Code代码编辑器给vue安装插件,结果导致node_modules里面的安装好的依赖丢失
    Redis报错: Caused by: io.lettuce.core.RedisConnectionException: DENIED Redis is running in protected mode because protected mode is enabled, no bind address was specified, ...
  • 原文地址:https://www.cnblogs.com/myos/p/13194299.html
Copyright © 2011-2022 走看看