zoukankan      html  css  js  c++  java
  • rocketMQ producer 发送消息,为什么不需要加锁?

    使用 netty,在业务层我们通常是调用 channel.write 或 channel.writeAndFlush,但是代码一路跟下来,没有发现加锁动作。

    原因是,netty 在创建 channel 的时候,为每个 channel 分配一个 IO 线程,为每个 handler 分配一个业务线程,所有 IO 和业务操作都在一个线程中执行,不存在并发操作。

    在上图的流水线中,HeadContext 和 TailContext 是功能 Context,没有业务 handler,当执行到 HeadContext 或 TailContext 时,发现没有 executor,则用 channel 的 evenetLoop 来执行操作。其实这里的 eventLoop 和 executor 都是单线程的线程池。

    以 rocketmq 客户端为例:

    // org.apache.rocketmq.remoting.netty.NettyRemotingClient 构造函数代码块
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);
    
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    因为是客户端,不需要监听端口,所以只需要一个 IO 线程池

    // org.apache.rocketmq.remoting.netty.NettyRemotingClient#start
    public void start() {
        // 创建 handler 的线程池,这里称为业务线程池
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
    
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });
    
        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });
    }

    IO 操作在 eventLoopGroupWorker 线程池中执行,业务操作(即 handler 的方法)在 defaultEventExecutorGroup 线程池中执行。

    以 AbstractChannelHandlerContext#write 为例,观察切换线程的动作

    // io.netty.channel.AbstractChannelHandlerContext#write
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        // 取出当前 HandlerContext 的 executor
        EventExecutor executor = next.executor();
        // 当前执行代码的线程是否等于 executor 中的线程
        // 如果是,则直接在当前线程执行
        // 如果不是,则提交任务到 executor 中执行
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(msg, promise);
            } else {
                next.invokeWrite(msg, promise);
            }
        } else {
            AbstractWriteTask task;
            // 创建任务
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, msg, promise);
            }  else {
                task = WriteTask.newInstance(next, msg, promise);
            }
            // 提交到业务线程池
            safeExecute(executor, task, promise, msg);
        }
    }

    回到最初的问题,发送消息,最后的网络写操作,会提交到 channel 对应的线程,因此不需要加锁动作。
    我提的小白 issue:https://github.com/netty/netty/issues/10035

  • 相关阅读:
    Kettle 实现mysql数据库不同表之间数据同步——实验过程
    Kettle ETL 来进行mysql 数据同步——试验环境搭建(表中无索引,无约束,无外键连接的情况)
    并查集知识总结
    c# 线程同步问题(about volatile)
    c# 线程的等待(堵塞)
    net中多线程返回值
    c# 中的 lock monitor mutex Semaphore 的比较
    c#两种同步结构
    links-some-blog
    T-SQL中的APPLY用法
  • 原文地址:https://www.cnblogs.com/allenwas3/p/12326439.html
Copyright © 2011-2022 走看看