zoukankan      html  css  js  c++  java
  • Netty源码—二、server启动(2)


    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        // 配置服务器的NIO线程组
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
            .option(ChannelOption.SO_BACKLOG, 1024)
            .childHandler(new ChildChannelHandler());
        // 绑定端口,同步等待成功
        ChannelFuture f = b.bind(port).sync();
        // 等待服务端监听端口关闭
    } finally {
        // 优雅退出,释放线程池资源


    1. 初始化channel
    2. channel注册到selector



    // 配置使用的channel的时候会指定对应的channelFactory
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // channelFactory是ReflectiveChannelFactory
            channel = channelFactory.newChannel();
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
            } else {
        // If we are here and the promise is not failed, it's one of the following cases:
        // 1) If we attempted registration from the event loop, the registration has been completed at this point.
        //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
        // 2) If we attempted registration from the other thread, the registration request has been successfully
        //    added to the event loop's task queue for later execution.
        //    i.e. It's safe to attempt bind() or connect() now:
        //         because bind() or connect() will be executed *after* the scheduled registration task is executed
        //         because register(), bind(), and connect() are all bound to the same thread.
        return regFuture;


    // 创建serverChannel的时候先调用newSocket,然后调用下面的构造方法
    public NioServerSocketChannel(ServerSocketChannel channel) {
        // 设置当前socket监听的事件,由于是server一定要添加accept事件
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    // io.netty.channel.socket.nio.NioServerSocketChannel#newSocket
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
                 *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
                 *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
                 *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                "Failed to open a server socket.", e);
    ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        // 创建一个socket,返回的是socket对应的文件描述符
        this.fd =  Net.serverSocket(true);
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
    // sun.nio.ch.Net#serverSocket
    static FileDescriptor serverSocket(boolean stream) {
        // socket0是一个native方法,返回的是int类型的linux的文件描述符,使用newFD转化为Java的文件描述符
        return IOUtil.newFD(socket0(isIPv6Available(), stream, true));
    // jdk/src/solaris/native/sun/nio/ch/Net.c
    Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                                jboolean stream, jboolean reuse)
    	// 省略中间代码...
        // 调用socket方法创建一个socket,并返回对应的文件描述符
        fd = socket(domain, type, 0);
        if (fd < 0) {
            return handleSocketError(env, errno);
    	// 省略中间代码...
        return fd;


    // config()返回的是ServerBootstrapConfig
    // group()返回的是parentGroup,对应开始的例子是bossGroup,也就是NioEventLoopGroup
    // 所以是调用的是NioEventLoopGroup.register,该方法继承自MultithreadEventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    // io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
    public ChannelFuture register(Channel channel) {
        // 使用的是bossGroup,next方法选出第一个NioEventLoop,调用NioEventLoop.register,该方法继承自SingleThreadEventLoop
        return next().register(channel);
    // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
    public ChannelFuture register(Channel channel) {
        // 注册的还是使用一个promise,可以异步注册
        return register(new DefaultChannelPromise(channel, this));
    // io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        // channel返回的是NioServerSocketChannel
        // unsafe返回的是io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe
        // 所以调用的是NioMessageUnsafe.register,该方法继承自AbstractUnsafe
        promise.channel().unsafe().register(this, promise);
        return promise;
    // io.netty.channel.AbstractChannel.AbstractUnsafe#register
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // 省略中间代码...
    	// 当前线程是main线程,eventLoop是bossGroup中的一个线程,所以这里返回false,会在新线程中执行register0
        if (eventLoop.inEventLoop()) {
        } else {
            try {
                // 在eventLoop中执行
                eventLoop.execute(new Runnable() {
                    public void run() {
            } catch (Throwable t) {
    		// 省略中间代码...
    private void register0(ChannelPromise promise) {
        try {
    		// 省略中间代码...
            // 这里面主要是调用ServerSocketChannelImpl.register,注册的过程中主要是将需要监听的文件描述符添加到EPollArrayWrapper中
            neverRegistered = false;
            registered = true;
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
            if (isActive()) {
                if (firstRegistration) {
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    // See https://github.com/netty/netty/issues/4805
        } catch (Throwable t) {
            // 省略中间代码...


    // sun.nio.ch.SelectorImpl#register
    // 这里ch是ServerSocketChannelImpl
    // attachment是NioServerSocketChannel
    // ops是0,这里并不注册需要监听的事件
    // selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        // 创建一个SelectionKeyImpl,
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        synchronized (publicKeys) {
            // 调用sun.nio.ch.EPollSelectorImpl#implRegister
        // 设置当前channel关注的事件
        return k;
    protected void implRegister(SelectionKeyImpl ski) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        int fd = Integer.valueOf(ch.getFDVal());
        fdToKey.put(fd, ski);
        // poolWrapper是epoll监听事件所需数据结构的java版本
        // add方法调用setUpdateEvents来指定当前socket监听的事件
     * struct epoll_event {
     *     __uint32_t events;
     *     epoll_data_t data;
     * };
     * 由于一开始并不知道会监听多少个socket,所以jdk默认指定了MAX_UPDATE_ARRAY_SIZE
     * 如果小于MAX_UPDATE_ARRAY_SIZE则使用数组eventsLow存储每个socket监听的事件,eventsLow的下标就是socket对应的文件描述符
     * 如果大于等于MAX_UPDATE_ARRAY_SIZE个则使用EPollArrayWrapper#eventsHigh,也就是一个map来保存每个socket监听的事件
     * 注意这个时候调用setUpdateEvents的events参数是0,也就是还没有执行监听的事件类型
    private void setUpdateEvents(int fd, byte events, boolean force) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            if ((eventsLow[fd] != KILLED) || force) {
                eventsLow[fd] = events;
        } else {
            Integer key = Integer.valueOf(fd);
            if (!isEventsHighKilled(key) || force) {
                eventsHigh.put(key, Byte.valueOf(events));


    // io.netty.channel.nio.AbstractNioChannel#doBeginRead
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
        readPending = true;
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            // readInterestOp是16,在NioServerSocketChannel构造方法里面指定了这个channel需要监听accept事件
            // 这里才是真正设置socket监听事件的地方
            // 下面这个方法最后会调用到sun.nio.ch.EPollArrayWrapper#setInterest
            selectionKey.interestOps(interestOps | readInterestOp);
    // sun.nio.ch.EPollArrayWrapper#setInterest
    void setInterest(int fd, int mask) {
        synchronized (updateLock) {
            // record the file descriptor and events
            int oldCapacity = updateDescriptors.length;
            if (updateCount == oldCapacity) {
                int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
                int[] newDescriptors = new int[newCapacity];
                System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
                updateDescriptors = newDescriptors;
            updateDescriptors[updateCount++] = fd;
            // events are stored as bytes for efficiency reasons
            byte b = (byte)mask;
            assert (b == mask) && (b != KILLED);
            // 上面已经说过这个方法了,把当前socket对应的文件描述符监听的事件设置为b
            setUpdateEvents(fd, b, false);


    public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
        synchronized (lock) {
            // 省略中间代码...
            // 调用native方法的bind,最后调用linux的bind方法
            Net.bind(fd, isa.getAddress(), isa.getPort());
            // 最后调用listen方法完成监听serverSocket的文件描述符
            Net.listen(fd, backlog < 1 ? 50 : backlog);
            synchronized (stateLock) {
                localAddress = Net.localAddress(fd);
        return this;



  • 相关阅读:
    Leetcode589.N-ary Tree Preorder TraversalN叉树的前序遍历
    Leetcode563.Binary Tree Tilt二叉树的坡度
    Leetcode559.Maximum Depth of N-ary TreeN叉树的最大深度
    Leetcode561.Array Partition I数组拆分1
    Leetcode551.Student Attendance Record I学生出勤记录1
    Leetcode543.Diameter of Binary Tree二叉树的直径
    Leetcode520Detect Capital检测大写字母
    Leetcode532.K-diff Pairs in an Array数组中的K-diff数对
    Leetcode496.Next Greater Element I下一个更大的元素1
  • 原文地址:https://www.cnblogs.com/sunshine-2015/p/9357760.html
Copyright © 2011-2022 走看看