使用 netty,在业务层我们通常是调用 channel.write 或 channel.writeAndFlush,但是代码一路跟下来,没有发现加锁动作。
原因是,netty 在创建 channel 的时候,为每个 channel 分配一个 IO 线程,为每个 handler 分配一个业务线程,所有 IO 和业务操作都在一个线程中执行,不存在并发操作。
在上图的流水线中,HeadContext 和 TailContext 是功能 Context,没有业务 handler,当执行到 HeadContext 或 TailContext 时,发现没有 executor,则用 channel 的 evenetLoop 来执行操作。其实这里的 eventLoop 和 executor 都是单线程的线程池。
以 rocketmq 客户端为例:
// org.apache.rocketmq.remoting.netty.NettyRemotingClient 构造函数代码块 this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet())); } });
因为是客户端,不需要监听端口,所以只需要一个 IO 线程池
// org.apache.rocketmq.remoting.netty.NettyRemotingClient#start public void start() { // 创建 handler 的线程池,这里称为业务线程池 this.defaultEventExecutorGroup = new DefaultEventExecutorGroup( nettyClientConfig.getClientWorkerThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet()); } }); Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, false) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis()) .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc())); log.info("Prepend SSL handler"); } else { log.warn("Connections are insecure as SSLContext is null!"); } } pipeline.addLast( defaultEventExecutorGroup, new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), new NettyConnectManageHandler(), new NettyClientHandler()); } }); }
IO 操作在 eventLoopGroupWorker 线程池中执行,业务操作(即 handler 的方法)在 defaultEventExecutorGroup 线程池中执行。
以 AbstractChannelHandlerContext#write 为例,观察切换线程的动作
// io.netty.channel.AbstractChannelHandlerContext#write private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); // 取出当前 HandlerContext 的 executor EventExecutor executor = next.executor(); // 当前执行代码的线程是否等于 executor 中的线程 // 如果是,则直接在当前线程执行 // 如果不是,则提交任务到 executor 中执行 if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(msg, promise); } else { next.invokeWrite(msg, promise); } } else { AbstractWriteTask task; // 创建任务 if (flush) { task = WriteAndFlushTask.newInstance(next, msg, promise); } else { task = WriteTask.newInstance(next, msg, promise); } // 提交到业务线程池 safeExecute(executor, task, promise, msg); } }
回到最初的问题,发送消息,最后的网络写操作,会提交到 channel 对应的线程,因此不需要加锁动作。
我提的小白 issue:https://github.com/netty/netty/issues/10035