简介:大三学生党一枚!主攻Android开发,对于Web和后端均有了解。
语录:取乎其上,得乎其中,取乎其中,得乎其下,以顶级态度写好一篇的博客。
上一篇博客我们介绍了前三个拦截器,其中比较有难度的就是CacheInterceptor
拦截器,它的底层是基于DiskLruCache
的,面试也有可能会被问到原理!本篇继续介绍剩下的两种拦截器,ConnectInterceptor
和CallServerInterceptor
拦截器。开始学习!
@TOC
一.ConnectInterceptor
1.1 源码分析
@Overridepublic Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//streamAllocation是在RetryAndFollowupInterceptor中创建的
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//重点是这句,streamAllocation.newStream()获取可用的connection
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//获取一个物理连接,提供给下一个拦截器进行IO操作
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
看起来ConnectInteceptor内部很简单,但是并非如此,只是把大部分方法都进行了封装。我们再深入研究一下他究竟是如何获取可用连接的!
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//查询一条可用连接,findHealthyConnection继续深入
RealConnection resultConnection =
findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//根据可用连接创建HttpCodec
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
findHealthyConnection
方法
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//继续调用findConnection查找可用连接,而且是while循环,也就是一定会找到一条连接才会返回
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();//不健康怎么做
continue;
}
return candidate;
}
}
接着看findConnection
()
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
//首先判断之前这条stream的connection是否可用,这种情况对应重试,也就是第一次请求已经把连接建立好了,重试就无需要重新建立,直接可以复用
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
//如果之前没有建立stream的话,就去连接池进行第一次获取
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
//获取到了直接返回
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
//能够来到这里,说明第一次获取连接失败了,对路由进行处理后再次
//在连接池中进行获取
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
//为什么对路由进行处理后就可能会获取到呢?
//官方的理由是: This could match due to connection coalescing.
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
//第二次去连接池中进行获取
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
//如果还没有获取到就创建一个新的连接
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//创建一个新的连接
result = new RealConnection(connectionPool, selectedRoute);
//更新引用计数,方便后面回收
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
//新建立的连接要进行TCP握手和TLS握手
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
//把新建立的连接放入连接池中
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
//如果多路复用有冗余,也就是有多条通往一个address的连接,就要清除
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
Internal.instance.get(connectionPool, address, this, route);
是如何获得连接的呢?
Internal是个接口只有唯一的实现就是OkhttpClient
@Overridepublic RealConnection get(ConnectionPool pool, Address address,
StreamAllocation streamAllocation, Route route) {
return pool.get(address, streamAllocation, route);
}
@Nullable RealConnectionget(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
//遍历连接池,根据address和route判断是否可用连接
//重点还是这个isEligible方法是怎么判断连接可用的
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
/**
* Returns true if this connection can carry a stream allocation to {@code address}. If non-null
* {@code route} is the resolved route for a connection.
*/
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
//如果这个连接不能再承载新的流,返回
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
//如果非host域不相等直接返回
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
//host与也相等,那就完美匹配直接返回
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
//能够走到这一步说明host域不相等,但是如果是http2可以进行处理
if (http2Connection == null) return false;
// 2. The routes must share an IP address. This requires us to have a DNS address for both
// hosts, which only happens after route planning. We can't coalesce connections that use a
// proxy, since proxies don't tell us the origin server's IP address.
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. This connection's server certificate's must cover the new host.
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
经过一些列的判断和处理以后,能够找到一条Connection
,进行IO操作
总结来看寻找连接的步骤如下:
- 查看当前连接是否可用(重试连接情况下)
- 第一次去连接池寻找可用连接
- 没有找到,对路由信息进行处理
- 第二次去连接池寻找可用连接
- 还没找到,直接创建一条可用连接
- 将该连接放入连接池中,更新引用计数
判断连接是否可用步骤如下:
- 查看该连接是否能够再承载一个Stream,如果不可以,直接返回fasle
- 查看非host域是否相等,如果不相等直接返回false
- 查看host域是否相等,如果相等完美匹配,返回true
- 如果不相等查看是否是http2连接,如果不是,返回false
- 进行一些列处理有可能返回true
1.2 原理分析
经过上面的分析,虽然ConnectInterceptor
的incept
方法比较简短,但是实际上,它的判断也是非常多的,为了能够复用连接,减少重新创建连接进行三次握手的时间消耗,Okhttp
可谓是煞费苦心!
原理:正是因为OkHttpClient
内部维护了一个连接池,才让我们能够复用连接,同时Http1x
系列中,一个Connection
对应一个逻辑上的双向Stream
,Http2
实现多路复用,就是一个Connection
可以对应多个Stream,OkhttpClient中的限制是1个。对于每一个请求Call
,ConnectInterceptor
不会立即去创建一个新的连接,而是尝试寻找一个可用的连接,如果经过一系列处理仍然没有,才会创建一个,去进行TCP+TLS
握手,当然,还需要对ConnectionPool进行清理,这里就不再啰嗦了,用的清理Socket的方法是计数+标记清理
,正是有了这样的机制,OkhttpClient
才能够进行高效的,并发性强的,低延迟的网络请求!
二.CallServerInterceptor
说到底上一个拦截器还是为了CallServerInterceptor
做铺垫的,到了这个拦截器才会真正根据之前建立的连接进行请求和响应的IO
2.1 源码分析
@Overridepublic Response intercept(Chain chain) throws IOException {
final RealInterceptorChain realChain = (RealInterceptorChain) chain;
final HttpCodec httpCodec = realChain.httpStream();
//httpCode是什么呢?可以理解为编码Http请求,解码Http响应
StreamAllocation streamAllocation = realChain.streamAllocation();
//获取之前获得的streamAllocation,并拿到connection
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();//获得经过一系列拦截器处理的请求
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);//重点查看1
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
HttpSink httpSink = null;
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method())
&& (request.body() != null || Internal.instance.isDuplex(request))) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
if (Internal.instance.isDuplex(request)) {
// Prepare a duplex body so that the application can send a request body later.
final CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, -1L));
final BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
httpSink = new HttpSink() {
@Override public BufferedSink sink() {
return bufferedRequestBody;
}
@Override public void headers(Headers headers) throws IOException {
List<Header> headerBlock = new ArrayList<>(headers.size() / 2);
for (int i = 0, size = headers.size(); i < size; i++) {
headerBlock.add(new Header(headers.name(i), headers.value(i)));
}
((Http2Codec) httpCodec).writeRequestHeaders(headerBlock);
}
@Override public void close() throws IOException {
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
}
};
} else {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
}
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
if (Internal.instance.isDuplex(request)) {
httpCodec.flushRequest();
} else {
httpCodec.finishRequest();
}
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis());
Internal.instance.httpSink(responseBuilder, httpSink);
Response response = responseBuilder.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis());
Internal.instance.httpSink(responseBuilder, httpSink);
response = responseBuilder.build();
code = response.code();
}
if (Internal.instance.isDuplex(request)) {
Response.Builder builder = response.newBuilder();
Internal.instance.setHttp2Codec(builder, (Http2Codec) httpCodec);
response = builder.build();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
2.2 原理分析
这个拦截器就是最后的一步,获取服务器原始的response
,然后再返回到上一级的每一个拦截器经过处理才会最后返回给用户进行请求成功失败的回调,这一个拦截器IO
部分是使用OKIO
,非常高效。但是这部分太底层了,面试一般问的比较少,我相信,如果我们能够把OkHttp
一些非常核心的东西说的有条有理,就可以了。
三.总结
Okhttp
源码讲解系列到此结束了,还是有很多地方没有说到,但是大体流程应该过了一遍,没办法这里面的内容是在太多了呀,小伙伴们有没有感觉对Okhttp
有了进一步的理解呢?
看一张别人做好的流程图!
这就是Okhttp
一次请求的全部过程了,非常的详细。
上面是OkHttp
的架构图!下次面试再问Okhttp
源码还怕嘛??
先别走,我有一个资源学习群要推荐给你,它是白嫖党的乐园,小白的天堂!
别再犹豫,一起来学习!