zoukankan      html  css  js  c++  java
  • dubbo 心跳

    HeartBeatTask 类封装了心跳定时任务,需要了解的是 provider 和 consumer 都有可能发送心跳。

    /**
     * Periodic task that keeps idle channels alive and recycles dead ones.
     *
     * <p>On every run it walks the channels supplied by the {@link ChannelProvider} and,
     * for each channel that is not closed:
     * <ul>
     *   <li>sends a two-way heartbeat request when the last read or the last write
     *       happened more than {@code heartbeat} milliseconds ago;</li>
     *   <li>when the last read happened more than {@code heartbeatTimeout} milliseconds
     *       ago, reconnects the channel if it is a {@code Client}, otherwise closes it.</li>
     * </ul>
     * A channel that has no recorded timestamp is left untouched by the corresponding check.
     */
    final class HeartBeatTask implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger( HeartBeatTask.class );
        private final ChannelProvider channelProvider;
        private final int             heartbeat;
        private final int             heartbeatTimeout;

        /**
         * @param provider         source of the channels to inspect on every run
         * @param heartbeat        idle time in milliseconds after which a heartbeat is sent
         * @param heartbeatTimeout read-idle time in milliseconds after which the channel
         *                         is reconnected (client) or closed (otherwise)
         */
        HeartBeatTask( ChannelProvider provider, int heartbeat, int heartbeatTimeout ) {
            this.channelProvider = provider;
            this.heartbeat = heartbeat;
            this.heartbeatTimeout = heartbeatTimeout;
        }

        /**
         * Performs one heartbeat pass over all channels. Never throws: a failure on one
         * channel is logged and the remaining channels are still processed.
         */
        @Override
        public void run() {
            try {
                // One timestamp for the whole pass so every channel is judged against the same instant.
                long now = System.currentTimeMillis();
                for ( Channel channel : channelProvider.getChannels() ) {
                    if (channel.isClosed()) {
                        continue;
                    }
                    try {
                        Long lastRead = ( Long ) channel.getAttribute(
                                HeaderExchangeHandler.KEY_READ_TIMESTAMP );
                        Long lastWrite = ( Long ) channel.getAttribute(
                                HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
                        // Idle in either direction for longer than the interval: send a heartbeat.
                        if ( ( lastRead != null && now - lastRead > heartbeat )
                                || ( lastWrite != null && now - lastWrite > heartbeat ) ) {
                            Request req = new Request();
                            req.setVersion( "2.0.0" );
                            req.setTwoWay( true );
                            req.setEvent( Request.HEARTBEAT_EVENT );
                            channel.send( req );
                        }
                        // Nothing read for longer than the timeout: treat the connection as dead.
                        if (lastRead != null && now - lastRead > heartbeatTimeout) {
                            // consumer side
                            if (channel instanceof Client) {
                               ((Client)channel).reconnect();
                            } else { // provider side
                                channel.close();
                            }
                        }
                    } catch ( Throwable t ) {
                        // Previously swallowed silently; log it so heartbeat failures are
                        // visible, then continue with the next channel.
                        logger.warn( "Exception when heartbeat to remote channel "
                                + channel.getRemoteAddress(), t );
                    }
                }
            } catch ( Throwable t ) {
                logger.warn( "Unhandled exception when heartbeat, cause: " + t.getMessage(), t );
            }
        }

        /** Supplies the channels that a {@link HeartBeatTask} should inspect. */
        interface ChannelProvider {
            /** @return the channels to check during one heartbeat pass */
            Collection<Channel> getChannels();
        }

    }

    对于 consumer,是在 HeaderExchangeClient 类中启动心跳定时器,而 provider,则是在 HeaderExchangeServer 中启动心跳定时器。

    consumer 发送请求时,更新 lastWrite 值;接收响应时,更新 lastRead 值。心跳定时器定时检查 lastRead 和 lastWrite:任意一个距当前时间超过 heartbeat 间隔,就发送心跳;lastRead 距当前时间超过 heartbeatTimeout 时,consumer 端重连,provider 端关闭连接。

    /**
     * Consumer-side exchange client (abridged excerpt showing the heartbeat-related members).
     *
     * <p>The constructor reads the heartbeat interval and timeout from the client's URL and
     * starts a {@link HeartBeatTask} for this client when the interval is positive.
     *
     * <p>NOTE(review): the fields {@code client} and {@code channel} and the method
     * {@code stopHeartbeatTimer()} are used below but their declarations are not part of
     * this excerpt.
     */
    public class HeaderExchangeClient implements ExchangeClient {
        // Static, so one scheduler (core pool size 2) is shared by every HeaderExchangeClient instance.
        // NOTE(review): the `true` flag presumably marks the threads as daemon — confirm in NamedThreadFactory.
        private static final ScheduledThreadPoolExecutor scheduled = 
            new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
        // Handle of the scheduled heartbeat task; assigned in startHeatbeatTimer()
        private ScheduledFuture<?> heatbeatTimer;
        // Heartbeat interval in milliseconds; the timer is only started when this is > 0
        private int heartbeat;
        // Read-idle timeout in milliseconds passed to HeartBeatTask
        private int heartbeatTimeout;
        
        /**
         * Wraps the given client, resolves the heartbeat settings from its URL and starts
         * the heartbeat timer.
         *
         * @param client the underlying client; must not be null
         * @throws IllegalArgumentException if {@code client} is null
         * @throws IllegalStateException    if the configured timeout is less than twice the interval
         */
        public HeaderExchangeClient(Client client){
            if (client == null) {
                throw new IllegalArgumentException("client == null");
            }
            this.client = client;
            this.channel = new HeaderExchangeChannel(client);
            String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
            // Author's note: heartbeat = 60000 in the observed run. When the URL has no heartbeat
            // parameter, the fallback is DEFAULT_HEARTBEAT for a "1.0." dubbo version and 0 otherwise.
            this.heartbeat = client.getUrl().getParameter( Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0 );
            // Author's note: heartbeatTimeout = 180000 in the observed run; defaults to three times the interval.
            this.heartbeatTimeout = client.getUrl().getParameter( Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 );
            // Reject a timeout shorter than two heartbeat intervals.
            if ( heartbeatTimeout < heartbeat * 2 ) {
                throw new IllegalStateException( "heartbeatTimeout < heartbeatInterval * 2" );
            }
            startHeatbeatTimer();
        }
    
        /** Delegates the request to the wrapped exchange channel. */
        public ResponseFuture request(Object request) throws RemotingException {
            return channel.request(request);
        }
        
        /**
         * Calls stopHeartbeatTimer() and then, if the heartbeat interval is positive,
         * schedules a HeartBeatTask whose only channel is this client. The task first runs
         * after one interval and then repeats with a fixed delay of one interval.
         */
        private void startHeatbeatTimer() {
            stopHeartbeatTimer();
            if ( heartbeat > 0 ) {
                heatbeatTimer = scheduled.scheduleWithFixedDelay(
                        new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                            public Collection<Channel> getChannels() {
                                // The task only ever inspects this one client.
                                return Collections.<Channel>singletonList( HeaderExchangeClient.this );
                            }
                        }, heartbeat, heartbeatTimeout),
                        heartbeat, heartbeat, TimeUnit.MILLISECONDS );
            }
        }
    }

    在 HeartbeatHandler 类中设置 lastRead 和 lastWrite 值:

    /**
     * Channel handler delegate (abridged excerpt) that records when a channel was last
     * read from and last written to, as channel attributes.
     */
    public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
        // other code omitted

        /** Stores the current time in milliseconds as the channel's read timestamp. */
        private void setReadTimestamp(Channel channel) {
            final long readAt = System.currentTimeMillis();
            channel.setAttribute(KEY_READ_TIMESTAMP, readAt);
        }

        /** Stores the current time in milliseconds as the channel's write timestamp. */
        private void setWriteTimestamp(Channel channel) {
            final long writtenAt = System.currentTimeMillis();
            channel.setAttribute(KEY_WRITE_TIMESTAMP, writtenAt);
        }
    }

    设置 lastWrite 的调用栈:

    设置 lastRead 的调用栈:

  • 相关阅读:
    .net技巧推荐
    ASPNETPager常用属性
    带有like的存储过程
    Jquery选择器
    关于出现too many open files异常
    将ReadWriteLock应用于缓存设计
    读CopyOnWriteArrayList有感
    HttpClient容易忽视的细节——连接关闭
    windows下如何用java命令运行jar包?
    再谈重入锁ReentrantLock
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8335778.html
Copyright © 2011-2022 走看看