线程池
1.什么是线程池
线程池(Thread Pool)帮我们重复管理线程,避免创建大量的线程增加开销。
2.为什么用线程池
降低开销,线程池也可以提高响应速度,了解点 JVM 的同学可能知道,一个对象的创建大概需要经过以下几步:
- 检查对应的类是否已经被加载、解析和初始化
- 类加载后,为新生对象分配内存
- 将分配到的内存空间初始为 0
- 对对象进行关键信息的设置,比如对象的哈希码等
- 然后执行 init 方法初始化对象
线程池的作用就是复用已经创建好的线程。这样就提高了响应速度!
3.线程池种类
Executors提供了四种线程池:具体可以去看我的Github里面有简单的demo
① newCacheThreadPool() 创建一个可缓存的线程池,如果线程池长度超过处理需要,可以灵活回收空闲线程,如果线程池长度小于新处理需求,则新建线程。
②newFixedThreadPool() 创建一个定长的线程池,可以控制线程最大并发数。超过的线程会在线程队列中等待。
③newScheduleThread() 创建一个定长的线程池,支持定时,周期性执行任务
④newSingleThreadExecutor() 创建一个单线程化的线程池,他只会用唯一的工作现场类执行任务,保证所有任务按照指定顺序执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 。
package pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Author:Cream
* Date:2018/4/11
* Description:
*/
public class Test {
public static void main(String[] args) {
/**
* 可缓存线程池
* 如果线程池长度超过处理需要.可灵活回收空闲线程,若无可回收,则新建线程。
*/
ExecutorService service = Executors.newCachedThreadPool();
/**
* 定长线程池
* 可控制线程最大并发数,超出的线程会在队列中等待。
*/
ExecutorService service2 = Executors.newFixedThreadPool(2);
/**
* 定长线程池
* 支持定时及周期性任务执行。
*/
ExecutorService service3 = Executors.newScheduledThreadPool(3);
/**
* 单线程化的线程池
* 它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
*/
ExecutorService service4 = Executors.newSingleThreadExecutor();
//创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
Thread t1 = new Thread(new MyThread());
Thread t2 = new Thread(new MyThread());
Thread t3 = new Thread(new MyThread());
Thread t4 = new Thread(new MyThread());
Thread t5 = new Thread(new MyThread());
service4.execute(t1);
service4.execute(t2);
service4.execute(t3);
service4.execute(t4);
service4.execute(t5);
//关闭
service.shutdown();
service2.shutdown();
service3.shutdown();
service4.shutdown();
}
}
4.线程池的处理流程
创建线程池需要使用 ThreadPoolExecutor 类,它的构造函数参数如下:
public ThreadPoolExecutor(int corePoolSize, //核心线程的数量
int maximumPoolSize, //最大线程数量
long keepAliveTime, //超出核心线程数量以外的线程空余存活时间
TimeUnit unit, //存活时间的单位
BlockingQueue<Runnable> workQueue, //保存待执行任务的队列
ThreadFactory threadFactory, //创建新线程使用的工厂
RejectedExecutionHandler handler // 当任务无法执行时的处理器
) {...}
corePoolSize:核心池的大小。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
workQueue:一个阻塞队列,用来存储等待执行的任务
threadFactory:线程工厂,主要用来创建线程;
handler:表示当拒绝处理任务时的策略。有四个参数:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
ThreadPoolExecutor继承了AbstractExecutorService,而抽象类实现了ExecutorService接口,而这个接口又继承了Executor接口。
在ThreadPoolExecutor类中有几个非常重要的方法:execute() submit() shutdown() shutdownNow()
execute():这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit():这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown()和shutdownNow()是用来关闭线程池的。
4.深入剖析线程池实现原理(源码分析)
①1.线程池状态
在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;
下面的几个static final变量表示runState可能的几个取值。
当创建线程池后,初始时,线程池处于RUNNING状态;
如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。
②任务的执行
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
private volatile long keepAliveTime; //线程存货时间
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数
每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。
corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:
public void execute(Runnable command) {
// 判断提交的任务是否为null
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//当前池中线程比核心数少,新建一个线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//线程池满了,但任务队列没有满,就可以把线程添加到任务队列里面去
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果这时被关闭了,拒绝任务
if (! isRunning(recheck) && remove(command))
reject(command);
//如果之前的线程已被销毁完,新建一个线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//核心池已满,队列已满,试着创建一个新线程
else if (!addWorker(command, false))
reject(command);
}
线程池主要控制的状态是ctl,它是一个原子的整数,其包含两个概念字段:
- workerCount:有效的线程数量
- runState:线程池的状态
在executer里面调用了addWorker方法:
private boolean addWorker(Runnable firstTask, boolean core) {
//基于CAS+死循环实现的关于线程池状态,线程数量的校验与更新的逻辑,重点看主流程。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//主流程:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//把指定任务作为参数新建一个worker线程
w = new Worker(firstTask);
//t代表woker线程。不是用户提交的线程任务firstTask!!!
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//把新建的woker线程放HashSet里
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动woker线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//如果woker启动失败,执行addWorkerFailed方法,进行善后
addWorkerFailed(w);
}
return workerStarted;
}
任务缓存队列及排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。
workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:设置核心池大小
- setMaximumPoolSize:设置线程池最大能创建的线程数目大小
当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。












网友评论