zoukankan      html  css  js  c++  java
  • consumer的DubboResponseTimeoutScanTimer线程

    考虑这样一种情况,由于网络延时,consumer先抛出超时异常,一段时间后又收到了已经超时的响应,dubbo是怎么处理的?

    拆分为3步看:

    1. consumer的DubboResponseTimeoutScanTimer进行扫描

    DubboResponseTimeoutScanTimer负责扫描响应,如果发现超时,自行构造一个超时响应,并处理。

    Future,Request,Response共用同一个id

    //DefaultFuture内部类
    private static class RemotingInvocationTimeoutScan implements Runnable {
    
        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // consumer创建一个超时响应
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
                            // handle response.
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }
    
    //DefaultFuture类
    public static void received(Channel channel, Response response) {
        try {
            //首先删除future对象
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response 
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                //唤醒等待的线程(也许有,也许没有)
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

    2. consumer因为超时抛异常

    //DefaultFuture
    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                   // 被DubboResponseTimeoutScanTimer线程唤醒,但是有个超时的响应,所以isDone返回true      
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        // isDone返回true,进入returnFromResponse
        return returnFromResponse();
    }
    
    private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            return res.getResult();
        }
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            //此处抛异常
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }
    

    3. 迟到的请求到来时

    //DefaultFuture
    public static void received(Channel channel, Response response) {
        try {
            //DubboResponseTimeoutScanTimer已经删除了迟到的请求
            //所以走else分支
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response 
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }

    静态分析代码时,以为先是步骤2,然后是步骤1,但实际调试时,结果是步骤1,步骤2.

  • 相关阅读:
    SpringMVC扩展
    反射机制
    python day9
    python day8
    python day7
    python day6
    python day4
    python day3
    python day2
    python day1
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8242088.html
Copyright © 2011-2022 走看看