  • RocketMQ: Connections and Connection Caching

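     I noticed that RabbitMQ has a ConnectionFactory, while RocketMQ does not seem to have an equivalent. Opening a new connection for every message sent is clearly not a realistic option.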

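    PS: This question comes from my experience with database connection pools. A database Connection is not documented as thread-safe, so for thread safety each transaction is given its own connection. RocketMQ, however, uses Netty's long-lived Channel connections, and the Javadoc for Java's SocketChannel states that socket channels are safe for use by multiple concurrent threads.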

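    Netty: thread safety and concurrency of writeAndFlush

    As that article shows, if writeAndFlush is called from within Netty's own I/O thread (the channel's event loop), the write is performed directly; otherwise it is submitted as a task and executed on that thread.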
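
    To make the dispatch rule concrete, here is a minimal sketch in plain Java. It does not use Netty's classes: ToyChannel, inEventLoop, runInLoop and drain are hypothetical names, and a single-threaded executor stands in for the channel's event loop. The only point is that every write ends up on one thread, so callers on any thread can share the same channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy model of a channel bound to one event-loop thread (not Netty's real classes). */
class ToyChannel {
    private volatile Thread loopThread;
    private final ExecutorService eventLoop;
    // Only ever touched by the loop thread, so it needs no locking.
    private final List<String> wire = new ArrayList<String>();

    ToyChannel() {
        eventLoop = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "toy-event-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /** Write directly when already on the loop thread, otherwise hand off as a task. */
    void writeAndFlush(String msg) {
        if (inEventLoop()) {
            wire.add(msg);
        } else {
            eventLoop.execute(() -> wire.add(msg));
        }
    }

    /** Runs a task on the loop thread, like handler code running inside the event loop. */
    void runInLoop(Runnable task) {
        eventLoop.execute(task);
    }

    /** Returns a snapshot of everything written so far, taken on the loop thread. */
    List<String> drain() throws Exception {
        Callable<List<String>> snapshot = () -> new ArrayList<String>(wire);
        return eventLoop.submit(snapshot).get();
    }
}

class ToyChannelDemo {
    public static void main(String[] args) throws Exception {
        ToyChannel ch = new ToyChannel();
        Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
            final int id = i;
            writers[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    ch.writeAndFlush(id + ":" + j); // called off the loop thread
                }
            });
            writers[i].start();
        }
        for (Thread t : writers) {
            t.join();
        }
        System.out.println("writes seen: " + ch.drain().size());
    }
}
```

    Because the executor runs tasks one at a time in submission order, writes issued by a single caller thread keep their relative order, while writes from different threads may interleave.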

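    Ultimately this comes down to Netty. Since the channel is thread-safe, the whole system can get by with a single connection per remote address.

    So let's look at the source code to see how RocketMQ does it.

    Below we trace what happens when a message is sent, focusing on the modules involved in I/O on the send path.

    RocketMQ keeps its connections in channelTables.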

        private Channel getAndCreateNameserverChannel() throws InterruptedException {
            String addr = this.namesrvAddrChoosed.get();
            if (addr != null) {
                ChannelWrapper cw = this.channelTables.get(addr);
                if (cw != null && cw.isOK()) {
                    return cw.getChannel();
                }
            }
    
            final List<String> addrList = this.namesrvAddrList.get();
            if (this.lockNamesrvChannel.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    addr = this.namesrvAddrChoosed.get();
                    if (addr != null) {
                        ChannelWrapper cw = this.channelTables.get(addr);
                        if (cw != null && cw.isOK()) {
                            return cw.getChannel();
                        }
                    }
    
                    if (addrList != null && !addrList.isEmpty()) {
                        for (int i = 0; i < addrList.size(); i++) {
                            int index = this.namesrvIndex.incrementAndGet();
                            index = Math.abs(index);
                            index = index % addrList.size();
                            String newAddr = addrList.get(index);
    
                            this.namesrvAddrChoosed.set(newAddr);
                            Channel channelNew = this.createChannel(newAddr);
                            if (channelNew != null)
                                return channelNew;
                        }
                    }
                } catch (Exception e) {
                    log.error("getAndCreateNameserverChannel: create name server channel exception", e);
                } finally {
                    this.lockNamesrvChannel.unlock();
                }
            } else {
                log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LockTimeoutMillis);
            }
    
            return null;
        }
        private Channel createChannel(final String addr) throws InterruptedException {
            ChannelWrapper cw = this.channelTables.get(addr);
            if (cw != null && cw.isOK()) {
                return cw.getChannel();
            }
    
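            // lockChannelTables is a ReentrantLock. The timed tryLock(timeout, unit) waits up to LockTimeoutMillis
            // for the lock and then returns true or false; only the no-argument tryLock() returns immediately.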
            if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    boolean createNewConnection = false;
                    cw = this.channelTables.get(addr);
                    if (cw != null) {
    
                        if (cw.isOK()) {
                            return cw.getChannel();
                        }
    
                        else if (!cw.getChannelFuture().isDone()) {
                            createNewConnection = false;
                        }
    
                        else {
                            this.channelTables.remove(addr);
                            createNewConnection = true;
                        }
                    }
    
                    else {
                        createNewConnection = true;
                    }
    
                    if (createNewConnection) {
                        ChannelFuture channelFuture = this.bootstrap.connect(RemotingHelper.string2SocketAddress(addr));
                        log.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
                        cw = new ChannelWrapper(channelFuture);
                        // The connection (wrapped in a ChannelWrapper) is stored in channelTables.
                        this.channelTables.put(addr, cw);
                    }
                } catch (Exception e) {
                    log.error("createChannel: create channel exception", e);
                } finally {
                    this.lockChannelTables.unlock();
                }
            } else {
                log.warn("createChannel: try to lock channel table, but timeout, {}ms", LockTimeoutMillis);
            }
    
            if (cw != null) {
                ChannelFuture channelFuture = cw.getChannelFuture();
                if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
                    if (cw.isOK()) {
                        log.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
                        return cw.getChannel();
                    } else {
                        log.warn("createChannel: connect remote host[" + addr + "] failed, " + channelFuture.toString(), channelFuture.cause());
                    }
                } else {
                    log.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
                        channelFuture.toString());
                }
            }
    
            return null;
        }

    On the Netty side, the bootstrap.connect(...) call in createChannel eventually reaches Bootstrap.doConnect:

        /**
         * @see {@link #connect()}
         */
        private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            final ChannelPromise promise = channel.newPromise();
            if (regFuture.isDone()) {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            } else {
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }
        private static void doConnect0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new OneTimeTask() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        if (localAddress == null) {
                            // io.netty.channel.connect(remoteAddress, promise);
                            channel.connect(remoteAddress, promise);
                        } else {
                            channel.connect(remoteAddress, localAddress, promise);
                        }
                        promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }

    That is as far as we will follow connection setup into Netty; for more detail, refer to Netty's own material on the subject.
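
    The caching pattern in createChannel (a lock-free lookup first, then a second lookup under a timed lock before creating anything) can be condensed into the following sketch. It is a simplification under stated assumptions, not RocketMQ's code: FakeConnection, ConnectionCache and connectCount are hypothetical names, and the connection is created synchronously, whereas the real method stores a ChannelFuture and waits for it outside the lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a Netty channel; it only tracks liveness. */
class FakeConnection {
    final String addr;
    volatile boolean active = true;

    FakeConnection(String addr) {
        this.addr = addr;
    }
}

/** One cached connection per address, created at most once while it stays active. */
class ConnectionCache {
    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    private final ConcurrentMap<String, FakeConnection> table =
        new ConcurrentHashMap<String, FakeConnection>();
    private final Lock tableLock = new ReentrantLock();
    final AtomicInteger connectCount = new AtomicInteger(); // how many connections were opened

    FakeConnection getOrCreate(String addr) throws InterruptedException {
        // Fast path: no lock needed when a healthy connection is already cached.
        FakeConnection c = table.get(addr);
        if (c != null && c.active) {
            return c;
        }
        // Slow path: wait for the lock, but give up after a timeout instead of blocking forever.
        if (!tableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            return null;
        }
        try {
            // Re-check under the lock: another thread may have connected in the meantime.
            c = table.get(addr);
            if (c != null && c.active) {
                return c;
            }
            // Replace a missing or dead entry with a fresh connection.
            c = new FakeConnection(addr);
            connectCount.incrementAndGet();
            table.put(addr, c);
            return c;
        } finally {
            tableLock.unlock();
        }
    }
}

class ConnectionCacheDemo {
    public static void main(String[] args) throws InterruptedException {
        ConnectionCache cache = new ConnectionCache();
        FakeConnection a = cache.getOrCreate("127.0.0.1:9876");
        FakeConnection b = cache.getOrCreate("127.0.0.1:9876");
        System.out.println("same connection reused: " + (a == b));
    }
}
```

    The second lookup inside the lock is what keeps several threads that miss the cache at the same moment from each opening their own connection to the same address.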

    Next, let's look at how RocketMQ maintains channelTables.

     1. Cleared when the system shuts down:

    2. When a channel is closed:
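
    The two maintenance cases listed above can be illustrated with a small sketch. This is not RocketMQ's source: TrackedConnection, ChannelTableSketch and their methods are hypothetical, and the conditional ConcurrentMap.remove(key, value) is used here as one simple way to avoid evicting a newer connection that has already replaced the one being closed.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for a channel; it only tracks whether it is open. */
class TrackedConnection {
    final String addr;
    volatile boolean open = true;

    TrackedConnection(String addr) {
        this.addr = addr;
    }

    void close() {
        open = false;
    }
}

class ChannelTableSketch {
    private final ConcurrentMap<String, TrackedConnection> table =
        new ConcurrentHashMap<String, TrackedConnection>();

    void register(TrackedConnection c) {
        table.put(c.addr, c);
    }

    TrackedConnection get(String addr) {
        return table.get(addr);
    }

    int size() {
        return table.size();
    }

    /**
     * Case 2: close one connection and drop its table entry.
     * Returns false when the table already maps the address to a different (newer) connection,
     * in which case the entry is left alone.
     */
    boolean closeConnection(TrackedConnection c) {
        c.close();
        return table.remove(c.addr, c);
    }

    /** Case 1: on shutdown, close every cached connection and empty the table. */
    void shutdown() {
        for (TrackedConnection c : table.values()) {
            c.close();
        }
        table.clear();
    }
}

class ChannelTableDemo {
    public static void main(String[] args) {
        ChannelTableSketch table = new ChannelTableSketch();
        TrackedConnection conn = new TrackedConnection("10.0.0.1:10911");
        table.register(conn);
        table.closeConnection(conn);
        System.out.println("entries after close: " + table.size());
    }
}
```

    Without some form of cleanup on close, a dead entry would stay in the table until the next lookup for that address notices it and replaces it.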

  • Original article: https://www.cnblogs.com/guazi/p/6664984.html