线程池

作者: 码农崛起 | 来源:发表于2018-05-14 10:01 被阅读0次
Executor.png

Executor的主要作用是解耦任务提交和任务执行(包括如何使用线程,如何调度)

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
    r.run();
  }
 }

Executor本身并不表示使用线程

ExecutorService提供了关闭机制以及提交任务返回Future对象用于追踪任务执行进度或取消任务。

FutureTask.png

先分析一下AbstractExecutorService实现中用到的Future的实现类FutureTask的实现机制

状态.png

如果当前的任务是Runnable,通过RunnableAdapter转为Callable

RunnableAdapter.png run.png

FutureTask自身实现了Runnable,包装内部的Callable或Runnable

get.png awaitDone.png report.png

Future#get实现机制:如果任务还没开始,调用线程加入任务的等待队列,等待任务完成或取消时被唤醒,否则等待任务到达最终状态,正常执行返回结果或异常时抛出ExecutionException异常

cancel.png finishCompletion.png

Future#cancle实现机制:如果任务还没开始,状态改为INTERRUPTING或CANCELLED,如果支持中断,打断当前线程(参考run方法,先设置runner线程,再修改状态,所以可能当前状态是NEW,但是runner已经设置了),然后唤醒所有之前等待的线程。

AbstractExecutorService的实现机制:任务的具体执行都委托给从Executor继承的execute方法,主要实现了submit和invokeAll,invokeAny方法。

invokeAll.png ExecutorCompletionService.png

任务的执行都委托给Executor,所有提交的任务都用QueueingFuture包装,任务执行完加入内部的BlockingQueue。

invokeAny.png

invokeAny:先提交一个任务,然后循环检查ExecutorCompletionService的阻塞队列是否有已完成的任务,有就返回,没有就再提交一个新任务,直到任务都提交完,然后阻塞。第一个任务完成后,cancel所有可以cancel的任务。

AbstractExecutorService有两个具体的子类:ThreadPoolExecutor和ForkJoinPool,ScheduledThreadPoolExecutor又继承了ThreadPoolExecutor

ThreadPoolExecutor:

线程池运行状态.png 线程池参数.png

workQueue表示任务队列,workers表示当前执行任务的线程集合。

Worker.png

Worker继承了AbstractQueuedSynchronizer,自身就是一个简单的互斥锁,实现了Runnable,Worker在构造时内部会利用ThreadFactory产生一个线程,线程启动时,执行Worker自身的run方法。

runWorker.png

Worker执行过程中,会通过getTask获取任务,每次执行任务之前都会获取worker自身的互斥锁

getTask.png

getTask通过返回null(线程池stop,或shutdown之后任务队列为空,或者动态调整参数之后线程太多,或者获取任务超时(说明任务太少了,不需要那么多线程)),控制Worker结束循环

processWorkerExit.png

Worker循环结束有两种原因:执行的任务抛出异常,getTask返回null。
如果是后者,再次检查以确保目前的线程数不低于最低要求,线程数不够时添加worker线程。因异常而结束任务循环也会添加新的worker线程。

addWorker-1.png addWorker-2.png

添加worker失败的原因有三:线程池stop;shutdown之后任务队列为空;当前线程数超过最大线程数。worker添加成功之后,启动内部的线程,开始循环处理任务。

execute.png

关键点在于,核心线程全部启动之后,任务会先加入任务队列,只有任务队列是有界队列,且队列满了才会启动非核心线程!!!

shutdown.png interruptIdleWorkers.png tryTerminate.png

shutdown之后,修改状态为SHUTDOWN,然后打断所有idle线程,所谓idle,就是可以获取worker的互斥锁,说明worker当前在等待任务而不是执行任务,参考runWorker方法。如果当前所有worker正巧都在等待任务,所有worker都会被打断(processWorkerExit方法会在worker退出循环时调用,根据情况再添加worker)。tryTerminate中会先检查如果当前状态是SHUTDOWN但是任务队列不为空,不能进入terminal状态,如果当前是shutdown且任务队列为空且线程数为空,修改状态为过渡状态TIDYING,然后修改为最终状态TERMINATED。

shutdownNow.png

打断所有已经启动的worker,返回所有还未执行的任务。

awaitTermination.png

shutdown之后线程池并不一定关闭!!!所以正确的做法是shutdown之后调用awaitTermination等待所有任务执行完后所有线程被打断。

ThreadPoolExecutor.png

ThreadPoolExecutor可控制参数:
corePoolSize:核心线程数,worker数量小于corePoolSize时每次提交任务都启动一个core线程,可以使用set方法在运行时调整。
maximumPoolSize:最大线程数,包括core和非core线程,从上面的源码分析可以直到只有任务队列为有界队列时才会启动非core线程。
workQueue:任务队列,只有任务队列为有界队列时才会启动非core线程。
keepAliveTime:worker在指定时间内获取不到任务,说明此时人浮于事,需要裁员,getTask会返回null,结束获取任务超时的worker。
threadFactory:定义如何产生线程,默认直接new Thread。
handler:提交任务时任务队列满了或线程池shutdown之后的行为,默认抛出RejectedExecutionException异常,可选策略包括忽略(DiscardPolicy),在提交任务的线程中执行(CallerRunsPolicy),移除任务队列里最前面的任务(DiscardOldestPolicy)。
keepAliveTime:如果通过set设置了值,如果一个worker超过指定时间未获得任务就会timeout而结束循环,如果当前线程数超过了corePoolSize,不会再添加新的worker,默认不支持timeout。
allowCoreThreadTimeOut:默认线程数小于corePoolSize,timeout之后就会添加新的worker,如果设置了allowCoreThreadTimeOut,只有当前线程为0时才会添加新的worker。

下面分析一下ThreadPoolExecutor的子类ScheduledThreadPoolExecutor的实现机制:

ScheduledThreadPoolExecutor.png

从构造上看,主要是任务队列使用了DelayedWorkQueue,DelayedWorkQueue是一个简单的基于二叉堆实现的优先级阻塞无界队列,所有任务按触发时刻排序,keepAliveTime为0,不支持worker超时。从上文的分析可知,使用无界队列时是不会启动非core线程的,maximumPoolSize设置成了Integer.MAX_VALUE而不是corePoolSize,避免运行时修改corePoolSize时还要修改maximumPoolSize。

ScheduledFutureTask.png

所有提交的任务都会用ScheduledFutureTask包装

compareTo.png

任务先按触发时刻排序,同时触发的任务按提交顺序排序

run.png setNextRunTime.png triggerTime.png

如果是重复任务,任务执行完,计算下次触发时刻,重新加入任务队列。此处有一个细节:就算是fixed-rate的任务,也是上次执行完之后才会再次加入任务队列。

onShutdown.png

shutdown之后不允许提交新任务,如果是之前提交的延迟任务还没到时间或者是周期性任务,根据参数决定是否还能继续执行,默认运行继续等待执行延迟任务,不允许执行周期任务。

ForkJoinPool:日后补充!!!

下面来分析一下Executors里的静态方法构造的都是什么线程:

newFixedThreadPool.png

无界队列,不支持timeout,固定线程数。

newSingleThreadExecutor.png

newSingleThreadExecutor = newFixedThreadPool(1)

newCachedThreadPool.png

使用特殊的队列SynchronousQueue,相当于容量为1的阻塞队列,只有这样,如果已经有任务在等待执行了,再次提交任务时才会启动非core线程。

newScheduledThreadPool.png

相关文章

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

  • 线程池

    JDK线程池 为什么要用线程池 线程池为什么这么设计 线程池原理 核心线程是否能被回收 如何回收空闲线程 Tomc...

网友评论

      本文标题:线程池

      本文链接:https://www.haomeiwen.com/subject/lntprftx.html