zoukankan      html  css  js  c++  java
  • RocketMQ-通信层

    前言

    RocketMQ的网络通信是基于Netty实现的RPC框架,这些RPC框架实现的功能都具有通用性,如sofa-bolt,分布式服务框架Dubbo,实现的网络通信模型都具有协议定义,同步请求,异步请求,单向请求,负载均衡,流控,心跳,重连等机制。

    服务端NettyRemotingServer

    通信的服务端继承NettyRemotingAbstract实现RemotingServer

    public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer

       server端实现的功能:

    public interface RemotingServer extends RemotingService {
       // 注册处理器
        void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
            final ExecutorService executor);
      
        void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
    
        int localListenPort();
    
        Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
    
        RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
            final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
            RemotingTimeoutException;
    
        void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
            final InvokeCallback invokeCallback) throws InterruptedException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    
        void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
            RemotingSendRequestException;
    View Code

    server端初始化
    实现Reactor主从多线程模型,初始化3个eventloop,
    private final EventLoopGroup eventLoopGroupSelector;   // 用于就绪选择,可以使Epoll或者NIO的模式。默认3个线程数
    private final EventLoopGroup eventLoopGroupBoss; //作为acceptor负责与client建立连接 一个线程足够了
    private DefaultEventExecutorGroup defaultEventExecutorGroup; // 工作线程池用于处理handler,默认8个线程

    server端启动
    初始化serverBootstrap,设置TCP参数,定义handler调用链。
    @Override
        public void start() {
            // 处理handler i/o 线程组
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                    nettyServerConfig.getServerWorkerThreads(),
                    new ThreadFactory() {
    
                        private AtomicInteger threadIndex = new AtomicInteger(0);
    
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                        }
                    });
            //   对于线程池的配置  可以参考一下 Reactor  主从多线程池模型
            ServerBootstrap childHandler =
                    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                            .option(ChannelOption.SO_BACKLOG, 1024)
                            .option(ChannelOption.SO_REUSEADDR, true)
                            .option(ChannelOption.SO_KEEPALIVE, false)
                            .childOption(ChannelOption.TCP_NODELAY, true)
                            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel ch) throws Exception {
                                    ch.pipeline().addLast(
                                            defaultEventExecutorGroup,
                                            new NettyEncoder(),
                                            new NettyDecoder(),
                                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                            new NettyConnectManageHandler(),
                                            new NettyServerHandler());
                                }
                            });
    
            // 启用字节缓冲池
            if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
                childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            }
            try {
                ChannelFuture sync = this.serverBootstrap.bind().sync();
                InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
                this.port = addr.getPort();
            } catch (InterruptedException e1) {
                throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
            }
    
            // 启动netty事件监听
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
    
            // 定时任务处理当前响应
            this.timer.scheduleAtFixedRate(new TimerTask() {
    
                @Override
                public void run() {
                    try {
                        NettyRemotingServer.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
    
    
        }
    View Code

    这里面主要关注这几个handler,NettyEncoder,NettyDecoder,IdleStateHandler,NettyConnectManageHandler,NettyServerHandler,将会在下文介绍。

     客户端 NettyRemotingClient

    public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient 

    client端实现的功能,与server端类似
    public interface RemotingClient extends RemotingService {
    
        public void updateNameServerAddressList(final List<String> addrs);
    
        public List<String> getNameServerAddressList();
    
        public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
            final long timeoutMillis) throws InterruptedException, RemotingConnectException,
            RemotingSendRequestException, RemotingTimeoutException;
    
        public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
            final InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException,
            RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
    
        public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)
            throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException,
            RemotingTimeoutException, RemotingSendRequestException;
    
        public void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
            final ExecutorService executor);
    
        public boolean isChannelWriteable(final String addr);
    }
    View Code

      客户端发起连接,是在发送消息的时候实时建立连接的

    client初始化bootstrap

    跟server不同的是,client需要一个eventLoopGroupWorker负责建立连接,并且初始化一个defaultEventExecutorGroup处理handler。

        @Override
        public void start() {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
                nettyClientConfig.getClientWorkerThreads(), //
                new ThreadFactory() {
    
                    private AtomicInteger threadIndex = new AtomicInteger(0);
    
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                    }
                });
    
            Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
                .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
                .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                            defaultEventExecutorGroup,
                            new NettyEncoder(),
                            new NettyDecoder(),
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                            new NettyConnectManageHandler(),
                            new NettyClientHandler());
                    }
                });
    
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        NettyRemotingClient.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
    
            if (this.channelEventListener != null) {
                this.nettyEventExecutor.start();
            }
        }
    View Code

    相关的handler将会在下文介绍。

    通信协议RemotingCommand

    定义了远程调用的数据结构,定义了私有化协议,具体看一下这些属性:

    private static final int RPC_TYPE = 0; // 0, REQUEST_COMMAND  // 1, RESPONSE_COMMAND
        private static final int RPC_ONEWAY = 1; // 0, RPC  // 1, Oneway
        /**
         * 缓存了具体的CommandCustomHeader都有哪些field
         */
        private static final Map<Class<? extends CommandCustomHeader>, Field[]> CLASS_HASH_MAP =
                new HashMap<Class<? extends CommandCustomHeader>, Field[]>();
        /**
         * 缓存具体的CommandCustomHeader对应的简单类名
         */
        private static final Map<Class, String> CANONICAL_NAME_CACHE = new HashMap<Class, String>();
    
        /**
         * 缓存一个field和对应的注解信息,用于后续解码后,数据的非空校验等
         */
        private static final Map<Field, Annotation> NOT_NULL_ANNOTATION_CACHE = new HashMap<Field, Annotation>();
        // 具体的请求编号
        private int code;
        private LanguageCode language = LanguageCode.JAVA;
        private int version = 0;
        // 一次rpc的请求ID
        private int opaque = requestId.getAndIncrement();
        private int flag = 0;
        // 异常信息记录在remark
        private String remark;
    
        //扩展字段,数据序列化前后存储结构
        private HashMap<String, String> extFields;
        
        //  数据编码之前 把customHeader中的属性转化成extFields 再序列化
        //   数据反序列化之后 存在在extFields 再装换成具体的customHeader
        private transient CommandCustomHeader customHeader;
    
        /**
         * 有rocketmq和json两种序列化方式,默认为json
         */
        private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
    
        // 消息message,调用接口传入
        private transient byte[] body;
    

      

    编解码

    NettyEncoder

    实际上就是把请求header与body的数据发送出去

    ByteBuffer header = remotingCommand.encodeHeader();
                out.writeBytes(header);
                // body的编码是上面在message中序列化好的
                byte[] body = remotingCommand.getBody();
                if (body != null) {
                    out.writeBytes(body);
                }

    具体的编码过程:

    header返回的缓冲字节结构如下:

    |-- int 4byte 总长度(包括body长度)--|--int 4bytes 序列化信息--| -- headerData--|

    private ByteBuffer encodeHeader(final int bodyLength) {
            //计算分配的内存大小   最前面需要四个字节,放一个int整型
            // 1> header length size  最前面放一个Int整型  占用四个字节
            int length = 4;
    
            // 2> header data length   这里面包含很多信息
            byte[] headerData;
            headerData = this.headerEncode();
            length += headerData.length;
    
            // 3> body data length
            length += bodyLength;
    
    
            //  放  总长度(4)  +   序列化类型(4)+ header的长度
            ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
    
            // 目前为止 length的值包含了  4 + headerData.length + body.length
            // length  这个值不包含 ProtocolType长度
            result.putInt(length);
    
    
            // header length  4个字节
            result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
    
            // header data
            result.put(headerData);
    
            result.flip();
    
            return result;
    
        }

     4个字节的的整型int序列化信息,包含着一个字节的序列化code和3个字节保存一个4字节的headerLength

    public static byte[] markProtocolType(int source, SerializeType type) {
            byte[] result = new byte[4];
    
            result[0] = type.getCode();
            result[1] = (byte) ((source >> 16) & 0xFF);
            result[2] = (byte) ((source >> 8) & 0xFF);
            result[3] = (byte) (source & 0xFF);
            return result;
        }

    // 获取headerlength
    public static int getHeaderLength(int length) {
    return length & 0xFFFFFF;
    }

     接下来看一下headerData = this.headerEncode(); 具体如何编码header的

    private byte[] headerEncode() {
    // 这个方法实际上就是  把自定义header类里面的字段,封装成map,放在extFields里面 以便后续序列化
            this.makeCustomHeaderToNet();
            if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
                return RocketMQSerializable.rocketMQProtocolEncode(this);
            } else {
    // json序列化很好理解 就是 JSON.toJSONString(obj, prettyFormat);
                return RemotingSerializable.encode(this);
            }
        }

    herderData在RocketMQ自定义序列化的条件下,结构是这样的,

    private static int calTotalLen(int remark, int ext) {
            // int code(~32767)
            int length = 2
                    // LanguageCode language
                    + 1
                    // int version(~32767)
                    + 2
                    // int opaque
                    + 4
                    // int flag
                    + 4
                    // String remark
                    + 4 + remark
                    // HashMap<String, String> extFields
                    + 4 + ext;
    
            return length;
    

      其中ext的序列化方式是

    // 都是长度加数据的方式追加
    Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                int kvLength; // keylength(short 2)+ keybyte + valuelength(int 4) + valuebyte
                Map.Entry<String, String> entry = it.next();
                if (entry.getKey() != null && entry.getValue() != null) {
                    kvLength = 2 + entry.getKey().getBytes(CHARSET_UTF8).length
                            + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
                    totalLength += kvLength;
                }

     

    NettyDecoder

    public class NettyDecoder extends LengthFieldBasedFrameDecoder
    public NettyDecoder() {
            /**
             * 1.从消息开头偏移lengthFieldOffset长度, 到达A位置
             *
             * 2.再从A位置读取lengthFieldLength长度, 到达B位置, 内容是d
             *
             * 3.再从B位置读取(d+lengthAdjustment)长度, 达到D位置
             *
             * 4.从消息开头跳过initialBytesToStrip长度到达C位置
             *
             * 5.将C位置-D位置之间的内容传送给接下来的处理器进行后续处理
             *
             * 根据编码器定义  实际上一个数据包是 4字节的序列化信息 + headerData + body
             */
            super(FRAME_MAX_LENGTH, 0, 4, 0, 4);

    主要关注 RemotingCommand.decode(byteBuffer);逻辑,

    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
            int length = byteBuffer.limit();  // 4+ headerlength + bodylength
            int oriHeaderLen = byteBuffer.getInt();  // 4个字节   序列化方式信息
            // HeaderData长度
            int headerLength = getHeaderLength(oriHeaderLen);
            byte[] headerData = new byte[headerLength];
            byteBuffer.get(headerData);
            // 反序列化headerData
            RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
            // 获取body字节
            int bodyLength = length - 4 - headerLength;
            byte[] bodyData = null;
            if (bodyLength > 0) {
                bodyData = new byte[bodyLength];
                byteBuffer.get(bodyData);
            }
            cmd.body = bodyData;
    
            return cmd;
        }
    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
            switch (type) {
                case JSON: // 相对简单  这边不详细介绍了
                    RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                    resultJson.setSerializeTypeCurrentRPC(type);
                    return resultJson;
                case ROCKETMQ:  // 主要看一下
                    RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                    resultRMQ.setSerializeTypeCurrentRPC(type);
                    return resultRMQ;
                default:
                    break;
            }
    public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
            RemotingCommand cmd = new RemotingCommand();
            ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
            // int code(~32767)
            cmd.setCode(headerBuffer.getShort());
            // LanguageCode language
            cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
            // int version(~32767)
            cmd.setVersion(headerBuffer.getShort());
            // int opaque
            cmd.setOpaque(headerBuffer.getInt());
            // int flag
            cmd.setFlag(headerBuffer.getInt());
            // String remark
            int remarkLength = headerBuffer.getInt();
            if (remarkLength > 0) {
                byte[] remarkContent = new byte[remarkLength];
                headerBuffer.get(remarkContent);
                cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
            }
    
            // HashMap<String, String> extFields
            int extFieldsLength = headerBuffer.getInt();
            if (extFieldsLength > 0) {
                byte[] extFieldsBytes = new byte[extFieldsLength];
                headerBuffer.get(extFieldsBytes);
                //mapDeserialize方法会把extmap逐个解码
                cmd.setExtFields(mapDeserialize(extFieldsBytes));
            }
            return cmd;
        }

     

    网络事件处理机制

    这里主要介绍IdleStateHandler 和 NettyConnectManageHandler

    IdleStateHandler

    // IdleStateHandler  是netty提供的心跳机制 其本身不会发送心跳数据
    // 用来检测读空闲 写空闲 还是读写空闲,根据设置的时间来触发对应的事件
    // 用户自定义的handler的userEventTriggered会捕捉到空闲超时类型,用户可以
    //自定义处理逻辑 比如写空闲了 可以发送心跳数据
    // 设置0表示不启用设置
    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),

    NettyConnectManageHandler

    class NettyConnectManageHandler extends ChannelDuplexHandler {
            @Override
            public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                                ChannelPromise promise) throws Exception {
                final String local = localAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(localAddress);
                final String remote = remoteAddress == null ? "UNKNOWN" : RemotingHelper.parseSocketAddressAddr(remoteAddress);
                log.info("NETTY CLIENT PIPELINE: CONNECT  {} => {}", local, remote);
    
                super.connect(ctx, remoteAddress, localAddress, promise);
    
                if (NettyRemotingClient.this.channelEventListener != null) {
                    NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CONNECT, remote, ctx.channel()));
                }
            }
    
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY CLIENT PIPELINE: DISCONNECT {}", remoteAddress);
                closeChannel(ctx.channel());
                super.disconnect(ctx, promise);
    
                if (NettyRemotingClient.this.channelEventListener != null) {
                    NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
                }
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
                closeChannel(ctx.channel());
                super.close(ctx, promise);
    
                if (NettyRemotingClient.this.channelEventListener != null) {
                    NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
                }
            }
    
            /**
             * 空闲时,客户端会主动断开连接
             * @param ctx
             * @param evt
             * @throws Exception
             */
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent) {
                    IdleStateEvent event = (IdleStateEvent) evt;
                    if (event.state().equals(IdleState.ALL_IDLE)) {
                        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress);
                        closeChannel(ctx.channel());
                        if (NettyRemotingClient.this.channelEventListener != null) {
                            NettyRemotingClient.this
                                    .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));
                        }
                    }
                }
    
                ctx.fireUserEventTriggered(evt);
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                log.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", remoteAddress);
                log.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", cause);
                closeChannel(ctx.channel());
                if (NettyRemotingClient.this.channelEventListener != null) {
                    NettyRemotingClient.this.putNettyEvent(new NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));
                }
            }
        }
    View Code

    读者可以发现,在每一次网络事件都会去组装一个NettyEvent的事件信息放到一个网络事件处理队列中

                            NettyRemotingClient.this
                                    .putNettyEvent(new NettyEvent(NettyEventType.IDLE, remoteAddress, ctx.channel()));

    这边是典型的生产者和消费者线程模型

    /**
         * 这个内部类的作用:
         * 当nettyclient监听到Netty事件,比如连接  断开  异常等情况 执行客户端自定义的事件处理
         */
        class NettyEventExecutor extends ServiceThread {
            private final LinkedBlockingQueue<NettyEvent> eventQueue = new LinkedBlockingQueue<>();
            private final int maxSize = 10000;
    
            public void putNettyEvent(final NettyEvent event) {
                if (this.eventQueue.size() <= maxSize) {
                    this.eventQueue.add(event);
                } else {
                    log.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString());
                }
            }
    
            @Override
            public String getServiceName() {
                return NettyEventExecutor.class.getSimpleName();
            }
    
            @Override
            public void run() {
                log.info(this.getServiceName() + " service started");
                final ChannelEventListener listener = NettyRemotingAbstract.this.getChannelEventListener();
                while (!this.isStopped()) {
                    try {
                        NettyEvent event = this.eventQueue.poll(3000, TimeUnit.MILLISECONDS);
                        if (event != null && listener != null) {
                            switch (event.getType()) {
                                case IDLE:
                                    listener.onChannelIdle(event.getRemoteAddr(), event.getChannel());
                                    break;
                                case CLOSE:
                                    listener.onChannelClose(event.getRemoteAddr(), event.getChannel());
                                    break;
                                case CONNECT:
                                    listener.onChannelConnect(event.getRemoteAddr(), event.getChannel());
                                    break;
                                case EXCEPTION:
                                    listener.onChannelException(event.getRemoteAddr(), event.getChannel());
                                    break;
                                default:
                                    break;
    
                            }
                        }
                    } catch (Exception e) {
                        log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
            }
        }

    这边延伸介绍一下后台线程基类ServiceThread,本身是一个runnable对象,stop方法和shutdown方法 与 waitForRunning方法 实现等待通知的模型。在后面的刷盘,数据同步等都采用这种方式

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.rocketmq.remoting.common;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Base class for background thread
     */
    public abstract class ServiceThread implements Runnable {
        private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    
        private static final long JOIN_TIME = 90 * 1000;
        protected final Thread thread;
        protected volatile boolean hasNotified = false;
        protected volatile boolean stopped = false;
    
        public ServiceThread() {
            this.thread = new Thread(this, this.getServiceName());
        }
    
        public abstract String getServiceName();
    
        public void start() {
            this.thread.start();
        }
    
        public void shutdown() {
            this.shutdown(false);
        }
    
        public void shutdown(final boolean interrupt) {
            this.stopped = true;
            log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
            synchronized (this) {
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }
    
            try {
                if (interrupt) {
                    this.thread.interrupt();
                }
    
                long beginTime = System.currentTimeMillis();
                this.thread.join(this.getJointime());
                long eclipseTime = System.currentTimeMillis() - beginTime;
                log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
                        + this.getJointime());
            } catch (InterruptedException e) {
                log.error("Interrupted", e);
            }
        }
    
        public long getJointime() {
            return JOIN_TIME;
        }
    
        public void stop() {
            this.stop(false);
        }
    
        public void stop(final boolean interrupt) {
            this.stopped = true;
            log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
            synchronized (this) {
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }
    
            if (interrupt) {
                this.thread.interrupt();
            }
        }
    
        public void makeStop() {
            this.stopped = true;
            log.info("makestop thread " + this.getServiceName());
        }
    
        public void wakeup() {
            synchronized (this) {
                if (!this.hasNotified) {
                    this.hasNotified = true;
                    this.notify();
                }
            }
        }
    
        protected void waitForRunning(long interval) {
            synchronized (this) {
                if (this.hasNotified) {
                    this.hasNotified = false;
                    this.onWaitEnd();
                    return;
                }
    
                try {
                    this.wait(interval);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                } finally {
                    this.hasNotified = false;
                    this.onWaitEnd();
                }
            }
        }
    
        protected void onWaitEnd() {
        }
    
        public boolean isStopped() {
            return stopped;
        }
    }
    View Code

    超时请求定时清除

     在client和server端还维护了一个timer的定时任务

    this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        NettyRemotingClient.this.scanResponseTable();
                    } catch (Throwable e) {
                        log.error("scanResponseTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
    /**
         * <p>
         *    This method is periodically invoked to scan and expire deprecated request.
         * </p>
         */
        public void scanResponseTable() {
            final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
            Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Integer, ResponseFuture> next = it.next();
                ResponseFuture rep = next.getValue();
                // 停止等待超时响应 并删除缓存
                if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                    rep.release();
                    it.remove();
                    rfList.add(rep);
                    log.warn("remove timeout request, " + rep);
                }
            }
    
            // 同时对超时的异步请求 执行回调
            for (ResponseFuture rf : rfList) {
                try {
                    executeInvokeCallback(rf);
                } catch (Throwable e) {
                    log.warn("scanResponseTable, operationComplete Exception", e);
                }
            }
        }

    处理接收的远程数据  NettyClientHandler 和 NettyServerHandler

    客户端或者服务端对接收的数据可以分两类,一个是远程服务请求操作,另外一个是远程服务响应返回。

    REQUEST_COMMAND

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            final RemotingCommand cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case REQUEST_COMMAND:
                        processRequestCommand(ctx, cmd);
                        break;
                    case RESPONSE_COMMAND:
                        processResponseCommand(ctx, cmd);
                        break;
                    default:
                        break;
                }
            }
        }

    REQUEST_COMMAND

        public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
            // 根据请求code,获取已经注册的处理器与线程池配对组
            final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
            // 如果找不到 则使用默认处理器  在server端使用adminprocess作为默认处理器
            final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
            final int opaque = cmd.getOpaque();
    
            if (pair != null) {
                Runnable run = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // rpchook
                            RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                            if (rpcHook != null) {
                                rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                            }
    
                            // 处理请求的具体逻辑,返回RemotingCommand响应数据
                            final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                            if (rpcHook != null) {
                                rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            }
    
                            // 对于远程过来的请求类型不是单向请求的话  则设置opaque  ResponseType
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
    
                                }
                            }
                        } catch (Throwable e) {
                            log.error("process request exception", e);
                            log.error(cmd.toString());
    
                            if (!cmd.isOnewayRPC()) {
                                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                                    RemotingHelper.exceptionSimpleDesc(e));
                                response.setOpaque(opaque);
                                ctx.writeAndFlush(response);
                            }
                        }
                    }
                };
    
                if (pair.getObject1().rejectRequest()) {
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[REJECTREQUEST]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    return;
                }
    
                try {
                    // 把任务包装成RequestTask  submit给对应的线程池
                    final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                    pair.getObject2().submit(requestTask);
                } catch (RejectedExecutionException e) {
                    if ((System.currentTimeMillis() % 10000) == 0) {
                        log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                            + ", too many requests and system thread pool busy, RejectedExecutionException " //
                            + pair.getObject2().toString() //
                            + " request code: " + cmd.getCode());
                    }
    
                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                            "[OVERLOAD]system busy, start flow control for a while");
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            } else {
                String error = " request type " + cmd.getCode() + " not supported";
                final RemotingCommand response =
                    RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
                log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
            }
        }

    RESPONSE_COMMAND

    返回响应数据相对简单一些,同步本地请求的话,就直接设置返回值,如果本地是异步请求的话,使用回调线程池处理回调函数,最后在缓存中删除对应的responseFuture

    public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
            final int opaque = cmd.getOpaque();
            final ResponseFuture responseFuture = responseTable.get(opaque);
            if (responseFuture != null) {
                responseFuture.setResponseCommand(cmd);
    
                responseFuture.release();
    
                responseTable.remove(opaque);
    
                if (responseFuture.getInvokeCallback() != null) {
                    executeInvokeCallback(responseFuture);
                } else {
                    responseFuture.putResponse(cmd);
                }
            } else {
                log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
                log.warn(cmd.toString());
            }
        }

    同步请求

    1 客户端主动发起连接,如果没传addr,请求namesrv,否则获取addr对应的channel,如果channel没有在缓存中找到,则创建channel。

    2 执行RpcHook

    3 同步请求会创建一个ResponseFuture,并缓存起来,等到数据发送成功之后,设置发送请求成功并立刻返回。如果发送失败,则设置异常信息,并返回。

    final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
                this.responseTable.put(opaque, responseFuture);
                final SocketAddress addr = channel.remoteAddress();
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }
    
                        responseTable.remove(opaque);
                        responseFuture.setCause(f.cause());
                        responseFuture.putResponse(null);
                        log.warn("send a request command to channel <" + addr + "> failed.");
                    }
                });

    那么如何实现同步的呢?

    接着调用下面的代码,

    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

    这个就得益于ResponseFuture中的countDownLatch,实现同步等待,每一个response都有有一个独立的countDownLatch,当前调用线程执行完waitResponse会等待中,等到响应回调的时候(见RESPONSE_COMMAND中的代码),会调用putResponse,将会设置返回结果,同步等待得到唤醒。

    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
            this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.responseCommand;
        }
    
        public void putResponse(final RemotingCommand responseCommand) {
            this.responseCommand = responseCommand;
            this.countDownLatch.countDown();
        }

    最后在缓存中删除对应的ResponseFuture

    异步请求

    相对于同步请求,异步请求只有前置的rpchook,同时实现异步流控,并在发送完数据之后,释放信号量。另外调用线程不需要调用waitResponse方法同步等待,

    等到响应回调的时候(见RESPONSE_COMMAND中的代码),会调用executeInvokeCallback(responseFuture);

    private void executeInvokeCallback(final ResponseFuture responseFuture) {
            boolean runInThisThread = false;
            ExecutorService executor = this.getCallbackExecutor();
            if (executor != null) {
                try {
                    executor.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                responseFuture.executeInvokeCallback();
                            } catch (Throwable e) {
                                log.warn("execute callback in executor exception, and callback throw", e);
                            }
                        }
                    });
                } catch (Exception e) {
                    runInThisThread = true;
                    log.warn("execute callback in executor exception, maybe executor busy", e);
                }
            } else {
                runInThisThread = true;
            }
    
            if (runInThisThread) {
                try {
                    responseFuture.executeInvokeCallback();
                } catch (Throwable e) {
                    log.warn("executeInvokeCallback Exception", e);
                }
            }
        }
        /**
         * 包装回调只处理一次 ,可能存在网络回调和定时调度并行处理 使用布尔原子类能保证对个线程只处理一次
         */
        public void executeInvokeCallback() {
            if (invokeCallback != null) {
                if (this.executeCallbackOnlyOnce.compareAndSet(false, true)) {
                    invokeCallback.operationComplete(this);
                }
            }
        }

    单向请求

    单向请求不需要同步等待,也不需要回调,其他与异步请求调用类似。

  • 相关阅读:
    算法演示工具
    1198:逆波兰表达式
    1315:【例4.5】集合的划分
    1192:放苹果
    1191:流感传染
    1354括弧匹配检验
    1331【例1-2】后缀表达式的值
    1307高精度乘法
    1162字符串逆序
    1161转进制
  • 原文地址:https://www.cnblogs.com/gaojy/p/15077224.html
Copyright © 2011-2022 走看看