社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
OkHttp是一个非常优秀的网络请求框架,已被谷歌加入到Android的源码中。目前比较流行的Retrofit也是默认使用OkHttp的。所以OkHttp的源码是一个不容错过的学习资源,学习源码之前,务必熟练使用这个框架,否则就是跟自己过不去。
use -> running source code -> reading & learning the source code.
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://www.baidu.com")
.build();
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
System.out.println("成功");
}
} catch (IOException e) {
e.printStackTrace();
}
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://www.baidu.com")
.build();
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
System.out.println("成功");
}
} catch (IOException e) {
e.printStackTrace();
}
还是以上面的这段最基本的用法作为源码分析的入口。 public OkHttpClient() {
this(new Builder());
}
public Builder() {
dispatcher = new Dispatcher();
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
可以看到我们简单的一句new OkHttpClient(),OkHttp就已经为我们做了很多工作,很多我们需要的参数在这里都获得默认值。各字段含义如下:/** TLS 连接 */
public static final ConnectionSpec MODERN_TLS = new Builder(true)
.cipherSuites(APPROVED_CIPHER_SUITES)
.tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
.supportsTlsExtensions(true)
.build();
/** 未加密、未认证的Http连接. */
public static final ConnectionSpec CLEARTEXT = new Builder(false).build();
可以看出一个是针对TLS连接的配置,一个是针对普通的Http连接的配置;@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
可以看到同步请求生成的是RealCall对象,而异步请求生成的是AsyncCall对象。AsyncCall说到底其实就是Runnable的子类。 synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
可以看到,只是简单的将同步任务当到了runningSyncCalls集合中。 void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");//将请求移除集合
if (promoteCalls) promoteCalls();
...
}
...
}
对于同步请求,只是简单的将同步请求移除runningSyncCalls集合。promoteCalls参数是false,因此不会执行promoteCalls方法,promoteCalls方法用于遍历并执行异步请求待执行集合中的请求。 synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
可以看到如果正在执行的请求总数<=64 && 单个Host正在执行的请求<=5,则将请求加入到runningAsyncCalls集合中,紧接着就是利用线程池执行该请求,否则就将该请求放入readyAsyncCalls集合中。上面我们已经说了,AsyncCall是Runnable的子类(间接),因此,在线程池中最终会调用AsyncCall的execute()方法执行异步请求: @Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//拦截器链
if (retryAndFollowUpInterceptor.isCanceled()) {//重试失败,回调onFailure方法
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);//结束
}
}
此处的执行逻辑和同步的执行逻辑基本相同,区别在最后一句代码:client.dispatcher().finished(this);因为这是一个异步任务,所以会调用另外一个finish方法:void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");//将请求移除集合
if (promoteCalls) promoteCalls();
...
}
...
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
该方法主要是遍历执行readyRunningCalls集合中待执行的请求,当然前提是正在执行的Call总数没有超过64,并且readyAsyncCalls集合不为空。如果readyAsyncCalls集合为空,则意味着请求差不多都执行了。放入runningAsyncCalls集合中的请求会继续走上述的流程,直到全部的请求被执行。Response result = getResponseWithInterceptorChain();
没错,在getResponseWithInterceptorChain();方法中我们就用到了这个RealInterceptorChain类。Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
可以看到,在该方法中,我们依次添加了用户自定义的interceptor、retryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、 networkInterceptors、CallServerInterceptor,并将这些拦截器传递给了这个RealInterceptorChain。拦截器之所以可以依次调用,并最终再从后先前返回Response,都依赖于RealInterceptorChain的proceed方法。 public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
......
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
......
return response;
}
该方法最核心的代码就是中间的这几句,执行当前拦截器的Intercept方法,并调用下一个(index+1)拦截器。下一个(index+1)拦截器的调用依赖于当前拦截器的Intercept方法中,对RealInterceptorChain的proceed方法的调用:response = realChain.proceed(request, streamAllocation, null, null);
可以看到当前拦截器的Response依赖于下一个拦截器的Intercept的Response。因此,就会沿着这条拦截器链依次调用每一个拦截器,当执行到最后一个拦截器之后,就会沿着相反的方向依次返回Response,最终得到我们需要的“终极版”Response。 @Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();//获取Request对象
RealInterceptorChain realChain = (RealInterceptorChain) chain;//获取拦截器链对象,用于后面的chain.proceed(...)方法
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();//监听器
streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
call, eventListener, callStackTrace);
int followUpCount = 0;
Response priorResponse = null;
while (true) {//循环
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);//调用下一个拦截器
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {//路由异常,尝试恢复,如果再失败就抛出异常
throw e.getLastConnectException();
}
releaseConnection = false;
continue;//继续重试
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;连接关闭异常,尝试恢复
releaseConnection = false;
continue;//继续重试
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {//前一个重试得到的Response
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
//Figures out the HTTP request to make in response to receiving {@code userResponse}. This will
//either add authentication headers, follow redirects or handle a client request timeout. If a
//follow-up is either unnecessary or not applicable, this returns null.
// followUpRequest方法的主要作用就是为新的重试Request添加验证头等内容
Request followUp = followUpRequest(response);
if (followUp == null) {//如果一个请求得到的响应code是200,则followUp是为null的。
if (!forWebSocket) { streamAllocation.release(); } return response; }
closeQuietly(response.body());
//-------------------------------异常处理---------------------------------------------
// if (++followUpCount > MAX_FOLLOW_UPS) {//超过最大的次数,抛出异常
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount); }
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
} throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
} if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(followUp.url()), call, eventListener, callStackTrace);
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?");
}
//--------------------------------------------------------------------------------
request = followUp;//得到处理之后的Request,以用来继续请求,在哪继续请求?肯定还是沿着拦截器链继续搞呗
priorResponse = response;//由priorResponse持有
}
}
}
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
//----------------------request----------------------------------------------
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {//添加Content-Type请求头
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");//分块传输
requestBuilder.removeHeader("Content-Length");
}
}
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
//----------------------------------response----------------------------------------------
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());//保存cookie
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")//Content-Encoding、Content-Length不能用于Gzip解压缩
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
这个拦截器的源码还是很简单的,不再详细叙述。@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())//以request的url而来key,获取缓存
: null;
long now = System.currentTimeMillis();
//缓存策略类,该类决定了是使用缓存还是进行网络请求
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;//网络请求,如果为null就代表不用进行网络请求
Response cacheResponse = strategy.cacheResponse;//缓存响应,如果为null,则代表不使用缓存
if (cache != null) {//根据缓存策略,更新统计指标:请求次数、使用网络请求次数、使用缓存次数
cache.trackResponse(strategy);
}
//缓存不可用,关闭
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
//如果既无网络请求可用,又没有缓存,则返回504错误
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.缓存可用,直接返回缓存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);//进行网络请求,得到网络响应
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//HTTP_NOT_MODIFIED缓存有效,合并网络请求和缓存
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);//更新缓存
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
//有响应体 & 可缓存
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);//写缓存
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {//判断缓存的有效性
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
上面源码中的注释已经解释的很清楚了,下面再简单的说一下流程: private CacheStrategy getCandidate() {
// No cached response.
if (cacheResponse == null) {//没有缓存,直接网络请求
return new CacheStrategy(request, null);
}
// Drop the cached response if it's missing a required handshake.
if (request.isHttps() && cacheResponse.handshake() == null) {//https,但没有握手,直接网络请求
return new CacheStrategy(request, null);
}
// If this response shouldn't have been stored, it should never be used
// as a response source. This check should be redundant as long as the
// persistence store is well-behaved and the rules are constant.
if (!isCacheable(cacheResponse, request)) {//不可缓存,直接网络请求
return new CacheStrategy(request, null);
}
CacheControl requestCaching = request.cacheControl();
if (requestCaching.noCache() || hasConditions(request)) {
//请求头nocache或者请求头包含If-Modified-Since或者If-None-Match
//请求头包含If-Modified-Since或者If-None-Match意味着本地缓存过期,需要服务器验证
//本地缓存是不是还能继续使用
return new CacheStrategy(request, null);
}
CacheControl responseCaching = cacheResponse.cacheControl();
if (responseCaching.immutable()) {//强制使用缓存
return new CacheStrategy(null, cacheResponse);
}
long ageMillis = cacheResponseAge();
long freshMillis = computeFreshnessLifetime();
if (requestCaching.maxAgeSeconds() != -1) {
freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
}
long minFreshMillis = 0;
if (requestCaching.minFreshSeconds() != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
}
long maxStaleMillis = 0;
if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
}
//可缓存,并且ageMillis + minFreshMillis < freshMillis + maxStaleMillis
// (意味着虽过期,但可用,只是会在响应头添加warning)
if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
Response.Builder builder = cacheResponse.newBuilder();
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection "Response is stale"");
}
long oneDayMillis = 24 * 60 * 60 * 1000L;
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection "Heuristic expiration"");
}
return new CacheStrategy(null, builder.build());//使用缓存
}
// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
String conditionName;
String conditionValue;
//流程走到这,说明缓存已经过期了
//添加请求头:If-Modified-Since或者If-None-Match
//etag与If-None-Match配合使用
//lastModified与If-Modified-Since配合使用
//前者和后者的值是相同的
//区别在于前者是响应头,后者是请求头。
//后者用于服务器进行资源比对,看看是资源是否改变了。
// 如果没有,则本地的资源虽过期还是可以用的
if (etag != null) {
conditionName = "If-None-Match";
conditionValue = etag;
} else if (lastModified != null) {
conditionName = "If-Modified-Since";
conditionValue = lastModifiedString;
} else if (servedDate != null) {
conditionName = "If-Modified-Since";
conditionValue = servedDateString;
} else {
return new CacheStrategy(request, null); // No condition! Make a regular request.
}
Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);
Request conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build();
return new CacheStrategy(conditionalRequest, cacheResponse);
}
大致流程如下:(if-else的关系呀)/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
//我们需要网络来满足这个请求。可能是为了验证一个条件GET请求(缓存验证等)。
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
代码量是不是很少?是的。表面上看起来很少,实际上大部分的功能都被封装到其他的类里面去了,此处只是调用。所以为了代码的可读性和可维护性,该封装的还是乖乖的封装吧。HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
可以看出,主要的工作是由StreamAllocation完成。我们来看看这个StreamAllocation的newStream和connection()到底做了什么。 public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
可以看到,最关键的一步就是findHealthyConnection,这个方法的主要的作用就是找到一个可用的连接(如果连接不可用,这个过程会一直持续哦)。 private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.如果是一个新的连接,直接返回就好了
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {//判断连接是否好使
noNewStreams();//连接不好使的话,移除连接池
continue;//不healthy,就一直持续的呀
}
return candidate;
}
}
public void noNewStreams() {
Socket socket;
Connection releasedConnection;
synchronized (connectionPool) {
releasedConnection = connection;
socket = deallocate(true, false, false);// noNewStreams, released, streamFinished核心方法
if (connection != null) releasedConnection = null;
}
closeQuietly(socket);//关闭socket
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);//监听回调
}
}
上述关键代码是deallocate: private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
//以noNewStreams为true, released为false, streamFinished为false;为例
if (streamFinished) {
this.codec = null;
}
if (released) {
this.released = true;
}
Socket socket = null;
if (connection != null) {
if (noNewStreams) {
//noNewStreams是RealConnection的属性,源码的注释是这么说的:
//如果为true,则这个连接就不会再创建新的Stream了,一旦设置成true,就会一直是true
//搜索整个源码,该属性设置的地方如下:
//evitAll:关闭和移除连接池中所有的空闲连接(如果连接空闲(即连接上的Stream数为0),则noNewStreams为true);
//pruneAndGetAllocationCount:移除内存泄漏的连接及获取连接的Stream分配数;
//streamFailed:Stream分配失败;
//综上,这个属性的作用是禁止无效连接创建新的Stream的
connection.noNewStreams = true;
}
if (this.codec == null && (this.released || connection.noNewStreams)) {
release(connection);//释放Connection承载的StreamAllocations资源(connection.allocations)
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
//connectionBecameIdle:通知线程池该连接是空闲连接,可以移除或者作为待移除对象。
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();
}
}
connection = null;
}
}
return socket;//返回待关闭的Socket对象
}
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
//----------排除异常情况----------------
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
//这个方法的作用,与deallocate作用一样
//如果连接不能创建Stream,则释放资源,返回待关闭的close Socket
toClose = releaseIfNoNewStreams();
//经过releaseIfNoNewStreams,如果connection不为null,则连接是可用的
if (this.connection != null) {
// We had an already-allocated connection and it's good.
//存在可使用的已分配连接
result = this.connection;
releasedConnection = null;//为null值,则说明这个连接是有效的
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {//没有可使用的连接,去连接池中找
// Attempt to get a connection from the pool.//首先通过ConnectionPool,Address,StreamAllocation从连接池获取连接,
// 连接池后面会单独讲解*************
Internal.instance.get(connectionPool, address, this, null);//ConnectionPool,Address,StreamAllocation,Route
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;//找到了一个已分配或者连接池中的连接,此过程结束,返回
}
//否则,我们需要一个路由信息,这是一个阻塞的操作
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
//提供更加全面的路由信息,再次从连接池中获取连接
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//*实在是没找到,只能生成新的连接******了
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);//添加connection的StreamAllocation添加到connection.allocations集合中*****
}
}
// If we found a pooled connection on the 2nd time around, we're done.
//如果连接是从连接池中找到的,说明是可复用的。不是新生成的,因为新生成的连接,
// 需要去连接服务器之后才能可用呀
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.//连接Server
result.connect(
connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());//将路由信息添加到routeDatabase中。
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
Internal.instance.put(connectionPool, result);//将新生成的连接放入连接池中
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
//如果是一个http2连接,由于http2连接应具有多路复用特性,
// 因此,我们需要确保http2连接的多路复用特性
if (result.isMultiplexed()) {
//deduplicate:确保http2连接的多路复用特性,重复的连接将被剔除
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
private Socket releaseIfNoNewStreams() {
assert (Thread.holdsLock(connectionPool));
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && allocatedConnection.noNewStreams) {
return deallocate(false, false, true);
}
return null;
}
这个方法是说如果连接处于nonewStream状态,则释放该连接。否则,该连接是可用的。关于noNewStream和deallocate方法前面已经解释的很清楚了。 //经过releaseIfNoNewStreams,如果connection不为null,则连接是可用的
if (this.connection != null) {
// We had an already-allocated connection and it's good.
//存在可使用的已分配连接
result = this.connection;
releasedConnection = null;//为null值,则说明这个连接是有效的
}
c)第一次连接池查找(没有提供路由信息)Internal.instance.get(connectionPool, address, this, null);//ConnectionPool,Address,StreamAllocation,Route
if (connection != null) {
foundPooledConnection = true;
result = connection;
}
如果查找到了,则将查找到的连接赋值给result。 List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
f)如果还是没找到,则只能创建新的连接了result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);//添加connection的StreamAllocation添加到connection.allocations集合中*****
g)新的连接,连接服务器// Do TCP + TLS handshakes. This is a blocking operation.//连接Server(connect方法涉及Socket的建立等)
result.connect(
connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());//将路由信息添加到routeDatabase中。
h)新的连接放入线程池 // Pool the connection.
Internal.instance.put(connectionPool, result);//将新生成的连接放入连接池中
i)如果连接是一个HTTP2连接,则需要确保多路复用的特性 //如果是一个http2连接,由于http2连接应具有多路复用特性,
// 因此,我们需要确保http2连接的多路复用特性
if (result.isMultiplexed()) {
//deduplicate:确保http2连接的多路复用特性,重复的连接将被剔除
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
在Connectinterceptor中,起到关键作用的就是ConnectionPool,既然这么关键我们就来看看这个连接池吧。void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
可以看到,在放入连接到connections(Deque)之前,可能是需要执行连接池的“清洁”任务的。连接存入连接池的操作很简单,主要看一下这个cleanUp到底做了些什么? long
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!