Android第三方库源码分析(一):OkHttp

知识的遗忘速度真是快,以前看源码的时候只是初略看一下,也没有完整的去输出笔记,只是在个别源码处添加注释。这次算是把他重新阅读一下并整理输出。从我们最经常使用的OkHttp框架开始。

使用

导入

1
implementation 'com.squareup.okhttp3:okhttp:3.13.1'

目前最新的版本为3.13.1,根据官方的说明,3.13.0后,支持的版本为Android 5.0+(API level 21+) and Java 8+。如果还需要兼容Android 5.0以下的,那就只能使用最后的一个版本,3.12.0。接下来的分析,就是基于okhttp:3.12.0这个版本进行分析的。

1
implementation 'com.squareup.okhttp3:okhttp:3.12.0'

网络请求

分析源码之前,先列出网络请求实例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package cn.guidongyuan.okhttpstudy;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;

import java.io.IOException;

import cn.guidongyuan.deafultstudy.R;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

public class OkHttpActivity extends AppCompatActivity {

private static final String URL = "https://api.douban.com/v2/movie/top250";
private static final String TAG = "OkHttpActivity";
public static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
private OkHttpClient mOkHttpClient;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_okhttp);

// 初始化OkHttpClient对象
mOkHttpClient = new OkHttpClient.Builder().build();
}

/**
* 同步请求
* @param url
*/
private void syncGet(String url) {
// 不能在主线程发起网络请求,所以必须创建子线程
new Thread(new Runnable() {
@Override
public void run() {
Request request = new Request.Builder()
.url(url)
.build();
try {
Response response = mOkHttpClient.newCall(request).execute();
Log.i(TAG, response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}

/**
* 异步请求
*/
private void asyncGet(String url) {
Request request = new Request.Builder()
.url(url)
.build();
Call call = mOkHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// 在子线程,如果想更新界面必须切换到主线程
Log.i(TAG, e.getMessage());
}

@Override
public void onResponse(Call call, Response response) throws IOException {
// 在子线程,如果想更新界面必须切换到主线程
Log.i(TAG, response.body().string());
}
});
}

/**
* 同步post
*/
private void syncPost(String url, String json) {
new Thread(new Runnable() {
@Override
public void run() {
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
OkHttpClient mOkHttpClient = new OkHttpClient();
Call call = mOkHttpClient.newCall(request);
try {
Response response = call.execute();
Log.i(TAG, "返回内容"+response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}

/**
* 异步post
*/
private void asyncPost(String url, String json) {
// 传入的数据类型为json
RequestBody body = RequestBody.create(JSON, json);
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
OkHttpClient mOkHttpClient = new OkHttpClient();
Call call = mOkHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
// 在子线程,如果想更新界面必须切换到主线程
}

@Override
public void onResponse(Call call, Response response) throws IOException {
// 在子线程,如果想更新界面必须切换到主线程
Log.d(TAG, response.body().string());
}
});
}
}

同步方法流程

发送请求后,会进入阻塞状态直到收到响应,在主线程中需要创建子线程执行

  1. 创建OkHttpClient和Request对象
  2. 将Request封装为Call对象
  3. 调用Call的execute()发送同步请求,返回Response对象

异步方法流程

onFailure()和onResponse()都是在子线程,如果要更新界面必须要切换到主线程

  1. 创建OkHttpClient和Request对象
  2. 将Request封装为Call对象
  3. 调用Call的enqueue()进行异步请求,返回Response对象

源码分析

上面介绍了OkHttpClient的使用,接下来,就开始深入学习一下其源码,首先从OkHttpClient开始

创建OkHttpClient

1
OkHttpClient mOkHttpClient = new OkHttpClient.Builder().build();

OkHttpClient对象的创建,使用了构建者模式,Builder类为OkHttpClient的内部类,查看build()方法。

1
2
3
public OkHttpClient build() {
return new OkHttpClient(this);
}
1
2
3
4
5
public Builder dispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
// 返回Builder本身的对象
return this;
}
1
2
3
4
OkHttpClient(Builder builder) {
this.dispatcher = builder.dispatcher;
......
}

可以看到在Builder中设置的参数,在最后创建OkHttpClient的时候都设置进去。有了OkHttpClient,第二步就是创建请求Request对象了。

创建Request和Call

1
Request request = new Request.Builder().url(url).build();

Request对象的创建,同样使用了构建者模式,设置了请求的url。然后通过newCall去创建Call对象

1
2
3
4
5
Call call = mOkHttpClient.newCall(request);

@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}

可以看到,Call为接口,真正的实现类为RealCall。然后就用Call对象去发起请求了,请求分为同步和异步。

同步请求

上面介绍到,同步与异步的使用区别,在于调用的方法,同步为execute();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# RealCall.java
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
timeout.enter();
eventListener.callStart(this);
try {
// 1. 调用分发器
client.dispatcher().executed(this);
// 2. 使用职责链模式进行请求,返回Response
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
e = timeoutExit(e);
eventListener.callFailed(this, e);
throw e;
} finally {
// 3. 调用分发器执行结束后的处理
client.dispatcher().finished(this);
}
}

分析execute()方法,其实现步骤主要以上的三步:

  1. 同步请求比较简单,只是调用分发器把其放到同步任务执行队列runningSyncCalls中
1
2
3
4
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
  1. 使用职责链模式进行请求,返回Response,后续会详细介绍
  2. 同步请求finished()方法比较简单,主要就是把上面添加到runningSyncCalls队列中的RealCall移除掉

异步请求

1
2
3
4
5
6
7
8
9
10
11
# RealCall.java
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 使用分发器执行enqueue,并且创建一个AsyncCall对象
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

AsyncCall:可以看到,AsyncCall实现了Runnable

1
2
3
4
5
6
7
8
# AsyncCall.java															final class AsyncCall extends NamedRunnable implements Runnable {
private final Callback responseCallback;

AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 准备异步执行队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

void enqueue(AsyncCall call) {
synchronized (this) {
// 添加到异步**准备**执行任务队列readyAsyncCalls,注意是准备队列
readyAsyncCalls.add(call);
}
// 从准备队列中取出任务,添加到正在执行队列中,并启动线程池进行执行
promoteAndExecute();
}

private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));

List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
// 轮训从准备队列中取出任务
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
// 如果最大线程数超过了,或者最大请求数超过,则跳过不处理,继续存放在readyAsyncCalls中
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
// 从队列中删除
i.remove();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}

for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
// 使用线程池执行任务
asyncCall.executeOn(executorService());
}

return isRunning;
}

// 获取一个线程池,没有则创建
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}

使用线程池执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
// 线程池执行
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
// 如果成功返回,就不执行finished了,finished的执行放在某一个任务执行后其内部的处理
client.dispatcher().finished(this); // This call is no longer running!
}
}
}

启动线程后,在线程的run()方法中,会执行

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override protected void execute() {
try {
// 处理方式与同步一样,使用职责链模式进行请求,返回Response
Response response = getResponseWithInterceptorChain();
......
}
......
} finally {
// 调用分发器执行结束后的处理,然后finished又调用promoteAndExecute()重复从准备队列中取任务,添加到执行队列中
client.dispatcher().finished(this);
}
}
}

分析enqueue()方法,其实现步骤主要以上的三步:

  1. 创建一个AsyncCall的子线程,调用分发器添加到准备队列中
  2. 从准备队列中取出任务,添加到正在执行的列表中,另外获取一个线程池去执行正在执行的任务
  3. 使用职责链模式进行请求,返回Response,后续会详细介绍
  4. 在任务的子线程中执行finished(),又重复执行步骤2

拦截器

OkHttp设计的一个精妙的地方,就是拦截器了,使得职责分明,分工明确,不同的拦截器实现不同的功能。

职责链模式

要想理解好拦截器,就要先理解好一个设计模式:职责链模式。可以参考《大话设计模式》或者 《Android源码设计模式解析与实战》 中关于职责链模式的介绍,还是比较容易理解的,这里就不详细介绍了。

getResponseWithInterceptorChain()

从该方法的被调用就可以看到,拦截器不区分同步和异步,都是调用该方法,在该方法中,会把自定义以及系统提供的拦截器都添加到列表中,然后创建拦截器链去执行,最后返回响应数据Response。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# RealCall.java
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
// 创建一系列拦截器,并将其放入一个拦截器list中
List<Interceptor> interceptors = new ArrayList<>();
// 用户自定义的拦截器
interceptors.addAll(client.interceptors());
// 重定向拦截器,用于重定向、失败重连
interceptors.add(retryAndFollowUpInterceptor);
// 负责将用户的Request转换成一个实际的网络请求Request,再调用下一个拦截器获取Response,然后将Response转换成用户的Response
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 负责控制缓存
interceptors.add(new CacheInterceptor(client.internalCache()));
// 负责进行连接主机,在这里会完成socket连接,并将连接返回
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
// 和服务器进行真正的通信,完成Http请求
interceptors.add(new CallServerInterceptor(forWebSocket));
// 创建一个拦截器链RealInterceptorChain
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 执行拦截器的proceed方法
return chain.proceed(originalRequest);
}

RealInterceptorChain可以看作是整个拦截器链的控制器,其proceed方法会逐步调用interceptors列表中的拦截器,然后又创建一个下一个拦截器链设置进去,典型的职责链模式的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# RealInterceptorChain.java
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
...省略...
// Call the next interceptor in the chain.
// 创建下一个拦截器链,注意不同点,在于参数 index + 1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 获取当前的拦截器
Interceptor interceptor = interceptors.get(index);
// 执行该拦截器,并把创建好的下一个拦截器链设置到其中
Response response = interceptor.intercept(next);
...省略...
return response;
}

Interceptor有很多不同的实现类,分别有不同的职责,但是intercept()的执行流程基本如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override public Response intercept(Chain chain) throws IOException {

// 1. 拦截前处理
...省略...

// 2. 调用下一个拦截器链进行处理
Response networkResponse = chain.proceed(requestBuilder.build());

// 3. 拦截后处理
...省略...

// 4. 返回Response给上一个拦截器
return responseBuilder.build();
}

步骤:

  1. 创建一系列拦截器,并将其放入一个拦截器list中
  2. 创建一个拦截器链RealInterceptorChain,并执行拦截器的proceed方法
  3. 在proceed方法中,获取其中的一个拦截器并创建下一个拦截器链
  4. 调用intercept执行拦截器的具体实现,在中间会去执行下一个拦截器的操作

RetryAndFollowUpInterceptor

重定向失败重连拦截器

  1. 创建StreamAllocation对象
  2. 调用RealInterceptorChain.proceed(…)进行网络请求
  3. 根据异常结果或者响应结果判断是否要进行重新请求
  4. 调用下一个拦截器,对response进行处理,返回给上一个拦截器

BridgeInterceptor

桥接拦截器,主要是对发送和返回的数据进行转化

  1. 是负责将用户构建的一个 Request请求转化为能够进行网络访问的请求
  2. 将这个符合网络请求的 Request进行网络请求
  3. 将网络请求回来的响应 Response转化为用户可用的Response.

CacheInterceptor

缓存拦截器,这部分会在接下来介绍缓存机制中重点介绍

ConnectInterceptor

网络连接拦截器,这部分会在接下来介绍连接池中重点介绍

CallServerInterceptor

发送网络请求拦截器,会在下面详细介绍

缓存机制

OkHttp使用缓存非常简单,只需要在创建的使用设置一个缓存对象Cache,缓存逻辑的处理交给了CacheInterceptor拦截器进行处理。

Cache

首先看一下怎样设置使用缓存功能。

1
2
3
mOkHttpClient = new OkHttpClient.Builder()
.cache(new Cache(new File("cache"), 24*1024*1024))
.build();

查看Cache的构造函数,传入缓存的路径以及缓存的最大大小。另外可以看到其内部是使用DiskLruCache去实现的。也可以看到,创建Cache的时候,会创建其内部类InternalCache,在内部类中去调用Cache的非public方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Cache.java
Cache(File directory, long maxSize, FileSystem fileSystem) {
this.cache = DiskLruCache.create(fileSystem, directory, VERSION, ENTRY_COUNT, maxSize);
}

final InternalCache internalCache = new InternalCache(){
@Override public Response get(Request request) throws IOException {
// 实际上的执行还是Cache的方法
return Cache.this.get(request);
}

@Override public CacheRequest put(Response response) throws IOException {
return Cache.this.put(response);
}
......
}

先看一下put方法,怎样设置缓存对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Nullable CacheRequest put(Response response) {
String requestMethod = response.request().method();
if (HttpMethod.invalidatesCache(response.request().method())) {
try {
remove(response.request());
} catch (IOException ignored) {
// The cache cannot be written.
}
return null;
}
// 和上面的一样,都是验证有效性,只缓存 GET 请求,其他都不缓存
if (!requestMethod.equals("GET")) {
// Don't cache non-GET responses. We're technically allowed to cache
// HEAD requests and some POST requests, but the complexity of doing
// so is high and the benefit is low.
return null;
}

if (HttpHeaders.hasVaryAll(response)) {
return null;
}
// 创建Entry去存放要缓存的内容,缓存不是缓存整个response,只缓存部分,就都放在Entry中
Entry entry = new Entry(response);
DiskLruCache.Editor editor = null;
try {
// 通过key获取editor,key为url的md5
editor = cache.edit(key(response.request().url()));
if (editor == null) {
return null;
}
// 写入缓存
entry.writeTo(editor);
// 返回CacheRequest的实现类,后续缓存拦截器就可以通过该对象去执行相应的缓存
return new CacheRequestImpl(editor);
} catch (IOException e) {
abortQuietly(editor);
return null;
}
}

再看一下怎样获取缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Cache.java
@Nullable Response get(Request request) {
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
try {
// 通过key值拿到缓存快照,记录缓存某一时刻的信息
snapshot = cache.get(key);
if (snapshot == null) {
return null;
}
}
......
try {
// 拿到entry的内容
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
} catch (IOException e) {
Util.closeQuietly(snapshot);
return null;
}
// 从entry中拿到Resonse
Response response = entry.response(snapshot);
// 再进行匹配验证,不符合则关闭流返回null
if (!entry.matches(request, response)) {
Util.closeQuietly(response.body());
return null;
}

return response;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# Entry.java
public Response response(DiskLruCache.Snapshot snapshot) {
String contentType = responseHeaders.get("Content-Type");
String contentLength = responseHeaders.get("Content-Length");
// 用Entry中的数据设置Request
Request cacheRequest = new Request.Builder()
.url(url)
.method(requestMethod, null)
.headers(varyHeaders)
.build();
// 利用Entry中的数据,以及snapshot设置body,就从缓存中获取到Response
return new Response.Builder()
.request(cacheRequest)
.protocol(protocol)
.code(code)
.message(message)
.headers(responseHeaders)
// snapshot中包含有BufferedSource类型的bodySource,CacheResponseBody作为响应体的读取
.body(new CacheResponseBody(snapshot, contentType, contentLength))
.handshake(handshake)
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(receivedResponseMillis)
.build();
}

CacheInterceptor

上面介绍了怎样获取和设置缓存对象,在OkHttp中,缓存的具体操作就是通过CacheInterceptor缓存拦截器进行执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# CacheInterceptor.java
@Override
public Response intercept(Chain chain) throws IOException {
// 获取缓存中的信息
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;

long now = System.currentTimeMillis();
// 缓存策略,内部有networkRequest和cacheResponse,去指定是否使用网络、缓存或者两者
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;

if (cache != null) {
cache.trackResponse(strategy);
}

if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}

// If we're forbidden from using the network and the cache is insufficient, fail.
// 如果没有网络且没有缓存,直接返回504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}

// If we don't need the network, we're done.
// 如果没有网络,直接返回
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}

Response networkResponse = null;
try {
// 交给下一个拦截器ConnectInterceptor进行网络请求
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}

// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// HTTP_NOT_MODIFIED:304 表示服务器告知客户端,缓存可以使用
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();

// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// 执行到该步,表示没有了缓存,那么就使用响应数据创建一个response
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();

if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
// 添加符合,写入缓存并且进行返回
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}

// 再次判断,如果有效就把缓存删除掉
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}

return response;
}

从上面可以知道,拦截器的缓存策略,是通过CacheStrategy,CacheStrategy的创建是通过工厂模式创建的,通过不同的判断条件设置其两个参数。如果networkRequest为null,则表示不进行网络请求;而如果cacheResponse为null,则表示没有有效的缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public CacheStrategy get() {
// 获取CacheStrategy对象
CacheStrategy candidate = getCandidate();

if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
// We're forbidden from using the network and the cache is insufficient.
return new CacheStrategy(null, null);
}

return candidate;
}

/** Returns a strategy to use assuming the request can use the network. */
private CacheStrategy getCandidate() {
...省略...
return new CacheStrategy(request, null);
}

看了上面的分析,基本可以知道Okhttp的缓存流程,总结如下:

  • OkHttp的缓存,使用的是Cache类来实现,其底层使用DiskLruCache,用url的md5作为key,response作为value
  • 缓存的获取与写入是通过CacheInterceptor进行操作,其内部创建CacheStrategy作为缓存策略,包含两个属性,networkRequest决定是否连接网络,cacheResponse是否是有效的缓存。
    • 如果没有网络且没有缓存,直接返回504
    • 没有网络,返回缓存
    • 通过下一个拦截器进行网络请求,收到304则直接使用缓存返回
    • 把网络请求后的数据保存到缓存中,返回请求的数据
  • 另外,OkHttp缓存的写入,使用的是NIO,NIO为非阻塞性,面向的是数据流,可以调高写入与读取的效率。在这里就不详细展开分析了。

连接池

学习OkHttp的另外一个重要知识点,就是连接池了。在连接池中,会维护与服务器进行数据传输的连接数目,并且会自动清除空闲的连接。

ConnectInterceptor拦截器

负责进行连接主机,在这里会完成socket连接,并将连接返回,同样是从拦截器的intercept方法进行分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# ConnectInterceptor.java
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
// 获取StreamAllocation,StreamAllocation对象在RetryAndFollowUpInterceptor创建好,通过职责链传递到此处才开始使用
StreamAllocation streamAllocation = realChain.streamAllocation();

boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 创建HttpCodec对象,httpCodec用于编码request和解码response
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
// RealConnection用来进行实际的网络传输,内部有Socket对象
RealConnection connection = streamAllocation.connection();
// 把前面创建的对象传递给下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

查看上面创建HttpCodec的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# StreamAllocation.java
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
...省略...
try {
// 找“健康的”RealConnection
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
// 通过RealConnection创建HttpCodec
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
}
}

具体是怎样找到RealConnection的,可以详细看一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
// while循环直到找到return
while (true) {
// 继续看findConnection()方法
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
}
}

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
synchronized (connectionPool) {
...省略...
if (result == null) {
// 如果为空,尝试从连接池中获取,这个方法的关键点,如果获取到connection不为空(第三个参数为this,找到合适的RealConnection赋值到connection)
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// 如果没有找到,则创建一个
if (!foundPooledConnection) {
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}

// 进行实际的的网络连接
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
synchronized (connectionPool) {
// 把上面创建的对象放到线程池中
Internal.instance.put(connectionPool, result);
// 如果同时创建了到同一地址的另一个多路复用连接,那么释放此连接并获取该连接。
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
// 最后进行返回
return result;
}

从上面的分析,获取RealConnection的流程,总结如下:

  • 在ConnectInterceptor中获取StreamAllocation的引用,通过StreamAllocation去寻找RealConnection
  • 如果RealConnection不为空,那么直接返回。否则去连接池中寻找并返回,如果找不到直接创建并设置到连接池中,然后再进一步判断是否重复释放到Socket。
  • 在实际网络连接connect中,选择不同的链接方式(有隧道链接(Tunnel)和管道链接(Socket))
  • 把RealConnection和HttpCodec传递给下一个拦截器

ConnectionPool连接池类

上面介绍到从连接池中调用get和put,接下来就详细介绍。连接池ConnectionPool的创建,在Builder中默认就创建了

1
2
3
4
5
6
7
8
9
10
11
12
# OkHttpClient.java
public Builder() {
// 默认就创建连接池对象,如果先自定义连接池,可以使用下列方法
connectionPool = new ConnectionPool();
}

// 自定义连接池
public Builder connectionPool(ConnectionPool connectionPool) {
if (connectionPool == null) throw new NullPointerException("connectionPool == null");
this.connectionPool = connectionPool;
return this;
}

进去ConnectionPool看其get和put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final class ConnectionPool {

// 通过注释可以知道,创建该线程池,用来清除过期的连接
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

// 最大空余线程池的数量,在下面默认初始化中设置数量为5
private final int maxIdleConnections;
// keep-alive时间,在下面默认初始化中设置为5分钟
private final long keepAliveDurationNs;
// 存放RealConnection的队列
private final Deque<RealConnection> connections = new ArrayDeque<>();

final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;

public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
// 从connections列表中遍历,找到符合的
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
// 可以进去看其方法,就是赋值并且创建StreamAllocation的弱引入添加到connection中的引用列表中,通过列表的大小可以知道网络链接的负载量,是否超值制定的大小。后序判断RealConnection是否空余也结合了该列表数
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}

// put进去一个RealConnection,如果不是正在清除,则线程池会执行回收空闲连接的操作
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
// 执行回收操作
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
}

具体回收的判断,可以看cleanupRunnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
// 可以看到是一个死循环
while (true) {
long waitNanos = cleanup(System.nanoTime());
// 返回-1表示没有连接、空闲、正在使用,就退出while循环
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
// 计算等待时间
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
// wait进行等待,等待waitNanos时间后,自动继续执行清除
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
long cleanup(long now) {
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

// 内部有点类似于gc的标记清除算法,引入就是上面介绍到的弱引用,如果connection还在使用,跳过
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
// 空余connection+1
idleConnectionCount++;

// 找出最长的空余时间的connection
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}


if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// 如果最长时间大于keep-alive或者空余大于最大,从列表中移除,并且最后执行close结束socket
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}

closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}

通过分析就可以知道,清除算法,使用类似GC中的引用计算算法,如果弱引用StreamAllocation列表为0,则表示空闲需要进行回收。

CallServerInterceptor

向服务发起真正的网络请求,并且接受服务器返回的响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Override public Response intercept(Chain chain) throws IOException {
......
// 向socket写入头部信息,httpCodec为接口,实现类有Http1Codec和Http2Codec,这里我们使用的是Http1Codec
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);

Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {

if (responseBuilder == null) {
......
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// 往Request的body写入内容
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
}
......
}
// 表明完成写入工作
httpCodec.finishRequest();

if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
// 读取网络请求的返回信息头部
responseBuilder = httpCodec.readResponseHeaders(false);
}

Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);

response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

code = response.code();
}

realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);

if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}

总结:

Okhttp框架的分析基本完成,可以看到重点主要是三个,拦截器链、缓存机制以及连接池的使用。

  • 拦截器链使用的是职责链模式,将网络请求的不同职责分工到不同的拦截器中,分工明确,并且还可以添加自定义拦截器,扩展方便。
  • 缓存机制通过创建Cache,内部通过DiskLruCache去实现。另外在缓存拦截器中通过不同的缓存策划判断返回不同的请求结果。
  • 连接池的引入,可以对连接进行管理,避免频繁地建立和断开连接。在连接池中维护了一个线程池清理空闲的连接,判断连接是否可用通过判断引入计算数量。

公众号:亦袁非猿

欢迎关注,交流学习