考虑这样一种情况,由于网络延时,consumer先抛出超时异常,一段时间后又收到了已经超时的响应,dubbo是怎么处理的?
拆分为3步看:
1. consumer的DubboResponseTimeoutScanTimer进行扫描
DubboResponseTimeoutScanTimer负责扫描响应,如果发现超时,自行构造一个超时响应,并处理。
Future,Request,Response共用同一个id
//DefaultFuture内部类 private static class RemotingInvocationTimeoutScan implements Runnable { public void run() { while (true) { try { for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // consumer创建一个超时响应 // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } } //DefaultFuture类 public static void received(Channel channel, Response response) { try { //首先删除future对象 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { //唤醒等待的线程(也许有,也许没有) done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
2. consumer因为超时抛异常
//DefaultFuture public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone()) { // 被DubboResponseTimeoutScanTimer线程唤醒,但是有个超时的响应,所以isDone返回true done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (! isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } // isDone返回true,进入returnFromResponse return returnFromResponse(); } private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { return res.getResult(); } if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { //此处抛异常 throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); }
3. 迟到的请求到来时
//DefaultFuture public static void received(Channel channel, Response response) { try { //DubboResponseTimeoutScanTimer已经删除了迟到的请求 //所以走else分支 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }
静态分析代码时,以为先是步骤2,然后是步骤1,但实际调试时,结果是步骤1,步骤2.