线程池
一、 线程池(ThreadPoolExecutor)的构造器的各个参数含义。
1、 corePoolSize:核心线程数,指线程池不关闭就一直存活的线程数;
2、 maximumPoolSize:最大线程数,指线程池能同时存活的最大线程数;
3、 keepAliveTime:空闲的线程保留的时间,指非核心线程空闲的最大时间,超过这个时间就会将这些空闲的非核心线程销毁掉;
4、 unit:空闲线程的保留时间单位;
5、 workQueue:工作的阻塞队列,存储等待执行的任务;
6、 threadFactory:线程工厂,指定了线程池中的线程的创建方式和销毁方式;
7、 handler:拒绝策略,指线程池在达到上限(达到最大线程数且任务队列也满了)的情况下的执行逻辑。
*1)ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。
*2)ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
*3)ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
*4)ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
二、 线程池参数的变化和线程池中线程的生命周期的关系。
1、 当所有任务执行完毕时,活跃线程数为0;
2、 当未处理完的任务数大于核心线程数且小于等于核心数与任务队列的大小之和{corePoolSize <toExecuteRunnableCount<=(corePoolSize+ workQueueSize)}时,线程池不会再新创建线程去处理任务队列中的任务,而是等到核心线程将自己的任务执行完后去执行。
3、 当未处理完的任务数大于核心线程数与任务队列的大小之和且小于最大线程数与任务队列大小之和{(corePoolSize+ workQueueSize)<toExecuteRunnableCount <=(maximumPoolSize+ workQueueSize)}时,线程池会再新创建线程去处理任务队列中的任务。
4、 当未处理完的任务数大于最大线程数与任务队列大小之和{ toExecuteRunnableCount >( maximumPoolSize+ workQueueSize)}时,线程池会将多出来的任务去执行拒绝策略。
5、 当空闲线程的空闲时间超过设置的最大空闲时间时,线程池将销毁该空闲的非核心线程。
image.png
private static ThreadPoolExecutor createThreadPoolExecutor(){
//核心线程数,指线程池不关闭就一直存活的线程数;
int corePoolSize = 5;
//最大线程数,指线程池能同时存活的最大线程数
int maximumPoolSize = 20;
//空闲的线程保留的时间,指非核心线程空闲的最大时间,超过这个时间就会将这些空闲的非核心线程销毁掉;
int keepAliveTime = 10;
//任务等待队列
BlockingQueue<Runnable> workerQueue = new ArrayBlockingQueue<>(1);
//线程工厂,指定了线程池中的线程的创建方式和销毁方式;
MyThreadFactory myThreadFactory = new MyThreadFactory();
//拒绝策略,指线程池在达到上限(达到最大线程数且任务队列也满了)的情况下的执行逻辑。
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.SECONDS,
workerQueue,myThreadFactory,handler);
System.out.println("线程池创建完成------------------------------");
return executor;
}
关闭线程池的方法
shutdown:中断所有空闲线程,那么空闲的线程就会被中断然后被销毁,工作的线程就会等将当前线程池中的任务执行完毕后被销毁(关闭线程池)。
是一个非阻塞方法。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
shutdownNow:立即中断所有线程,立即关闭线程池,将空闲线程销毁,将任务队列中未执行的任务return。注意:会等待当前工作的线程将当前执行的任务完毕后才会销毁所有线程。是一个非阻塞方法。
awaitTermination:给定一个阻塞的最大时间,
如果在时间内执行完了所有任务就在完成任务那一刻解除阻塞;如果在时间内没有执行完所有任务那么就在达到最大时间时让调用者解除阻塞,注意:但是线程池并没有关闭。如果要关闭可以在自定义线程工厂中将线程设为守护线程,那么当main线程执行完毕后线程池也就关闭了。
是一个阻塞方法。
利用Executors创建线程的使用
1.newCachedThreadPool()
新任务会直接创建线程去处理,处理完的线程直接销毁,因为核心线程数为0,任务队列是SynchronousQueue,有自动关闭功能(因为核心线程为0).
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2.newFixedThreadPool(5)
始终保持核心线程的数量进行对任务的执行,不会新创建线程,因为任务队列是:LinkedBlockingQueue,且无边界.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3.newSingleThreadExecutor()
只有一条线程的线程池,不会新创建线程,因为核心线程数和最大线程数都是1,任务队列是LinkedBlockingQueue,且无边界。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4.newScheduledThreadPool(5)
具有延时效果和周期性执行的固定大小的线程池.
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
ThreadPoolExecutor 基本工作流程
1.当提交任务的时候,发现当前活跃线程数小于核心线程数,那么创建线程执行
2.当前活跃线程数大于等于核心线程数的时候,将任务信息填充到阻塞队列中。
3.当队列已满,并且最大线程数 > 核心线程数,创建线程执行任务
4.当队列已满,并且当前线程数 >=最大线程数,那么默认开始拒绝任务
5.如果当前线程数>核心线程数,并且小于等于最大线程数,那么当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
ctl这个变量包含两个部分,线程池运行状态与线程池内有效线程数量,高3位保存runState,低29位保存workerCount。
COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
线程的状态信息:
RUNNING :运行态,能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
TIDYING(整理):如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED(终结):在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。进入TERMINATED的条件如下:
线程池不是RUNNING状态;
线程池状态不是TIDYING状态或TERMINATED状态;
如果线程池状态是SHUTDOWN并且workerQueue为空;
workerCount为0;
设置TIDYING状态成功。

//线程池执行任务
//1.当提交任务的时候,发现当前活跃线程数小于核心线程数,那么创建线程执行
//2.当前活跃线程数大于等于核心线程数的时候,将任务信息填充到阻塞队列中。
//3.当队列已满,并且最大线程数 > 核心线程数,创建线程执行任务
//4.当队列已满,并且当前线程数 >=最大线程数,那么默认开始拒绝任务
//5.如果当前线程数>核心线程数,并且小于等于最大线程数,那么当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//workerCountOf(c) : c & 高3位为0,低29位为1的CAPACITY,用于获取低29位的有效线程数量
//runStateOf(int c) 方法:c & 高3位为1,低29位为0的~CAPACITY,用于获取高3位保存的线程池状态
//ctlOf(int rs, int wc) 方法:参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl
//也就是说32位含义:(高三位表示状态)+ (低29位表示线程数量)。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果运行的线程少于corePoolSize,则尝试开启一个新线程去运行command
//addWorker(command, true)会产生一个新线程去执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 线程池处于RUNNING状态,并将任务放入workQueue队列,但不执行addWorker(表明不创建新的线程)
// 双重校验可防止添加任务到workQueue队列后,线程池状态由于意外等原因处于非RUNNING状态,
// 此时就需要从workQueue队列remove掉这个任务
// 注:offer方法不会阻塞,如果不能插入队列直接返回false。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果线程池不是running状态或者无法入队列,执行线程池的饱和策略
else if (!addWorker(command, false))
reject(command);
}
addWorker()
1)用循环CAS操作来将线程数加1;
2)新建一个线程,并将这个线程封装成Worker,worker实现Runnable接口,线程调用start方法,执行Worker 的run方法,直接调用runWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取线程池运行状态,
// 线程池的运行状态:runnbale=-1,shutdown=0,stop=1,tidying=2,terminated=3
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// CAS算法
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果添加任务成功,则跳出retry,也就是跳出整个循环体
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
// 通过线程池的ThreadFactory创建一个线程,用于执行这个firstTask任务
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 说明:(rs == SHUTDOWN && firstTask == null)可能是workQueue中仍有未执行完成的任务,
// 创建没有初始任务的worker线程执行
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 提前检查t线程是否启动,如果是就抛非法线程状态异常
if (t.isAlive())
throw new IllegalThreadStateException();
// workQueue队列中添加Worker对象
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 往HashSet中添加worker成功,启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//getTask()循环获取阻塞队列里的任务,获取到任何则交由线程执行
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 【强制】创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
正例:自定义线程工厂,并且根据外部特征进行分组,比如机房信息。
public class UserThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
// 定义线程组名称,在 jstack 问题排查时,非常有帮助
UserThreadFactory(String whatFeaturOfGroup) {
namePrefix = "From UserThreadFactory's " + whatFeaturOfGroup + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(null, task, name, 0, false);
System.out.println(thread.getName());
return thread;
}
}
【强制】线程池不允许使用 Executors去创建,而是通过 ThreadPoolExecutor 的方式,这
样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
ThreadPoolExecutor 的invokeAny()方法:
如果有任务完成了就取出这个任务进行返回,是一个阻塞方法 有一个重载方法可以设置超时时间,达到超时时间就抛出TimeoutException取出任务返回后,其它正在执行中的任务不会继续执行,将会被取消,使用场景:为了节约时间,多次执行同一个操作,谁先返回就用谁的.
传入一个Callable接口的任务列表,Callable会被包装成FuturnTask传入ThreadPoolExecutor 的execute方法中.
方法中维护了一个linkedBlockQueue,存放返回结构,一旦有返回结构,则将返回结构返回,并cancel其他线程.
ThreadPoolExecutor 的 invokeAll()方法:
等待所有任务完成了才将所有任务的结果凭证进行返回,是一个阻塞方法 有一个重载方法可以设置超时时间,达到超时时间就将所有任务都取消掉,如果任务在执行中,那么就抛出CancellationException
CompletionService:
由于线程池在处理Callable得到Future的时候 拿到的Future后要拿返回结果,如果没有完成就会陷入阻塞,那么为了拿到最快执行完的返回结果就出现了CompletionService
CompletionService的实现类ExecutorCompletionService(ThreadPoolExecutor)
需要传入一个线程池,在ExecutorCompletionService.submit时提交执行的任务.将任务交给ThreadPoolExecutor执行,由于FuturnTask的run方法调用玩call()方法之后,在讲返回值赋值给FuturnTask的outcome属性时,会调用finishCompletion(),并且此方法提供了一个FuturnTask未实现,可由开发者实现吃done();方法.ExecutorCompletionService中的内部类继承了FuturnTask并重写了done()方法,在done()中将当前执行完的FuturnTask方法阻塞队列中..这时在调用ExecutorCompletionService.take().get()方法就可以获取到最新完成的县城运行结果.
- 面试题:
- 1、你有了解过线程池么?
- 2、那你说说JDK提供的常用的线程池有哪些呢?
- 3、那么工作中使用哪一个线程池呢?(这里有一个坑)
- 那么该如何合理的设置线程池的参数呢?
- 从宏观上看,先看最大线程数应该怎么设置:
- 那么就要分为CPU密集型和IO密集型两种情况:
- CPU密集型:该任务需要大量的运算,但是没有阻塞,CPU一直全速运行。
- 该情况下应尽可能减少CPU对线程的切换,所以要使线程数
- 尽可能少,一般公式是:设置为CPU核心数+1个 4+1=5
- IO密集型:该任务存在大量IO操作,那么就会有大量的阻塞等待的情况,
- CPU大部分时间是在阻塞。
- 该情况下应尽可能减少阻塞的情况所带来的消耗,所以要使线程数
- 尽可能多,一般公式是:设置为CPU核心数2个 42=8
- 其它本人了解的公式:CPU核心数/(1-阻塞系数), 4/(1-0.9)=4/0.1=40个
- 阻塞系数一般取值范围为:0.8~0.9之间
- 一般取0.9
从微观上如何来设置呢? - 需要根据几个值来决定
- 1、tasks :每秒的任务数,假设为500~1000
- 2、taskCost:每个任务花费时间,假设为0.1s
- 3、responseTime:系统允许容忍的最大响应时间,假设为1s
- 那么就需要做几个计算:
- 1、corePoolSize = 每秒需要多少个线程处理?
- threadCount = tasks/(1/taskCost) =taskstaskCount = (500~1000)0.1 = 50~100 个线程。
- 得出:corePoolSize设置应该大于50
- 又根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
- 2、queueCapacity = (coreSizePool/taskCost)*responseTime
- 计算可得 queueCapacity = (80/0.1)*1 = 800。
- 意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
- 切记不能设置为Integer.MAX_VALUE,这样队列会很大,
- 线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,
- 响应时间会随之陡增。
- 3、maxPoolSize = corePoolSize+(max(tasks)- queueCapacity)/(1/taskCost)
- 计算可得 maxPoolSize = 80+(1000-800)/10 =100
- 核心线程的处理能力+(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
- 4、rejectedExecutionHandler:根据具体情况来决定,任务尽量不要随意丢弃,
- 重要任务则要利用一些缓冲机制来处理。
- 5、keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
- 以上都是理想值,实际情况下要根据机器性能来决定。
- 如果在未达到最大线程数的情况机器cpu load已经满了,
- 则需要通过升级硬件和优化代码,降低taskCost来处理。
吞吐量是指对网络、设备、端口、虚电路或其他设施,单位时间内成功地传送数据的数量
CPU负载有点类似于交通拥堵程度
单核CPU就像一条单行道。想象您是一名交警.有时这条单行道太忙了,有汽车在排队等待同行。想让人们知道这条路的交通如何。最直接的指标是就是在特定时间内,这条道路上等待多少辆汽车。如果没有汽车在等待,即将到来的驾驶员便知道他们可以马上驶过。如果有汽车在排队等候,则驾驶员就知道知道要耽误时间了。
所以,交警同志,你应该怎样去定义交通拥塞程度的?可以按照下面的规则:
0.00表示路上根本没有车。实际上,介于0.00和1.00之间都表示没有交通拥堵,到达的汽车可以直接同行。
1.00表示道路完全处于满负荷状态。一切都还不错,但是如果再增加一辆汽车,将会产生交通堵塞。
超过1.00表示有交通堵塞。2.00意味着当前的汽车总量需要两条车道才能保证不堵塞。 3.00意味着当前的汽车总量需要三条车道才能保证不堵塞。
ForkJoinPool
通常大家说的Fork/Join框架其实就是指由ForkJoinPool作为线程池、ForkJoinTask(通常实现其三个抽象子类)为任务、ForkJoinWorkerThread作为执行任务的具体线程实体这三者构成的任务调度机制。通俗的说,ForkJoin框架的作用主要是为了实现将大型复杂任务进行递归的分解,直到任务足够小才直接执行,从而递归的返回各个足够小的任务的结果汇集成一个大任务的结果,依次类推最终得出最初提交的那个大型复杂任务的结果,这和方法的递归调用思想是一样的。当然ForkJoinPool线程池为了提高任务的并行度和吞吐量做了非常多而且复杂的设计实现,其中最著名的就是任务窃取机制。
对照前面介绍的ThreadPoolExecutor执行的任务是Future的实现类FutureTask、执行线程的实体是内部类Worker,ForkJoinPool执行的任务就是Future的实现类ForkJoinTask、执行线程就是ForkJoinWorkerThread。
ForkJoinWorkerThread
该类直接继承了Thread,但是仅仅是为了增加一些额外的功能,并没有对线程的调度执行做任何更改。ForkJoinWorkerThread是被ForkJoinPool管理的工作线程,在创建出来之后都被设置成为了守护线程,由它来执行ForkJoinTasks。该类主要为了维护创建线程实例时通过ForkJoinPool为其创建的任务队列,与其他两个线程池整个线程池只有一个任务队列不同,ForkJoinPool管理的所有工作线程都拥有自己的工作队列,为了实现任务窃取机制,该队列被设计成一个双端队列,而ForkJoinWorkerThread的首要任务就是执行自己的这个双端任务队列中的任务,其次是窃取其他线程的工作队列,以下是其代码片段:
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; //线程池
final ForkJoinPool.WorkQueue workQueue//任务队列
public void run() {
if (workQueue.array == null) {
Throwable exception = null;
try {
onStart();
//在这里 循环从workQueue队列中取任务,交由线程执行
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception);
}
}
}
}
}
ForkJoinTask
与FutureTask一样, ForkJoinTask也是Future的子类,不过它是一个抽象类,其实现过程中与ForkJoinPool相互交叉,因此其源码在不理解ForkJoinPool的情况下很难全部看明白,这里只了解大概,ForkJoinTask的作用就是根据任务的分解实现(exec抽象方法),将任务进行拆分,并等待子任务的执行结果,由此可以组合成父任务的结果,以此类推。
ForkJoinTask有一个int类型的status字段,其高16位存储任务执行状态例如NORMAL、CANCELLED或EXCEPTIONAL,低16位预留用于用户自定义的标记。任务未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL这几个小于0的值,这几个值也是按大小顺序的:0(初始状态) > NORMAL > CANCELLED > EXCEPTIONAL.
ForkJoinTask采用了哈希数组 + 链表的数据结构
fork--安排任务异步执行
该方法其实就是将任务通过push方法加入到当前工作线程的工作队列或者提交队列(外部非ForkJoinWorkerThread线程通过submit、execute方法提交的任务),等待被线程池调度执行,这是一个非阻塞的立即返回方法。这里需要知道,ForkJoinPool线程池通过哈希数组+双端队列的方式将所有的工作线程拥有的任务队列和从外部提交的任务分别映射到哈希数组的不同槽位上,将新任务始终push到队列一端的方式可以保证比较大的任务在队列的头部,越小的任务越在尾部,这时候拥有该任务队列的线程如果按照先进后出的方式pop弹出任务执行的话(这时候的任务队列就是当着栈来使用),将会是先从小任务开始,逐渐往大任务进行。而窃取任务的其他线程从对列头部开始窃取的话,将会帮助它完成大任务。
join方法就是ForkJoinTask最核心也最复杂的方法,就是等待任务执行结束并返回执行结果,若任务被取消抛出CancellationException异常,若是其他异常导致异常结束则抛出相关RuntimeException或Error信息,这些异常还可能包括由于内部资源耗尽而导致的RejectedExecutionException,比如分配内部任务队列失败。异常的处理利用了另一个哈希数组 + 链表的结构。该方法不会由于线程被中断而抛出InterruptedException异常,而是会在等到任务执行结束之后再将中断状态复位。
该方法的执行过程中调用了一些未实现的抽象方法:exec方法就是执行任务的入口,任务的逻辑与拆分策略都由该方法实现,只有返回true才表示任务正常完成。该方法可以抛出异常以指示异常结束。getRawResult方法用于返回任务正常结束的执行结果。internalPropagateException方法则是当任务异常的回调钩子函数。一般来讲,我们都会在exec方法里面实现如下的貌似递归的拆分逻辑(伪代码):
if 任务足够小 then
执行任务;
返回结果;
else
拆分成两个子任务t1、t2
t1.fork(); //提交到任务队列
t2.fork(); //提交到任务队列
Object result = t1.join() + t2.join(); //合并结果,这里的加号仅仅代表合并结果,并不是做加法运行
return result; //返回最终结果
fork负责将任务推入队列,等待被调度执行,join则是等待执行任务,并返回结果,而join在执行任务的时候最终就是调用的exec,而exec中任务已经足够小就直接执行,否则会拆分任务之后通过fork将拆分出的子任务再次加入队列,其子任务执行的时候依然会执行exec(假设子任务的exec也是这样的实现),到时候又会继续拆分,或者足够小就直接执行,两个子任务合并结果之后是其父任务的结果,两个父任务的结果又合并成祖父任务的结果,以此类推就是递归的完成了整个任务。
ForkJoinTask 的三个抽象子类
通常我们不会直接实现ForkJoinTask,而是实现其三个抽象子类,ForkJoinTask仅仅是为了配合ForkJoinPool实现任务的调度执行,通常我们使用的时候,仅仅只需要提供任务的拆分与执行即可,RecursiveAction 用于大多数不返回结果的计算, RecursiveTask 用于返回结果的计算, CountedCompleter 用于那些操作完成之后触发其他操作的操作。
RecursiveAction很简单,作为不返回结果的任务,其getRawResult方法永远返回null,setRawResult方法则什么都不做,它增加了一个无返回值的compute抽象方法,用于当ForkJoinTask被调度执行exec方法时调用,exec方法在执行完compute之后直接返回true,表示任务正常结束,而compute方法就是留给我们去实现大任务如何拆小任务,小任务怎么执行的逻辑
RecursiveTask也很简单,既然要返回结果,所以它定义了一个表示执行结果的result字段,getRawResult/setRawResult就用来操作该字段,它增加了一个有返回值的compute抽象方法,用于当ForkJoinTask被调度执行exec方法时调用,exec方法在执行完compute之后,将compute的返回结果作为任务的执行结果赋值给result,并最终返回true表示任务正常结束,同样compute方法也是留给我们去实现大任务如何拆小任务,小任务怎么执行,并且返回任务执行结果的逻辑。
ScheduledExecutorService

ScheduleExecutorService 简述
ScheduledThreadPoolExecutor
public interface ScheduledExecutorService extends ExecutorService 可安排在给定的延迟后运行或定期执行的命令。
schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象
scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务
所有的 schedule 方法都接受相对延迟和周期作为参数,而不是绝对的时间或日期
SheduleExecutorService 是JDK 1.5出来的,比以前的Timer性能好

scheduledThreadPool.scheduleAtFixedRate
每隔一个固定的时间去开始执行线程池中提交给它的任务任务只用往线程池中扔一次,线程池会重复周期性执行,如果执行任务需要的时间大于等于了周期性执行的间隔时间,那么就按照执行任务需要的时间进行周期性间隔执行
scheduledThreadPool.scheduleWithFixedDelay
每隔一个固定的延时时间去开始执行线程池中提交给它的任务,任务只用往线程池中扔一次,线程池会重复周期性执行,如果执行任务需要的时间大于等于了周期性执行的间隔时间 那么就按照执行任务需要的时间与延时时间之和进行周期性间隔执行
ScheduledExecutorService创建线程池:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
用的是DelayedWorkQueue队列
scheduleAtFixedRate方法解读
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//延迟执行任务
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
//将任务添加至队列
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//这里
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
//这里 执行任务 同ThreadPoolExecutor
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
DelayedWorkQueue的add方法,实际上直接调用的offer
1.元素个数超过数组长度,就会调用grow()方法,进行数组扩容。
2.将新元素e添加到优先级队列中对应的位置,通过siftUp方法,保证按照元素的优先级排序。
3.如果新插入的元素是队列头,即更换了队列头,那么就要唤醒正在等待获取任务的线程。这些线程可能是因为原队列头元素的延时时间没到,而等待的。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
// 使用lock保证并发操作安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果要超过数组长度,就要进行数组扩容
if (i >= queue.length)
// 数组扩容
grow();
// 将队列中元素个数加一
size = i + 1;
// 如果是第一个元素,那么就不需要排序,直接赋值就行了
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 调用siftUp方法,使插入的元素变得有序。
siftUp(i, e);
}
// 表示新插入的元素是队列头,更换了队列头,
// 那么就要唤醒正在等待获取任务的线程。
if (queue[0] == e) {
leader = null;
// 唤醒正在等待等待获取任务的线程
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
DelayedWorkQueue的take
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// 如果没有任务,就让线程在available条件下等待。
if (first == null)
available.await();
else {
// 获取任务的剩余延时时间
long delay = first.getDelay(NANOSECONDS);
// 如果延时时间到了,就返回这个任务,用来执行。
if (delay <= 0)
return finishPoll(first);
// 将first设置为null,当线程等待时,不持有first的引用
first = null; // don't retain ref while waiting
// 如果还是原来那个等待队列头任务的线程,
// 说明队列头任务的延时时间还没有到,继续等待。
if (leader != null)
available.await();
else {
// 记录一下当前等待队列头任务的线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当任务的延时时间到了时,能够自动超时唤醒。
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
// 唤醒等待任务的线程
available.signal();
lock.unlock();
}
}
任务执行run方法,最后调用ScheduledThreadPoolExecutor的内部类ScheduledFutureTask的run方法
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
//执行任务
else if (ScheduledFutureTask.super.runAndReset()) {
//设置下次执行时间
setNextRunTime();
//将任务丢回队列
reExecutePeriodic(outerTask);
}
}
网友评论