zoukankan      html  css  js  c++  java
  • Okhttp源码解析(二)——任务调度

    Okhttp源码版本:3.4.2

    参考:https://www.jianshu.com/p/074dff0f4ecb 

    一.来源

    在对Okhttp的使用中

    执行的操作就是在RealCall类进行

    同步执行:

      

    @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        try {
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain(false);
          if (result == null) throw new IOException("Canceled");
          return result;
        } finally {
          client.dispatcher().finished(this);
        }
      }
    

     

    异步执行:

       

    @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

      

    final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        private AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl().toString());
          this.responseCallback = responseCallback;
        }
    
        String host() {
          return originalRequest.url().host();
        }
    
        Request request() {
          return originalRequest;
        }
    
        RealCall get() {
          return RealCall.this;
        }
    
        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
          //注意这里的回调执行在线程池中,而不是在主线程 responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }

      具体的执行都是在RealCall中进行,同步的在RealCall的execute()执行,异步的在RealCall.AsyncCall.execute()执行。两者都是通过getResponseWithInterceptorChain()责任连模式调用。

    二.Dispatcher类

    1. 线程池

        内部维护着一个线程池,线程池相关可以了解下另外一片文章

        

    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    

        该线程维护的阻塞队列为同步队列SychronousQueue,该队列的特点就是生产者往队列放入数据的前提要有消费者准备消费,同样消费者要从队列中获取数据前提是要有生产者准备往数据放入数据。这在多任务处理的队列中属于最快的任务处理方式,网络请求属于高频情况,最佳。

      2.请求

        异步:

    synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    

      

        内部维护者一个待运行异步任务的集合Deque<AsyncCall> readyAsyncCalls,和一个正在运行异步任务的集合Deque<AsyncCall> runningAsyncCalls,(Deque就是一个继承Queue的接口,双端操作的队列,而AsyncCall就是一个经过封装的Runnable),每次执行异步任务就是:1.判断runningAsyncCalls长度是否到达了最大数64,2.判断runningAsyncCall中访问同一个服务器不同端口的数量是否达到5,如果都没有则往runningAsyncCall增加该任务,并且让线程池执行该任务。如果1或者2满足了就只往readAsyncCalls中添加该任务。

        同步:

    synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
    

      

        内部还维护着一个待运行同步任务的集合Deque<RealCall> readySyncCalls。执行同步任务就是只往同步队列中添加该realCall

      3.完成

        

    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    
    private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

      

     

    无论同步的任务还是异步的任务,完成时都执行finished方法,calls就可放入同步的running队列或者异步的running队列,总的来说就是往队列中remove掉执行玩的call,同时由于Dispacher类有一个idleCallback,判断当前running的同步队列+running的异步队列,如果任务数==0,就执行idleCallback。

    promoteCalls方法就是当一个call结束时,是否执行异步任务队列中下一个任务

      

  • 相关阅读:
    现在分词和过去分词
    VMware Workstation Ubuntu 20.04 LTS无法连接网络问题
    Java中定时器Timer致命缺点(附学习方法)
    2020 年度编程语言排行榜出炉!C 语言称霸,Java 遭遇滑铁卢…….
    人工智能必备数学基础:线性代数基础(1)
    初学VBA
    何同学新视频火了!找到减少沉迷手机的最佳方法:附免费APP
    支付宝蚂蚁森林下线能量提醒功能 产品经理:被骂了、我改
    可抵御所有已知黑客攻击 中国组建天地一体化量子通信网络
    MYSQL数据库 增删改查基础语句
  • 原文地址:https://www.cnblogs.com/could-deng/p/8372622.html
Copyright © 2011-2022 走看看