zoukankan      html  css  js  c++  java
  • 9.7 dubbo心跳机制

    dubbo的心跳机制:

    • 目的:检测provider与consumer之间的connection连接是不是还连接着,如果连接断了,需要作出相应的处理。
    • 原理:
      • provider:dubbo的心跳默认是在heartbeat(默认是60s)内如果没有接收到消息,就会发送心跳消息,如果连着3次(180s)没有收到心跳响应,provider会关闭channel
      • consumer:dubbo的心跳默认是在60s内如果没有接收到消息,就会发送心跳消息,如果连着3次(180s)没有收到心跳响应,consumer会进行重连

    来看源码调用链。先看provider端。

    一、provider端心跳机制

                  -->openServer(URL url)
                     url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&default.server=netty4&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider&timestamp=1520660491836
                    -->createServer(URL url)
                        -->HeaderExchanger.bind(URL url, ExchangeHandler handler)
                           url:dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=10.10.10.10&bind.port=20880&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21999&qos.port=22222&side=provider&timestamp=1520660491836 handler:DubboProtocol.requestHandler
                          -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                            -->NettyTransporter.bind(URL url, ChannelHandler listener)
                               listener:上边的DecodeHandler实例
                              -->new NettyServer(URL url, ChannelHandler handler)
                                -->ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
                                   handler:上边的DecodeHandler实例
                                -->doOpen()//开启netty服务
                          -->new HeaderExchangeServer(Server server)
                             server:上述的NettyServer
                            -->startHeatbeatTimer()

    服务端在开启netty服务时, 在调用createServer时,会从url的parameters map中获取heartbeat配置,代码如下:

     1     private ExchangeServer createServer(URL url) {
     2 
     3         ...
     4 
     5         url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
     6        
     7         ...
     8 
     9         ExchangeServer server;
    10         try {
    11             server = Exchangers.bind(url, requestHandler);
    12         } catch (RemotingException e) {
    13             throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    14         }
    15 
    16         ...
    17 
    18         return server;
    19     }

    其中:int DEFAULT_HEARTBEAT = 60 * 1000,即当用户没有配置heartbeat(心跳时间)时,默认heartbeat=60s(即60s内没有接收到任何请求,就会发送心跳信息)。那么这个heartbeat到底该怎么配?

    provider端:

    1     <dubbo:service ...>
    2         <dubbo:parameter key="heartbeat" value="3000"/>
    3     </dubbo:service>

    consumer端:

    1     <dubbo:reference ...>
    2         <dubbo:parameter key="heartbeat" value="3000"/>
    3     </dubbo:reference>

    再来看调用链,当执行到这一句。

    1 ChannelHandler.wrapInternal(ChannelHandler handler, URL url)

    会形成一个handler调用链,调用链如下:

    1 MultiMessageHandler
    2 -->handler: HeartbeatHandler
    3    -->handler: AllChannelHandler
    4          -->url: providerUrl
    5          -->executor: FixedExecutor
    6          -->handler: DecodeHandler
    7             -->handler: HeaderExchangeHandler
    8                -->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)

    这也是netty接收到请求后的处理链路,注意其中有一个HeartbeatHandler。

    最后,执行new HeaderExchangeServer(Server server),来看源码:

     1 public class HeaderExchangeServer implements ExchangeServer {
     2     /** 心跳定时器 */
     3     private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
     4             new NamedThreadFactory(
     5                     "dubbo-remoting-server-heartbeat",
     6                     true));
     7     /** NettyServer */
     8     private final Server server;
     9     // heartbeat timer
    10     private ScheduledFuture<?> heatbeatTimer;
    11     // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
    12     private int heartbeat;
    13     private int heartbeatTimeout;
    14     private AtomicBoolean closed = new AtomicBoolean(false);
    15 
    16     public HeaderExchangeServer(Server server) {
    17         if (server == null) {
    18             throw new IllegalArgumentException("server == null");
    19         }
    20         this.server = server;
    21         this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    22         this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    23         if (heartbeatTimeout < heartbeat * 2) {
    24             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    25         }
    26         startHeatbeatTimer();
    27     }
    28 
    29     private void startHeatbeatTimer() {
    30         stopHeartbeatTimer();
    31         if (heartbeat > 0) {
    32             heatbeatTimer = scheduled.scheduleWithFixedDelay(
    33                     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
    34                         public Collection<Channel> getChannels() {
    35                             return Collections.unmodifiableCollection(
    36                                     HeaderExchangeServer.this.getChannels());
    37                         }
    38                     }, heartbeat, heartbeatTimeout),
    39                     heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    40         }
    41     }
    42 
    43     private void stopHeartbeatTimer() {
    44         try {
    45             ScheduledFuture<?> timer = heatbeatTimer;
    46             if (timer != null && !timer.isCancelled()) {
    47                 timer.cancel(true);
    48             }
    49         } catch (Throwable t) {
    50             logger.warn(t.getMessage(), t);
    51         } finally {
    52             heatbeatTimer = null;
    53         }
    54     }
    55 }

    创建HeaderExchangeServer时,初始化了heartbeat(心跳间隔时间)和heartbeatTimeout(心跳响应超时时间:即如果最终发送的心跳在这个时间内都没有返回,则做出响应的处理)。

    • heartbeat默认是0(从startHeatbeatTimer()方法可以看出只有heartbeat>0的情况下,才会发心跳,这里heartbeat如果从url的parameter map中获取不到,就是0,但是我们在前边看到dubbo会默认设置heartbeat=60s到parameter map中,所以此处的heartbeat=60s);
    • heartbeatTimeout:默认是heartbeat*3。(原因:假设一端发出一次heartbeatRequest,另一端在heartbeat内没有返回任何响应-包括正常请求响应和心跳响应,此时不能认为是连接断了,因为有可能还是网络抖动什么的导致了tcp包的重传超时等)
    • scheduled是一个含有一个线程的定时线程执行器(其中的线程名字为:"dubbo-remoting-server-heartbeat-thread-*")

    之后启动心跳定时任务:

    • 首先如果原来有心跳定时任务,关闭原来的定时任务
    • 之后启动scheduled中的定时线程,从启动该线程开始,每隔heartbeat执行一次HeartBeatTask任务(第一次执行是在启动线程后heartbeat时)

    来看一下HeartBeatTask的源码:

     1 final class HeartBeatTask implements Runnable {
     2     // channel获取器:用于获取所有需要进行心跳检测的channel
     3     private ChannelProvider channelProvider;
     4     private int heartbeat;
     5     private int heartbeatTimeout;
     6 
     7     HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
     8         this.channelProvider = provider;
     9         this.heartbeat = heartbeat;
    10         this.heartbeatTimeout = heartbeatTimeout;
    11     }
    12 
    13     public void run() {
    14         try {
    15             long now = System.currentTimeMillis();
    16             for (Channel channel : channelProvider.getChannels()) {
    17                 if (channel.isClosed()) {
    18                     continue;
    19                 }
    20                 try {
    21                     // 获取最后一次读操作的时间
    22                     Long lastRead = (Long) channel.getAttribute(
    23                             HeaderExchangeHandler.KEY_READ_TIMESTAMP);
    24                     // 获取最后一次写操作的时间
    25                     Long lastWrite = (Long) channel.getAttribute(
    26                             HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
    27                     // 如果在heartbeat内没有进行读操作或者写操作,则发送心跳请求
    28                     if ((lastRead != null && now - lastRead > heartbeat)
    29                             || (lastWrite != null && now - lastWrite > heartbeat)) {
    30                         Request req = new Request();
    31                         req.setVersion("2.0.0");
    32                         req.setTwoWay(true);
    33                         req.setEvent(Request.HEARTBEAT_EVENT);
    34                         channel.send(req);
    35                         if (logger.isDebugEnabled()) {
    36                             logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
    37                                     + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
    38                         }
    39                     }
    40                     //正常消息和心跳在heartbeatTimeout都没接收到
    41                     if (lastRead != null && now - lastRead > heartbeatTimeout) {
    42                         logger.warn("Close channel " + channel
    43                                 + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
    44                         // consumer端进行重连
    45                         if (channel instanceof Client) {
    46                             try {
    47                                 ((Client) channel).reconnect();
    48                             } catch (Exception e) {
    49                                 //do nothing
    50                             }
    51                         } else {// provider端关闭连接
    52                             channel.close();
    53                         }
    54                     }
    55                 } catch (Throwable t) {
    56                     logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
    57                 }
    58             }
    59         } catch (Throwable t) {
    60             logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
    61         }
    62     }
    63 
    64     interface ChannelProvider {
    65         Collection<Channel> getChannels();
    66     }
    67 }            

    HeartBeatTask首先获取所有的channelProvider#getChannels获取所有需要心跳检测的channel,channelProvider实例是HeaderExchangeServer中在启动线程定时执行器的时候创建的内部类。

    1                     new HeartBeatTask.ChannelProvider() {
    2                         public Collection<Channel> getChannels() {
    3                             return Collections.unmodifiableCollection(
    4                                     HeaderExchangeServer.this.getChannels());
    5                         }
    6                     }

    来看一下HeaderExchangeServer.this.getChannels():

     1     public Collection<Channel> getChannels() {
     2         return (Collection) getExchangeChannels();
     3     }
     4 
     5     public Collection<ExchangeChannel> getExchangeChannels() {
     6         Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
     7         Collection<Channel> channels = server.getChannels();
     8         if (channels != null && channels.size() > 0) {
     9             for (Channel channel : channels) {
    10                 exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
    11             }
    12         }
    13         return exchangeChannels;
    14     }

    实际上就是获取NettyServer中的全部channel连接。

    获取到需要心跳检测的channel后,对每一个channel进行如下判断:

    • 如果在heartbeat内没有进行读操作或者写操作,则发送心跳请求
    • 如果正常消息和心跳在heartbeatTimeout都没接收到,consumer端会进行重连,provider端会关闭channel

    这里比较关键的是lastRead和lastWrite的设置。先来看一下获取:

    1 Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
    2 Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);

    说明有地方在设置这两个值到channel中。

    从请求和响应处理来看,无论是请求还是响应都会按照这个顺序处理一遍。

    1 MultiMessageHandler
    2 -->handler: HeartbeatHandler
    3    -->handler: AllChannelHandler
    4          -->url: providerUrl
    5          -->executor: FixedExecutor
    6          -->handler: DecodeHandler
    7             -->handler: HeaderExchangeHandler
    8                -->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)

    其中HeartbeatHandler源码如下:

     1 public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
     2 
     3     private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
     4 
     5     public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
     6 
     7     public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
     8 
     9     public HeartbeatHandler(ChannelHandler handler) {
    10         super(handler);
    11     }
    12 
    13     public void connected(Channel channel) throws RemotingException {
    14         setReadTimestamp(channel);
    15         setWriteTimestamp(channel);
    16         handler.connected(channel);
    17     }
    18 
    19     public void disconnected(Channel channel) throws RemotingException {
    20         clearReadTimestamp(channel);
    21         clearWriteTimestamp(channel);
    22         handler.disconnected(channel);
    23     }
    24 
    25     public void sent(Channel channel, Object message) throws RemotingException {
    26         setWriteTimestamp(channel);
    27         handler.sent(channel, message);
    28     }
    29 
    30     public void received(Channel channel, Object message) throws RemotingException {
    31         setReadTimestamp(channel);
    32         if (isHeartbeatRequest(message)) {
    33             Request req = (Request) message;
    34             if (req.isTwoWay()) {
    35                 Response res = new Response(req.getId(), req.getVersion());
    36                 res.setEvent(Response.HEARTBEAT_EVENT);
    37                 channel.send(res);
    38                 if (logger.isInfoEnabled()) {
    39                     int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
    40                     if (logger.isDebugEnabled()) {
    41                         logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
    42                                 + ", cause: The channel has no data-transmission exceeds a heartbeat period"
    43                                 + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
    44                     }
    45                 }
    46             }
    47             return;
    48         }
    49         if (isHeartbeatResponse(message)) {
    50             if (logger.isDebugEnabled()) {
    51                 logger.debug(
    52                         new StringBuilder(32)
    53                                 .append("Receive heartbeat response in thread ")
    54                                 .append(Thread.currentThread().getName())
    55                                 .toString());
    56             }
    57             return;
    58         }
    59         handler.received(channel, message);
    60     }
    61 
    62     private void setReadTimestamp(Channel channel) {
    63         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    64     }
    65 
    66     private void setWriteTimestamp(Channel channel) {
    67         channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    68     }
    69 
    70     private void clearReadTimestamp(Channel channel) {
    71         channel.removeAttribute(KEY_READ_TIMESTAMP);
    72     }
    73 
    74     private void clearWriteTimestamp(Channel channel) {
    75         channel.removeAttribute(KEY_WRITE_TIMESTAMP);
    76     }
    77 
    78     private boolean isHeartbeatRequest(Object message) {
    79         return message instanceof Request && ((Request) message).isHeartbeat();
    80     }
    81 
    82     private boolean isHeartbeatResponse(Object message) {
    83         return message instanceof Response && ((Response) message).isHeartbeat();
    84     }
    85 }
    • 连接完成时:设置lastRead和lastWrite
    • 连接断开时:清空lastRead和lastWrite
    • 发送消息时:设置lastWrite
    • 接收消息时:设置lastRead

    之后交由AllChannelHandler进行处理。之后会一直交由HeaderExchangeHandler进行处理。其对lastRead和lastWrite也做了设置和清理:

     1     public void connected(Channel channel) throws RemotingException {
     2         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
     3         channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
     4         ...
     5     }
     6 
     7     public void disconnected(Channel channel) throws RemotingException {
     8         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
     9         channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    10         ...
    11     }
    12 
    13     public void sent(Channel channel, Object message) throws RemotingException {
    14         Throwable exception = null;
    15         try {
    16             channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    17             ...
    18         } catch (Throwable t) {
    19             exception = t;
    20         }
    21     }
    22 
    23     public void received(Channel channel, Object message) throws RemotingException {
    24         channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    25         ...
    26     }
    • 连接完成时:设置lastRead和lastWrite
    • 连接断开时:也设置lastRead和lastWrite(为什么?)
    • 发送消息时:设置lastWrite
    • 接收消息时:设置lastRead

     这里里有个疑问,从handler链来看,无论是请求还是响应都会按照handler链来处理一遍。那么在HeartbeatHandler中已经进行了lastWrite和lastRead的设置,为什么还要在HeaderExchangeHandler中再处理一遍?

    最后,provider端认为连接断了,则会关闭channel。来看一下NettyChannel的close方法:

     1     public void close() {
     2         // 1 将close属性设为true
     3         try {
     4             super.close();
     5         } catch (Exception e) {
     6             logger.warn(e.getMessage(), e);
     7         }
     8         // 2 从全局NettyChannel缓存器中将当前的NettyChannel删掉
     9         try {
    10             removeChannelIfDisconnected(channel);
    11         } catch (Exception e) {
    12             logger.warn(e.getMessage(), e);
    13         }
    14         // 3 清空当前的NettyChannel中的attributes属性
    15         try {
    16             attributes.clear();
    17         } catch (Exception e) {
    18             logger.warn(e.getMessage(), e);
    19         }
    20         // 4 关闭netty的channel,执行netty的channel的优雅关闭
    21         try {
    22             if (logger.isInfoEnabled()) {
    23                 logger.info("Close netty channel " + channel);
    24             }
    25             channel.close();
    26         } catch (Exception e) {
    27             logger.warn(e.getMessage(), e);
    28         }
    29     }

    从上边代码来看,假设consumer端挂了,provider端的心跳检测机制可以进行相关的资源回收,所以provider端的心跳检测机制是有必要的。

    二、consumer端心跳机制

                          //创建ExchangeClient,对第一次服务发现providers路径下的相关url建立长连接
                          -->getClients(URL url)
                            -->getSharedClient(URL url)
                              -->ExchangeClient exchangeClient = initClient(url)
                                -->Exchangers.connect(url, requestHandler)
                                  -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
                                    -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                                      -->Transporters.connect(URL url, ChannelHandler... handlers)
                                        -->NettyTransporter.connect(URL url, ChannelHandler listener)
                                          -->new NettyClient(url, listener)
                                            -->new MultiMessageHandler(HeartbeatHandler(AllChannelHandler(handler)))
                                            -->getChannelCodec(url)//获取Codec2,这里是DubboCountCodec实例
                                            -->doOpen()//开启netty客户端
                                            -->doConnect()//连接服务端,建立长连接
                                    -->new HeaderExchangeClient(Client client, boolean needHeartbeat)//上述的NettyClient实例,needHeartbeat:true
                                      -->startHeatbeatTimer()//启动心跳计数器

    客户端在initClient(url)中设置了heartbeat参数(默认为60s,用户自己设置的方式见“一”中所讲),如下:

     1     /**
     2      * Create new connection
     3      */
     4     private ExchangeClient initClient(URL url) {
     5         ...
     6         // enable heartbeat by default
     7         url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
     8 
     9         ...
    10 
    11         ExchangeClient client;
    12         try {
    13             // connection should be lazy
    14             if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
    15                 client = new LazyConnectExchangeClient(url, requestHandler);
    16             } else {
    17                 client = Exchangers.connect(url, requestHandler);
    18             }
    19         } catch (RemotingException e) {
    20             throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    21         }
    22         return client;
    23     }

    与provider类似,来看一下最后开启心跳检测的地方。

     1 public class HeaderExchangeClient implements ExchangeClient {
     2     private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
     3     private final Client client;
     4     private final ExchangeChannel channel;
     5     // heartbeat timer
     6     private ScheduledFuture<?> heartbeatTimer;
     7     // heartbeat(ms), default value is 0 , won't execute a heartbeat.
     8     private int heartbeat;
     9     private int heartbeatTimeout;
    10 
    11     public HeaderExchangeClient(Client client, boolean needHeartbeat) {
    12         if (client == null) {
    13             throw new IllegalArgumentException("client == null");
    14         }
    15         this.client = client;
    16         this.channel = new HeaderExchangeChannel(client);
    17         String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
    18         this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
    19         this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    20         if (heartbeatTimeout < heartbeat * 2) {
    21             throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    22         }
    23         if (needHeartbeat) {
    24             startHeatbeatTimer();
    25         }
    26     }
    27 
    28     private void startHeatbeatTimer() {
    29         stopHeartbeatTimer();
    30         if (heartbeat > 0) {
    31             heartbeatTimer = scheduled.scheduleWithFixedDelay(
    32                     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
    33                         public Collection<Channel> getChannels() {
    34                             return Collections.<Channel>singletonList(HeaderExchangeClient.this);
    35                         }
    36                     }, heartbeat, heartbeatTimeout),
    37                     heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    38         }
    39     }
    40 
    41     private void stopHeartbeatTimer() {
    42         if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {
    43             try {
    44                 heartbeatTimer.cancel(true);
    45                 scheduled.purge();
    46             } catch (Throwable e) {
    47                 if (logger.isWarnEnabled()) {
    48                     logger.warn(e.getMessage(), e);
    49                 }
    50             }
    51         }
    52         heartbeatTimer = null;
    53     }
    54 }

    主要看一下startHeartbeatTimer()方法,与provider相同,只是provider是获取NettyServer的所有的NettyChannel,而consumer只是获取当前的对象。

    consumer的handler处理链与provider完全相同。

    最后来看一下consumer的重连机制:AbstractClient#reconnect

     1     public void reconnect() throws RemotingException {
     2         disconnect();
     3         connect();
     4     }
     5 
     6     public void disconnect() {
     7         connectLock.lock();
     8         try {
     9             destroyConnectStatusCheckCommand();
    10             try {
    11                 Channel channel = getChannel();
    12                 if (channel != null) {
    13                     channel.close();
    14                 }
    15             } catch (Throwable e) {
    16                 logger.warn(e.getMessage(), e);
    17             }
    18             try {
    19                 doDisConnect();
    20             } catch (Throwable e) {
    21                 logger.warn(e.getMessage(), e);
    22             }
    23         } finally {
    24             connectLock.unlock();
    25         }
    26     }
    27 
    28     protected void connect() throws RemotingException {
    29         connectLock.lock();
    30         try {
    31             if (isConnected()) {
    32                 return;
    33             }
    34             initConnectStatusCheckCommand();
    35             doConnect();
    36             if (!isConnected()) {
    37                 throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
    38                         + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
    39                         + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
    40             } else {
    41                 if (logger.isInfoEnabled()) {
    42                     logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
    43                             + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
    44                             + ", channel is " + this.getChannel());
    45                 }
    46             }
    47             reconnect_count.set(0);
    48             reconnect_error_log_flag.set(false);
    49         } catch (RemotingException e) {
    50             throw e;
    51         } catch (Throwable e) {
    52             throw new RemotingException(this, "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
    53                     + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
    54                     + ", cause: " + e.getMessage(), e);
    55         } finally {
    56             connectLock.unlock();
    57         }
    58     }

    代码比较简单,先断连,再连接。

    对于心跳机制,netty本身提供了空闲检测:IdleStateHandler。也可以直接基于此实现心跳机制。

  • 相关阅读:
    kubernetes HA 脚本
    架构之数据库分表分库
    架构之高可用性(HA)集群(Keepalived)
    架构之Nginx(负载均衡/反向代理)
    架构之消息队列
    oracle数据迁移
    Oracle中关于清除数据和释放表空间
    疑难杂症:org.hibernate.MappingException: Unknown entity,annotation配置Entity类报错
    j疑难杂症:java.lang.VerifyError: class org.hibernate.type.WrappedMaterializedBlobType overrides final method getReturnedClass.()Ljava/lang/Class;
    疑难杂症:java.lang.AbstractMethodError: org.apache.xerces.dom.DocumentImpl.setXmlVersion(Ljava/lang/String;)V
  • 原文地址:https://www.cnblogs.com/java-zhao/p/8539046.html
Copyright © 2011-2022 走看看