  • Retrofit2 source code analysis

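    Example

    We start from a simple example to see how Retrofit2 cooperates with the other libraries.

    The example below combines rxjava2 + retrofit2 + okhttp3 + gson: it calls Taobao's IP address lookup service and writes the returned information into an EditText.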

    public static Retrofit getRetrofit() {
        if (retrofit == null) {
            synchronized (Retrofit.class) {
                if (retrofit == null) {
                    retrofit = new Retrofit.Builder()
                            .baseUrl(BASE_URL)
                            .addConverterFactory(ScalarsConverterFactory.create())
                            .addConverterFactory(GsonConverterFactory.create())
                            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                            .client(getOkHttpClient())
                            .build();
                }
            }
        }
        return retrofit;
    }
    public interface IpServiceRx {
        @Headers({
                "Accept: application/json",
                "User-Agent: wz"
        })
        @GET("getIpInfo.php")
        Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
    }
    /**
     * rxjava2 + retrofit2 + okhttp3
     */
    private void requestData3() {
        Retrofit retrofit = NetworkUtils.getRetrofit();
    
        IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
        String ip = "117.100.130.5";
        Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
        ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Response<IpModel>>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    }
    
                    @Override
                    public void onNext(@NonNull Response<IpModel> ipModelResponse) {
                        IpModel ipModel = ipModelResponse.body();
                        if (ipModel == null) {
                            return;
                        }
                        IpData data = ipModel.getData();
                        if (data == null) {
                            return;
                        }
                        mEt.setText(getCSData(data));
                    }
    
                    @Override
                    public void onError(@NonNull Throwable e) {
                        mEt.setText(e.toString());
                        e.printStackTrace();
                    }
    
                    @Override
                    public void onComplete() {
                    }
                });
    }

    Let's start with the factories passed in when the Retrofit instance is built.

    ConverterFactory

    .addConverterFactory(GsonConverterFactory.create())

    public Builder addConverterFactory(Converter.Factory factory) {
      converterFactories.add(checkNotNull(factory, "factory == null"));
      return this;
    }

    The converter factory is simply added to a list.

    public final class GsonConverterFactory extends Converter.Factory {
      /**
       * Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
       * decoding from JSON (when no charset is specified by a header) will use UTF-8.
       */
      public static GsonConverterFactory create() {
        return create(new Gson());
      }
    
      /**
       * Create an instance using {@code gson} for conversion. Encoding to JSON and
       * decoding from JSON (when no charset is specified by a header) will use UTF-8.
       */
      @SuppressWarnings("ConstantConditions") // Guarding public API nullability.
      public static GsonConverterFactory create(Gson gson) {
        if (gson == null) throw new NullPointerException("gson == null");
        return new GsonConverterFactory(gson);
      }
    
      private final Gson gson;
    
      private GsonConverterFactory(Gson gson) {
        this.gson = gson;
      }
    
      // Returns the Converter instance that parses an okhttp3.ResponseBody
      @Override
      public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
          Retrofit retrofit) {
        TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
        return new GsonResponseBodyConverter<>(gson, adapter);
      }
    
      // Returns the Converter instance that turns an object into an okhttp3.RequestBody
      @Override
      public Converter<?, RequestBody> requestBodyConverter(Type type,
          Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
        TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
        return new GsonRequestBodyConverter<>(gson, adapter);
      }
    }
    public interface Converter<F, T> {
      @Nullable T convert(F value) throws IOException;
    
      /** Creates {@link Converter} instances based on a type and target usage. */
      abstract class Factory {
        /**
         * Returns a {@link Converter} for converting an HTTP response body to {@code type}, or null if
         * {@code type} cannot be handled by this factory. This is used to create converters for
         * response types such as {@code SimpleResponse} from a {@code Call<SimpleResponse>}
         * declaration.
         */
        public @Nullable Converter<ResponseBody, ?> responseBodyConverter(Type type,
            Annotation[] annotations, Retrofit retrofit) {
          return null;
        }
    
        /**
         * Returns a {@link Converter} for converting {@code type} to an HTTP request body, or null if
         * {@code type} cannot be handled by this factory. This is used to create converters for types
         * specified by {@link Body @Body}, {@link Part @Part}, and {@link PartMap @PartMap}
         * values.
         */
        public @Nullable Converter<?, RequestBody> requestBodyConverter(Type type,
            Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
          return null;
        }
    
        /**
         * Returns a {@link Converter} for converting {@code type} to a {@link String}, or null if
         * {@code type} cannot be handled by this factory. This is used to create converters for types
         * specified by {@link Field @Field}, {@link FieldMap @FieldMap} values,
         * {@link Header @Header}, {@link HeaderMap @HeaderMap}, {@link Path @Path},
         * {@link Query @Query}, and {@link QueryMap @QueryMap} values.
         */
        public @Nullable Converter<?, String> stringConverter(Type type, Annotation[] annotations,
            Retrofit retrofit) {
          return null;
        }
    
        /**
         * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
         * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
         */
        protected static Type getParameterUpperBound(int index, ParameterizedType type) {
          return Utils.getParameterUpperBound(index, type);
        }
    
        /**
         * Extract the raw class type from {@code type}. For example, the type representing
         * {@code List<? extends Runnable>} returns {@code List.class}.
         */
        protected static Class<?> getRawType(Type type) {
          return Utils.getRawType(type);
        }
      }
    }

    CallAdapterFactory

    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())

    public Builder addCallAdapterFactory(CallAdapter.Factory factory) {
      callAdapterFactories.add(checkNotNull(factory, "factory == null"));
      return this;
    }
    public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
      /**
       * Returns an instance which creates synchronous observables that do not operate on any scheduler
       * by default.
       */
      public static RxJava2CallAdapterFactory create() {
        return new RxJava2CallAdapterFactory(null, false);
      }
    
      private final @Nullable Scheduler scheduler;
      private final boolean isAsync;
    
      private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
        this.scheduler = scheduler;
        this.isAsync = isAsync;
      }
      ...
    }
    public interface CallAdapter<R, T> {
      Type responseType();
    
      // Note that this Call is Retrofit's own Call, not the one from okhttp.
      T adapt(Call<R> call);
    
      /**
       * Creates {@link CallAdapter} instances based on the return type of {@linkplain
       * Retrofit#create(Class) the service interface} methods.
       */
      abstract class Factory {
        /**
         * Returns a call adapter for interface methods that return {@code returnType}, or null if it
         * cannot be handled by this factory.
         */
        public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
            Retrofit retrofit);
    
        /**
         * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
         * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
         */
        protected static Type getParameterUpperBound(int index, ParameterizedType type) {
          return Utils.getParameterUpperBound(index, type);
        }
    
        /**
         * Extract the raw class type from {@code type}. For example, the type representing
         * {@code List<? extends Runnable>} returns {@code List.class}.
         */
        protected static Class<?> getRawType(Type type) {
          return Utils.getRawType(type);
        }
      }
    }

    The classes above are only listed for now; they are analyzed step by step later.

    Next, look at build()

    public Retrofit build() {
      // If no call factory was set, an OkHttpClient is created automatically
      okhttp3.Call.Factory callFactory = this.callFactory;
      if (callFactory == null) {
        callFactory = new OkHttpClient();
      }
    
      // Here platform is Android, and defaultCallbackExecutor posts to a main-thread Handler.
      Executor callbackExecutor = this.callbackExecutor;
      if (callbackExecutor == null) {
        callbackExecutor = platform.defaultCallbackExecutor();
      }
    
      // callAdapterFactories contains the ones we registered plus the platform defaults
      // Make a defensive copy of the adapters and add the default Call adapter.
      List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
      callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
    
      // converterFactories is similar: the ones we registered plus a few built-in ones.
      // Make a defensive copy of the converters.
      List<Converter.Factory> converterFactories = new ArrayList<>(
          1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());
    
      // Add the built-in converter factory first. This prevents overriding its behavior but also
      // ensures correct behavior when using converters that consume all types.
      converterFactories.add(new BuiltInConverters());
      converterFactories.addAll(this.converterFactories);
      converterFactories.addAll(platform.defaultConverterFactories());
    
    
      return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
          unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
    }

    Next, look at retrofit.create

    IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
    public <T> T create(final Class<T> service) {
      ...
      return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
          new InvocationHandler() {
            // Here platform is Android. The abstract class Platform has two subclasses: Android and Java8.
            private final Platform platform = Platform.get();
            private final Object[] emptyArgs = new Object[0];
    
            @Override public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
              // Methods declared on Object are invoked directly
              if (method.getDeclaringClass() == Object.class) {
                return method.invoke(this, args);
              }
    
              // Default interface methods were introduced in JDK 8. The Java8 class implements
              // invokeDefaultMethod while the Android class does not, so this branch is skipped here.
              if (platform.isDefaultMethod(method)) {
                return platform.invokeDefaultMethod(method, service, proxy, args);
              }
    
              return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
            }
          });
    }

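    As you can see, a dynamic proxy is used to create a proxy object for the service interface.

    We then call a method through this proxy object,

    Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
    which runs InvocationHandler.invoke.

    Inside invoke, methods declared on Object are executed directly and returned, and the default-method branch is skipped as well.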
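
    The proxy mechanism can be tried without Retrofit at all. The following is a minimal sketch using only java.lang.reflect; GreetingService and ProxyDemo are made-up names, and the handler just echoes the method name where Retrofit would call loadServiceMethod(method).invoke(args).

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface, standing in for IpServiceRx.
interface GreetingService {
    String greet(String name);
}

class ProxyDemo {
    // Builds a proxy in the same style as Retrofit.create: every interface call lands in invoke().
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> service) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Methods declared on Object (toString, hashCode, equals) run on the handler itself.
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args);
                }
                // Retrofit would dispatch to the parsed ServiceMethod here;
                // this sketch only reports which method was called and with what argument.
                Object first = (args != null && args.length > 0) ? args[0] : null;
                return method.getName() + "(" + first + ")";
            }
        };
        return (T) Proxy.newProxyInstance(
                service.getClassLoader(), new Class<?>[] {service}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = create(GreetingService.class);
        System.out.println(service.greet("wz")); // prints "greet(wz)"
    }
}
```

    No class implementing GreetingService is ever written; the JDK generates one at runtime, which is why a Retrofit service can be declared as a bare interface.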

    Go straight to loadServiceMethod

    ServiceMethod<?> loadServiceMethod(Method method) {
      ServiceMethod<?> result = serviceMethodCache.get(method);
      if (result != null) return result;
    
      synchronized (serviceMethodCache) {
        result = serviceMethodCache.get(method);
        if (result == null) {
          result = ServiceMethod.parseAnnotations(this, method);
          serviceMethodCache.put(method, result);
        }
      }
      return result;
    }
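
    The caching pattern in loadServiceMethod (a lock-free read first, then a re-check under the lock) can be sketched in isolation. The class below is illustrative only: MethodCacheDemo, parse and parseCount are made-up names, the cached value is a plain String instead of a ServiceMethod, and the map is assumed to be a ConcurrentHashMap.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class MethodCacheDemo {
    private final Map<Method, String> cache = new ConcurrentHashMap<>();
    final AtomicInteger parseCount = new AtomicInteger();

    // Stand-in for ServiceMethod.parseAnnotations: counts how often parsing really happens.
    private String parse(Method method) {
        parseCount.incrementAndGet();
        return "parsed:" + method.getName();
    }

    String load(Method method) {
        String result = cache.get(method);   // fast path, no lock taken
        if (result != null) return result;

        synchronized (cache) {
            result = cache.get(method);      // re-check: another thread may have parsed it already
            if (result == null) {
                result = parse(method);
                cache.put(method, result);
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        MethodCacheDemo demo = new MethodCacheDemo();
        Method m = String.class.getMethod("length");
        System.out.println(demo.load(m));           // parsed:length
        System.out.println(demo.load(m));           // parsed:length (served from the cache)
        System.out.println(demo.parseCount.get());  // 1
    }
}
```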

    ServiceMethod

    static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
      // This class turns the annotations on the method, together with the arguments passed later,
      // into an okhttp Request; it is used below.
      RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
    
      Type returnType = method.getGenericReturnType();
      if (Utils.hasUnresolvableType(returnType)) {
        throw methodError(method,
            "Method return type must not include a type variable or wildcard: %s", returnType);
      }
    
      // The return type must not be void
      if (returnType == void.class) {
        throw methodError(method, "Service methods cannot return void.");
      }
    
      return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
    }

    HttpServiceMethod

    static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
        Retrofit retrofit, Method method, RequestFactory requestFactory) {
      boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
      boolean continuationWantsResponse = false;
      boolean continuationBodyNullable = false;
    
      // Get the annotations on the method
      Annotation[] annotations = method.getAnnotations();
      Type adapterType;
      if (isKotlinSuspendFunction) {
        ...
      } else {
        // The generic return type of the method
        adapterType = method.getGenericReturnType();
      }
    
      // Analyzed below
      CallAdapter<ResponseT, ReturnT> callAdapter = createCallAdapter(retrofit, method, adapterType, annotations);
    
    
    
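      // Validate the response type. callAdapter.responseType() is the type inside Response<...>,
      // i.e. IpModel in this example.
      Type responseType = callAdapter.responseType();
      // The response type must not be okhttp3.Response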
      if (responseType == okhttp3.Response.class) {
        throw methodError(method, "'"
            + getRawType(responseType).getName()
            + "' is not a valid response body type. Did you mean ResponseBody?");
      }
      // The response type must not be a raw Response; it has to carry a generic argument such as
      // Response<String>. This Response is the one defined in retrofit2, not okhttp3.Response.
      if (responseType == Response.class) {
        throw methodError(method, "Response must include generic type (e.g., Response<String>)");
      }
      // TODO support Unit for Kotlin?
      if (requestFactory.httpMethod.equals("HEAD") && !Void.class.equals(responseType)) {
        throw methodError(method, "HEAD method must use Void as response type.");
      }
    
    
      // Analyzed below
      Converter<ResponseBody, ResponseT> responseConverter = createResponseConverter(retrofit, method, responseType);
      
      // callFactory is in fact the OkHttpClient
      okhttp3.Call.Factory callFactory = retrofit.callFactory;
      if (!isKotlinSuspendFunction) {
        return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
      } else 
    ...
      }
    }

    Finally a CallAdapted object is created and returned.

    CallAdapted's inheritance chain:

    CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>

    HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>

    createCallAdapter

    HttpServiceMethod.createCallAdapter

    private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
        Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
      try {
        //noinspection unchecked
        return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
      } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(method, e, "Unable to create call adapter for %s", returnType);
      }
    }

    retrofit.callAdapter

    public CallAdapter<?, ?> callAdapter(Type returnType, Annotation[] annotations) {
      return nextCallAdapter(null, returnType, annotations);
    }
    
    public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
    
      int start = callAdapterFactories.indexOf(skipPast) + 1;
      for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
        CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
        if (adapter != null) {
          return adapter;
        }
      }
    
      ...
      throw new IllegalArgumentException(builder.toString());
    }

    In short, it walks through the CallAdapter factories we registered plus the built-in ones, calls get on each, and returns the first non-null CallAdapter.
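
    The "first factory that returns non-null wins" lookup, including the skipPast trick, can be reproduced with a few lines of plain Java. This is a simplified sketch: AdapterFactory, FakeObservable and FactoryLookupDemo are invented for illustration, and an adapter is represented by a String.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stripped-down factory: returns an adapter name, or null if it cannot handle the type.
interface AdapterFactory {
    String get(Class<?> returnType);
}

// Marker class standing in for io.reactivex.Observable.
class FakeObservable {}

class FactoryLookupDemo {
    static String nextAdapter(List<AdapterFactory> factories, AdapterFactory skipPast,
                              Class<?> returnType) {
        // indexOf returns -1 when skipPast is null (or absent), so the scan starts at index 0.
        int start = factories.indexOf(skipPast) + 1;
        for (int i = start; i < factories.size(); i++) {
            String adapter = factories.get(i).get(returnType);
            if (adapter != null) {
                return adapter;
            }
        }
        throw new IllegalArgumentException("Could not locate call adapter for " + returnType);
    }

    // Registration order matters: the rx factory is consulted before the catch-all default.
    static List<AdapterFactory> sampleFactories() {
        List<AdapterFactory> factories = new ArrayList<>();
        factories.add(type -> type == FakeObservable.class ? "rx" : null);
        factories.add(type -> "default");
        return factories;
    }

    public static void main(String[] args) {
        List<AdapterFactory> factories = sampleFactories();
        System.out.println(nextAdapter(factories, null, FakeObservable.class)); // rx
        System.out.println(nextAdapter(factories, null, String.class));         // default
        // Skipping past the first factory lets a delegating factory reach the next one in line.
        System.out.println(nextAdapter(factories, factories.get(0), FakeObservable.class)); // default
    }
}
```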

    Take RxJava2CallAdapterFactory.get as an example:

    @Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
      // Our returnType is the Type of Observable<Response<IpModel>>.
      // This call returns Observable; see the getRawType source below
      Class<?> rawType = getRawType(returnType);
    
      // Clearly all of these are false
      boolean isFlowable = rawType == Flowable.class;
      boolean isSingle = rawType == Single.class;
      boolean isMaybe = rawType == Maybe.class;
      if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
        return null;
      }
    
      boolean isResult = false;
      boolean isBody = false;
      Type responseType;
      
      // Returns the generic argument, i.e. Response<IpModel>
      Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    
      // Takes the raw type of Response<IpModel>, i.e. Retrofit's Response
      Class<?> rawObservableType = getRawType(observableType);
      if (rawObservableType == Response.class) {
        // Takes the upper bound of Response<IpModel>'s type argument, i.e. IpModel
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
      } else if (rawObservableType == Result.class) {
        if (!(observableType instanceof ParameterizedType)) {
          throw new IllegalStateException("Result must be parameterized"
              + " as Result<Foo> or Result<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
        isResult = true;
      } else {
        responseType = observableType;
        isBody = true;
      }
    
      // As shown above, every boolean passed to the constructor is false; when the
      // RxJava2CallAdapterFactory was created, scheduler was null and isAsync was false.
      // responseType is IpModel
      return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
    }

    Utils.getRawType

    static Class<?> getRawType(Type type) {
      // A concrete class
      if (type instanceof Class<?>) {
        // Type is a normal class.
        return (Class<?>) type;
      }
    
      // A parameterized (generic) type
      if (type instanceof ParameterizedType) {
        ParameterizedType parameterizedType = (ParameterizedType) type;
    
        // For Observable<Response<IpModel>> this returns Observable
        Type rawType = parameterizedType.getRawType();
        if (!(rawType instanceof Class)) throw new IllegalArgumentException();
        return (Class<?>) rawType;
      }
    
      // Other kinds of Type
      ...

    Utils.getParameterUpperBound

    static Type getParameterUpperBound(int index, ParameterizedType type) {
      Type[] types = type.getActualTypeArguments();
      Type paramType = types[index];
      return paramType;
    }
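
    How these two helpers peel a nested generic type apart can be checked with plain reflection. In the sketch below, List<Optional<String>> plays the role of Observable<Response<IpModel>>; TypeDemo, Api and the helper names are made up, and wildcard handling is left out.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Optional;

class TypeDemo {
    // Hypothetical method whose generic return type mimics Observable<Response<IpModel>>.
    interface Api {
        List<Optional<String>> fetch();
    }

    // Simplified getRawType: handles plain classes and parameterized types only.
    static Class<?> rawType(Type type) {
        if (type instanceof Class<?>) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    // Simplified getParameterUpperBound(0, type): the first actual type argument.
    static Type firstArgument(Type type) {
        return ((ParameterizedType) type).getActualTypeArguments()[0];
    }

    public static void main(String[] args) throws Exception {
        Method method = Api.class.getMethod("fetch");
        Type returnType = method.getGenericReturnType(); // List<Optional<String>>
        Type inner = firstArgument(returnType);          // Optional<String>
        Type innermost = firstArgument(inner);           // String

        System.out.println(rawType(returnType)); // interface java.util.List
        System.out.println(rawType(inner));      // class java.util.Optional
        System.out.println(innermost);           // class java.lang.String
    }
}
```

    The generic arguments survive because they are recorded in the method signature, which is what getGenericReturnType reads; that is why Retrofit can recover IpModel from the interface declaration alone.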

    createResponseConverter

    HttpServiceMethod.createResponseConverter

    private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(Retrofit retrofit, Method method, Type responseType) {
      Annotation[] annotations = method.getAnnotations();
      try {
        return retrofit.responseBodyConverter(responseType, annotations);
      } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(method, e, "Unable to create converter for %s", responseType);
      }
    }

    retrofit.responseBodyConverter

    public <T> Converter<ResponseBody, T> responseBodyConverter(Type type, Annotation[] annotations) {
      return nextResponseBodyConverter(null, type, annotations);
    }
    
    public <T> Converter<ResponseBody, T> nextResponseBodyConverter(@Nullable Converter.Factory skipPast, Type type, Annotation[] annotations) {
      int start = converterFactories.indexOf(skipPast) + 1;
      for (int i = start, count = converterFactories.size(); i < count; i++) {
        Converter<ResponseBody, ?> converter = converterFactories.get(i).responseBodyConverter(type, annotations, this);
        if (converter != null) {
          //noinspection unchecked
          return (Converter<ResponseBody, T>) converter;
        }
      }
    
      ...
      throw new IllegalArgumentException(builder.toString());
    }

    In short, it walks through the converter factories we registered plus the built-in ones and returns the first non-null responseBodyConverter.

    Take GsonConverterFactory.responseBodyConverter as an example:

    @Override
    public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
      TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
      return new GsonResponseBodyConverter<>(gson, adapter);
    }

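    GsonRequestBodyConverter

    The snippet below is the request-side converter, GsonRequestBodyConverter, which serializes an object into a RequestBody. Its response-side counterpart, GsonResponseBodyConverter, is built from the same gson and adapter pair; its convert method is shown in the parseResponse discussion further down.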

    GsonRequestBodyConverter(Gson gson, TypeAdapter<T> adapter) {
      this.gson = gson;
      this.adapter = adapter;
    }
    
    @Override 
    public RequestBody convert(T value) throws IOException {
      Buffer buffer = new Buffer();
      Writer writer = new OutputStreamWriter(buffer.outputStream(), UTF_8);
      JsonWriter jsonWriter = gson.newJsonWriter(writer);
      adapter.write(jsonWriter, value);
      jsonWriter.close();
      return RequestBody.create(MEDIA_TYPE, buffer.readByteString());
    }

    loadServiceMethod(method).invoke

    After this detour, return to the last line of invoke inside retrofit.create above:

    loadServiceMethod(method).invoke(args != null ? args : emptyArgs);

    As shown above, loadServiceMethod returns a CallAdapted,

    and CallAdapted's inheritance chain is:

    CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>

    HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>

    so calling invoke lands in HttpServiceMethod.invoke:

    @Override final @Nullable ReturnT invoke(Object[] args) {
      Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
      return adapt(call, args);
    }

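    Note that the Call here is Retrofit's, not okhttp's.

    An OkHttpCall object is created here. As the name suggests, it must perform the network request through okhttp's Call, so after a long detour we have finally found where the request is actually made.

    Next, look at adapt

    adapt actually resolves to CallAdapted.adapt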

    @Override 
    protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
      return callAdapter.adapt(call);
    }

    The callAdapter here is the RxJava2CallAdapter from above,

    so let's look inside RxJava2CallAdapter:

    @Override 
    public Object adapt(Call<R> call) {
      Observable<Response<R>> responseObservable = isAsync
          ? new CallEnqueueObservable<>(call)
          : new CallExecuteObservable<>(call);
    
      Observable<?> observable;
      if (isResult) {
        observable = new ResultObservable<>(responseObservable);
      } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
      } else {
        observable = responseObservable;
      }
    
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }
    
      if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
      }
      if (isSingle) {
        return observable.singleOrError();
      }
      if (isMaybe) {
        return observable.singleElement();
      }
      if (isCompletable) {
        return observable.ignoreElements();
      }
      return RxJavaPlugins.onAssembly(observable);
    }

    From the above:

    • isAsync, isResult and isBody are false,
    • scheduler = null,
    • isFlowable, isSingle, isMaybe and isCompletable are all false.

    So what is finally returned is new CallExecuteObservable<>(call);

    As for RxJavaPlugins.onAssembly(observable):

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

    We have not installed any RxJava hook, so what comes back is still the CallExecuteObservable.

    The call handed to CallExecuteObservable on construction is the OkHttpCall.
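
    The optional-hook pattern behind onAssembly is easy to see in isolation. Below is a minimal sketch with invented names (AssemblyHookDemo, assemblyHook) that uses Strings instead of observables: with no hook installed the source is returned untouched, otherwise the hook may wrap or replace it.

```java
import java.util.function.Function;

class AssemblyHookDemo {
    // Hypothetical global hook, playing the role of RxJavaPlugins' onObservableAssembly field.
    static volatile Function<String, String> assemblyHook;

    static String onAssembly(String source) {
        Function<String, String> f = assemblyHook; // read the field once
        return f != null ? f.apply(source) : source;
    }

    public static void main(String[] args) {
        System.out.println(onAssembly("CallExecuteObservable")); // CallExecuteObservable
        assemblyHook = s -> "Traced(" + s + ")";
        System.out.println(onAssembly("CallExecuteObservable")); // Traced(CallExecuteObservable)
        assemblyHook = null; // restore the default
    }
}
```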

    Next come the RxJava operations

    Some RxJava source code is briefly analyzed here along the way.

        Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
        ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Response<IpModel>>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    }
    
                    @Override
                    public void onNext(@NonNull Response<IpModel> ipModelResponse) {
                        IpModel ipModel = ipModelResponse.body();
                        if (ipModel == null) {
                            return;
                        }
                        IpData data = ipModel.getData();
                        if (data == null) {
                            return;
                        }
                        mEt.setText(getCSData(data));
                    }
    
                    @Override
                    public void onError(@NonNull Throwable e) {
                        mEt.setText(e.toString());
                        e.printStackTrace();
                    }
    
                    @Override
                    public void onComplete() {
                    }
                });

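    Each time an RxJava operator is called, it returns a new observable that keeps a reference to its upstream observable, forming a chain from top to bottom; hence the name chained calls.

    Nothing happens until subscribe is finally called. That triggers subscription upward: each lower layer calls subscribe on the layer above it. Every layer has its own subscribeActual implementation, so each layer acts as an observer of the layer above and as an observable for the layer below.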
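
    The upward subscription and downward emission can be seen with a toy version of the types. The following is a deliberately tiny sketch, not RxJava: MiniObservable and MiniObserver are invented, and threading, errors, completion and disposal are all left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy stand-in for RxJava's Observer.
interface MiniObserver<T> {
    void onNext(T value);
}

// Toy stand-in for RxJava's Observable; LOG records the order of events.
abstract class MiniObservable<T> {
    static final List<String> LOG = new ArrayList<>();

    abstract void subscribeActual(MiniObserver<? super T> observer);

    void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    // The topmost observable: emits one value when subscribed to.
    static <T> MiniObservable<T> just(T value) {
        return new MiniObservable<T>() {
            @Override
            void subscribeActual(MiniObserver<? super T> observer) {
                LOG.add("source subscribed");
                observer.onNext(value);
            }
        };
    }

    // An operator: remembers its upstream and subscribes to it with a wrapping observer.
    <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
        MiniObservable<T> upstream = this;
        return new MiniObservable<R>() {
            @Override
            void subscribeActual(MiniObserver<? super R> downstream) {
                LOG.add("map subscribed");
                upstream.subscribe(value -> downstream.onNext(mapper.apply(value)));
            }
        };
    }
}

class ChainDemo {
    public static void main(String[] args) {
        MiniObservable.just(20)
                .map(x -> x + 1)
                .map(x -> "value=" + x)
                .subscribe(v -> MiniObservable.LOG.add("received " + v));
        // [map subscribed, map subscribed, source subscribed, received value=21]
        System.out.println(MiniObservable.LOG);
    }
}
```

    The log shows the two phases: subscriptions travel from the last operator up to the source, and only then does the value travel back down through each wrapping observer.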

    This continues until the topmost subscribeActual is reached, which in this example is CallExecuteObservable.subscribeActual:

    @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
      // Since Call is a one-shot type, clone it for each new observer.
      // This is the OkHttpCall
      Call<T> call = originalCall.clone();
      CallDisposable disposable = new CallDisposable(call);
      observer.onSubscribe(disposable);
      if (disposable.isDisposed()) {
        return;
      }
    
      boolean terminated = false;
      try {
        // This calls OkHttpCall.execute, which in turn must call okhttp's call.execute
        Response<T> response = call.execute();
        if (!disposable.isDisposed()) {
          // Start passing the result downstream
          observer.onNext(response);
        }
        if (!disposable.isDisposed()) {
          terminated = true;
          observer.onComplete();
        }
      } catch (Throwable t) {
        ...
      }
    }

    OkHttpCall.execute

    @Override 
    public Response<T> execute() throws IOException {
      okhttp3.Call call;
    
      synchronized (this) {
        // Sanity checks
        ...
    
        call = rawCall;
        if (call == null) {
          try {
            // Create a new network request; see the code below
            call = rawCall = createRawCall();
          } catch (IOException | RuntimeException | Error e) {
            throwIfFatal(e); //  Do not assign a fatal error to creationFailure.
            creationFailure = e;
            throw e;
          }
        }
      }
    
      if (canceled) {
        call.cancel();
      }
      
      // Parse the okhttp3.Response returned by the blocking call.execute(); see the code below
      return parseResponse(call.execute());
    }
    private okhttp3.Call createRawCall() throws IOException {
      okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
      return call;
    }

    The requestFactory here is the RequestFactory created in ServiceMethod.parseAnnotations above. It builds an okhttp Request object,

    from which an okhttp3.Call is created and returned.

    Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
      ResponseBody rawBody = rawResponse.body();
    
      // Remove the body's source (the only stateful object) so we can pass the response along.
      rawResponse = rawResponse.newBuilder()
          .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
          .build();
    
      int code = rawResponse.code();
      if (code < 200 || code >= 300) {
        try {
          // Buffer the entire body to avoid future I/O.
          ResponseBody bufferedBody = Utils.buffer(rawBody);
          return Response.error(bufferedBody, rawResponse);
        } finally {
          rawBody.close();
        }
      }
    
      if (code == 204 || code == 205) {
        rawBody.close();
        return Response.success(null, rawResponse);
      }
    
      ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
      try {
        // The Converter we registered earlier (GsonResponseBodyConverter here) parses the body into the concrete bean object
        T body = responseConverter.convert(catchingBody);
        return Response.success(body, rawResponse);
      } catch (RuntimeException e) {
        // If the underlying source threw an exception, propagate that rather than indicating it was
        // a runtime exception.
        catchingBody.throwIfCaught();
        throw e;
      }
    }
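
    The branching in parseResponse boils down to a three-way decision on the status code. The sketch below captures only that decision; StatusDemo and Outcome are made-up names, and no body handling is modeled.

```java
class StatusDemo {
    enum Outcome { ERROR, EMPTY_SUCCESS, CONVERTED_BODY }

    // Same thresholds as parseResponse: non-2xx is an error, 204/205 carry no body,
    // and every other 2xx response goes through the response converter.
    static Outcome classify(int code) {
        if (code < 200 || code >= 300) {
            return Outcome.ERROR;
        }
        if (code == 204 || code == 205) {
            return Outcome.EMPTY_SUCCESS;
        }
        return Outcome.CONVERTED_BODY;
    }

    public static void main(String[] args) {
        System.out.println(classify(200)); // CONVERTED_BODY
        System.out.println(classify(204)); // EMPTY_SUCCESS
        System.out.println(classify(404)); // ERROR
    }
}
```

    This is why onNext in the example checks ipModelResponse.body() for null: a non-2xx response still arrives through onNext when the return type is Observable<Response<IpModel>>, just without a converted body.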

    GsonResponseBodyConverter.convert

    @Override public T convert(ResponseBody value) throws IOException {
      JsonReader jsonReader = gson.newJsonReader(value.charStream());
      try {
        T result = adapter.read(jsonReader);
        if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
          throw new JsonIOException("JSON document was not fully consumed.");
        }
        return result;
      } finally {
        value.close();
      }
    }

    observer.onNext(response);

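    The response is passed downstream. At this point the work is still running on the thread specified by subscribeOn(Schedulers.io()).

    When it reaches observeOn(AndroidSchedulers.mainThread()), that observable switches delivery to the main thread,

    and the value finally arrives at onNext of the observer passed to subscribe.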
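
    The thread hand-off can be imitated with the JDK alone. The sketch below is an analogy rather than RxJava: it uses CompletableFuture with two single-thread executors, and the thread names io-thread and main-thread are invented to stand in for Schedulers.io() and the Android main thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopDemo {
    static String run() throws Exception {
        // One executor per "scheduler"; each names its single worker thread.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            return CompletableFuture
                    // Like subscribeOn: the producing work runs on the io executor.
                    .supplyAsync(() -> "produced on " + Thread.currentThread().getName(), io)
                    // Like observeOn: the consuming step is re-dispatched to another executor.
                    .thenApplyAsync(s -> s + ", consumed on " + Thread.currentThread().getName(), main)
                    .get(5, TimeUnit.SECONDS);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // produced on io-thread, consumed on main-thread
    }
}
```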

     

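    Miscellaneous

    How does the logic differ when the return type is or is not wrapped in Response?

    Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);

    The analysis above is entirely based on the version with Response.

    What if the interface is declared without it, i.e.

    Observable<IpModel> getIpMsg(@Query("ip") String ip);

    Picking up from RxJava2CallAdapterFactory.get in the createCallAdapter analysis above: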

    @Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
      // Our returnType is the Type of Observable<IpModel>.
      // This call returns Observable; see the getRawType source above
      Class<?> rawType = getRawType(returnType);
    
      // Clearly all of these are false
      boolean isFlowable = rawType == Flowable.class;
      boolean isSingle = rawType == Single.class;
      boolean isMaybe = rawType == Maybe.class;
      if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
        return null;
      }
    
      boolean isResult = false;
      boolean isBody = false;
      Type responseType;
      
      // Returns the generic argument, i.e. IpModel
      Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    
      // Still IpModel: the raw type of a plain class is the class itself
      Class<?> rawObservableType = getRawType(observableType);
      if (rawObservableType == Response.class) {
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
      } else if (rawObservableType == Result.class) {
        if (!(observableType instanceof ParameterizedType)) {
          throw new IllegalStateException("Result must be parameterized"
              + " as Result<Foo> or Result<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
        isResult = true;
      } else {
        // This branch is taken now, so isBody becomes true
        responseType = observableType;
        isBody = true;
      }
    
      // As shown above, every boolean passed to the constructor is false except isBody, which is true;
      // when the RxJava2CallAdapterFactory was created, scheduler was null and isAsync was false.
      // responseType is IpModel
      return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
    }

    Then, continuing inside loadServiceMethod(method).invoke:

    RxJava2CallAdapter.adapt

    @Override 
    public Object adapt(Call<R> call) {
      Observable<Response<R>> responseObservable = isAsync
          ? new CallEnqueueObservable<>(call)
          : new CallExecuteObservable<>(call);
    
      Observable<?> observable;
      if (isResult) {
        observable = new ResultObservable<>(responseObservable);
      } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
      } else {
        observable = responseObservable;
      }
    
      if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
      }
    
      if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
      }
      if (isSingle) {
        return observable.singleOrError();
      }
      if (isMaybe) {
        return observable.singleElement();
      }
      if (isCompletable) {
        return observable.ignoreElements();
      }
      return RxJavaPlugins.onAssembly(observable);
    }

    From the above:

    • isAsync and isResult are false,
    • isBody is true,
    • scheduler = null,
    • isFlowable, isSingle, isMaybe and isCompletable are all false.

    So what is finally returned is new BodyObservable<>(responseObservable);

    BodyObservable(Observable<Response<T>> upstream) {
      this.upstream = upstream;
    }
    
    @Override protected void subscribeActual(Observer<? super T> observer) {
      upstream.subscribe(new BodyObserver<T>(observer));
    }

    In other words, the topmost observable is responseObservable.

    So when responseObservable starts emitting data downstream, BodyObserver's onNext is called:

    @Override 
    public void onNext(Response<R> response) {
      if (response.isSuccessful()) {
        // The body (the IpModel) is passed straight downstream
        observer.onNext(response.body());
      } else {
        terminated = true;
        Throwable t = new HttpException(response);
        try {
          observer.onError(t);
        } catch (Throwable inner) {
          Exceptions.throwIfFatal(inner);
          RxJavaPlugins.onError(new CompositeException(t, inner));
        }
      }
    }

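    The response here is Retrofit's.

    A Response carries more information about the network request, such as the status code, headers and error body. If only the bean/model object is returned, that finer-grained control is lost.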
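
    The trade-off can be made concrete with a stripped-down version of the BodyObserver branch. In this sketch MiniResponse is an invented stand-in for retrofit2.Response, and the callbacks are plain Consumers rather than an RxJava Observer; a successful response is unwrapped to its body, anything else is turned into an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for retrofit2.Response: just a status code and a body.
class MiniResponse<T> {
    final int code;
    final T body;

    MiniResponse(int code, T body) {
        this.code = code;
        this.body = body;
    }

    boolean isSuccessful() {
        return code >= 200 && code < 300;
    }
}

class BodyUnwrapDemo {
    // Mirrors the branch in BodyObserver.onNext: body on success, error otherwise.
    static <T> void deliver(MiniResponse<T> response,
                            Consumer<? super T> onNext,
                            Consumer<? super Throwable> onError) {
        if (response.isSuccessful()) {
            onNext.accept(response.body);
        } else {
            onError.accept(new IllegalStateException("HTTP " + response.code));
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Consumer<String> onNext = body -> events.add("next:" + body);
        Consumer<Throwable> onError = e -> events.add("error:" + e.getMessage());

        deliver(new MiniResponse<String>(200, "ip-data"), onNext, onError);
        deliver(new MiniResponse<String>(404, null), onNext, onError);
        System.out.println(events); // [next:ip-data, error:HTTP 404]
    }
}
```

    With the unwrapped form, the subscriber never sees the 404 response itself, only an error; with Observable<Response<IpModel>> the same 404 would arrive in onNext and could be inspected.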

  • Original article: https://www.cnblogs.com/muouren/p/11768914.html