0.关键核心类
- Dispatcher 分发器类。总控:控制从runningAsyncCalls(异步任务队列)中 取走或者插入。
如果没有一个中央调度器,每个异步请求都盲目地创建线程去执行,会很快导致问题:
避免手机资源耗尽、避免服务器过载、缺乏管理(无法方便地取消一系列请求或查看请求状态。)
1.无限制地创建线程会消耗大量内存和 CPU 资源,可能导致应用崩溃或极度卡顿。
2.向同一服务器发起过多并发连接,可能会被服务器视为攻击或导致服务器响应变慢。
3.无法方便地取消一系列请求或查看请求状态。
维护线程池用于,
OkHttp 客户端中负责管理并控制异步请求执行策略的核心组件,就像一个交通指挥中心,决定哪个请求可以“上路”(进入执行状态),
以及有多少请求可以同时并发。
OkHttp 的 Dispatcher 的主要作用是管理异步请求的执行队列,控制并发请求的数量,并维护一个用于执行这些网络请求的线程池。
三大队列:
准备执行的异步请求队列 (readyAsyncCalls),这些请求已经准备好,但还在排队,等待有可用的资源(线程)来执行它们。
正在运行的异步请求队列 (runningAsyncCalls),这些请求已经从 readyAsyncCalls 中被取出,并正在由线程池中的某个线程执行网络操作。
正在运行的同步请求队列 (runningSyncCalls),因为没有回调且会阻塞调用线程,所以它们不需要“准备队列”。
但 Dispatcher 仍然会跟踪它们,主要是为了在取消请求或查看整体状态时能够包含它们。
maxRequests = 64: 最大并发请求数。默认最多允许 64 个异步请求同时处于执行状态(在 runningAsyncCalls 中)。这是全局限制。
maxRequestsPerHost = 5: 每台主机的最大并发请求数。默认对同一个主机(相同的 hostname)最多只允许 5 个请求同时执行。这是为了防止对单个服务器造成过大压力。
线程池管理:
Dispatcher 内部维护了一个 ExecutorService(线程池)来实际执行网络请求。
当请求满足执行条件时,Dispatcher 会从线程池中获取一个线程来运行它。
取消请求:
你可以通过 Call.cancel() 取消单个请求,
或者通过 Dispatcher.cancelAll() 取消所有请求(例如在 Activity 的 onDestroy 中)。
Dispatcher 会负责从各个队列中移除这些请求并中断其执行。
核心机制:
使用 readyAsyncCalls 和 runningAsyncCalls 两个队列,结合 maxRequests 和 maxRequestsPerHost 两个阀门来控制流量。
每当有新请求加入或旧请求完成时,都会触发调度检查 (promoteAndExecute)。
管理和调度异步请求的执行,控制并发度,防止资源过载。
-
getResponseWithInterceptorChain() 核心方法,获取返回数据。
构建拦截器链,然后通过依次执行拦截器链中的每个不同作用的拦截器,来获取服务器的数据返回。 -
ConnectionPool 连接池复用。url相同可以复用;哪些链接可以打开状态。
OkHttp 的 ConnectionPool 的主要作用是复用和管理与服务器的 HTTP 和 HTTP/2 连接,
从而显著减少网络请求的延迟、节省带宽和资源,并提高应用程序的性能。
连接复用、连接管理、异步清理
简单来说,ConnectionPool 是 OkHttp 高效、高性能背后的无名英雄,它通过智慧的连接管理,让你的网络请求变得更快、更省资源。
0.1用到的设计模式
- Builder设计模式:创建 Client 和 Request 对象。
适合有多个参数构建对象的场景。链式调用。 - 责任链:getResponseWithInterceptorChain().
0.2 同步和异步
同步请求:阻塞。call.execute()
发送请求后,就会进入阻塞状态,直到收到响应。
请求会立即在当前调用线程中执行并阻塞该线程。Dispatcher 只是将其添加到 runningSyncCalls 中进行跟踪,不参与线程调度。
异步请求:不阻塞,推荐使用。 call.enqueue()
会开启新的工作线程,做网络请求工作。
全面管理。请求被提交给 Dispatcher,由它负责加入队列、控制并发、分配线程池线程执行,并在完成后回调结果。
1.基本使用
//1.创建client 和 Request对象。builder模式创建。
OkHttpClient client = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
public void asyRequest(){
//构建懈怠了请求信息的Request对象。
Request request = new Request.Builder().url("http://www.baidu.com")
.get().build();
Call call = client.newCall(request);//2.将Request对象封装成Call对象******
//Response response = call.execute();//3.同步请求
//System.out.println(response.body().string());
call.enqueue(new Callback() {//3.异步请求
@Override
public void onFailure(Call call, IOException e) {
System.out.println("Fail"); //子线程中
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//4.处理返回数据,目前还是子线程中
System.out.println(response.body().string());
}
});
}
1.1 同步请求的流程
同步请求代码流程:保存同步请求,移除同步请求。
client.newCall()
----RealCall.newRealCall(this,request,false)
RealCall call = new RealCall(client,originalRequest,forWebSocket)
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client,forWebSocket);
call.execute()--RealCall.execute()
client.dispatcher().executed(this)
runningSyncCalls.add(call); //【重点】同步请求添加到队列中
Response result = getResponseWithInterceptorChain()//获取结果
client.dispatcher().finished(this)//自动回收同步请求
finished(runningSyncCalls, call, false);
if(!calls.remove(call)) //runningSyncCalls 移除这个同步请求
1.2 同步请求关键代码段
Response response = call.execute();//3.同步请求
// RealCall.kt
override fun execute(): Response { //执行同步请求
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter()
callStart()
try {
client.dispatcher.executed(this) //放入同步队列
return getResponseWithInterceptorChain() //获取同步请求结果
} finally {
client.dispatcher.finished(this) //关键代码,从同步执行队列中移除请求
}
}
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest) //获得服务器返回
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}
//Dispatcher.kt
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//移除同步请求
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute()//将符合条件的调用从 readyAsyncCalls 提升到 runningAsyncCalls,并在执行器服务上运行它们
if (!isRunning && idleCallback != null) {
idleCallback.run() //闲置:我们现在没有网络任务要处理了,所有线程都在闲着
}
}
2 同步请求的流程 call.enqueue(new Callback()
异步请求流程和源码:
请求完成后,会调用 promoteCalls() 提升请求,将请求从异步等待队列移到执行队列中。
- Call call = client.newCall(request)
- call.enqueue(new Callback(){...})) //异步网络请求
//RealCall.java
@0verride public void enqueue(Callback responseCallback){
synchronized(this){
//只能执行一次
if(executed) throw new IllegalStateException("Already Executed");
executed = true; //1.标记已经执行过,
}
captureCallStackTrace();
eventListener.callStart(this);
//2.封装成一个 AsyncCall 对象
//3.enqueue实际的异步请求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall 是继承自 NamedRunnable 的线程对象。
//Dispatcher.java
synchronized void enqueue(AsyncCall call){ //同步锁
//4.1 当前正在请求的Runnable个数小于64;且当前请求的host请求数小于5个时。
if (runningAsyncCalls,size()< maxRequests && runningCallsForHost(call)< maxRequestsPerHost){
runningAsyncCalls.add(call); //加入到异步请求执行中队列中。正在执行的任务
executorService().execute(call);//通过线程池执行异步请求call,最终会调用call的run()方法。
} else {
readyAsyncCalls.add(call);//4.2 加入到等待执行异步队列中,缓存
}
}
↓
// 线程池
public synchronized ExecutorService executorService(){
if(executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),Util,threadFactory("0kHttp Dispatcher", false));
}
return executorService;
}
//execute(call)
// 5.在NamedRunnable 中,执行run(),里面调用了execute()
//NamedRunnable.java
@Override public final void run(){
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute(); //重点。
} finally{
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();//抽象方法
↓
//RealCall.java
@Override protected void execute(){ //相当于run(),在子线程中
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//构建拦截器的链条
if(retryAndFollowUpInterceptor.isCanceled()){//重定向重试拦截器,是否已经取消
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else { //未取消
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch(IOException e){
if(signalledCallback){
// Do not signal the callback twice!
Platform.get().log(INFO,"Callback failure for "+ toLoggableString(),e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//6.【重要】
client.dispatcher().finished(this);
}
}
↓
//Dispatcher.java
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls,T call, boolean promoteCalls){
int runningCallsCount;
Runnable idleCallback;
synchronized(this){
//6.1 把请求从正在执行队列中删除
if(!calls.remove(call)) {throw new AssertionError("Call wasn't in-flight!");}
//6.2 调整整个异步请求队列
if(promoteCalls) { promoteCalls();}
//6.3 重新计算正在执行的线程数量
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if(runningCallsCount ==0 && idleCallback != null){
idleCallback.run();
}
} //异步请求结束
异步总结:
- 构架一个OkHttpClient对象;
- 构建一个Request对象,通过 OkHttpClient 和Request对象,构建出Call对象;
- 执行call的 enqueue() 方法。http请求添加到 调度器Dispatcher 中。
线程池,执行中的队列,等待队列。
----------End------------------------







网友评论