一、前言:
// 传统开启线程方式
Thread(Runnable {
//to do异步请求
}).start()
1、使用new Thread()创建线程存在的问题
- 1、如果在一个list每一个item都创建一个Thread,list量大的话会大量创建Thread,导致内存抖动,GC频繁的回收。要知道,GC的回收是在主线程的,这样会导致卡顿。
- 2、线程过多,导致各个线程竞争抢夺CPU执行权,线程的频繁切换导致效率的降低。
- 3、ListView的每一个item滑出窗口,线程无法停止也无法控制。
2、使用线程池的好处
- 1、重用已经创建的好的线程,避免频繁创建进而导致的频繁GC
- 2、控制线程并发数,合理使用系统资源,提高应用性能
- 3、可以有效的控制线程的执行,比如定时执行,取消执行等
二、ThreadPoolExecutor线程池的使用:
1、创建线程池
//创建线程池
var threadPoolExecutor = ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
Executors.defaultThreadFactory(),
RejectedExecutionHandler { _, _ ->
Log.d("ThreadPoolManager","$ThreadPoolManager RejectedExecutionHandler----")
}
)
2、创建线程池 ThreadPoolExecutor 7个参数
-
corePoolSize线程池中核心线程的数量; -
maximumPoolSize线程池中最大线程数量,等待队列的任务塞满了之后,才会触发开启非核心线程,直到总线程数达到 maximumPoolSize -
keepAliveTime非核心线程的超时时长,当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,则该参数也作用于核心线程的超时时长 -
unit第三个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等 -
workQueue线程池中的任务队列,该队列主要用来存储已经被提交但是尚未执行的任务。存储在这里的任务是由ThreadPoolExecutor的execute方法提交来的。这个队列任务塞满了之后,才会触发开启非核心线程 -
threadFactory为线程池提供创建新线程的功能,这个我们一般使用默认即可 -
handler 拒绝策略,当线程无法执行新任务时(一般是由于线程池中的线程数量已经达到最大数或者线程池关闭导致的),默认情况下,当线程池无法处理新线程时,会抛出一个RejectedExecutionException。
3、 线程池 ThreadPoolExecutor 的方法
- 1、
shutDown()关闭线程池,不影响已经提交的任务 - 2、
shutDownNow()关闭线程池,并尝试去终止正在执行的线程 - 3、
allowCoreThreadTimeOut(boolean value)允许核心线程闲置超时时被回收
4、 线程池封装类
/***
* Created by lyy on 2023/01/17.
* 线程池封装类
*
* 使用:ThreadPoolManager.getInstance().addTask("TAG", runnable)
* */
class ThreadPoolManager private constructor() {
private var threadPoolMap = hashMapOf<String, ThreadPoolExecutor>()
/**
* cpu数量
* */
private val CPU_COUNT = Runtime.getRuntime().availableProcessors()
/**
* 核心线程数为手机CPU数量+1
* */
private val CORE_POOL_SIZE = CPU_COUNT + 1
/**
* 最大线程数为手机CPU数量×2+1
* */
private val MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1
/**
* 线程活跃时间 秒,超时线程会被回收
* */
private val KEEP_ALIVE_TIME: Long = 60
/**
* 等待队列大小
* */
private val QUEUE_SIZE = 128
companion object {
fun getInstance() = SingleHolder.SINGLE_HOLDER
}
object SingleHolder {
val SINGLE_HOLDER = ThreadPoolManager()
}
/**
* @param tag 针对每个TAG 获取对应的线程池
* @param corePoolSize 线程池中核心线程的数量
* @param maximumPoolSize 线程池中最大线程数量
* @param keepAliveTime 非核心线程的超时时长,
* 当系统中非核心线程闲置时间超过keepAliveTime之后,则会被回收
* 如果ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,
则该参数也作用于核心线程的超时时长
* @param unit 第三个参数的单位,有纳秒、微秒、毫秒、秒、分、时、天等
* @param queueSize 等待队列的长度 一般128 (参考 AsyncTask)
* workQueue 线程池中的任务队列,
该队列主要用来存储已经被提交但是尚未执行的任务。
存储在这里的任务是由ThreadPoolExecutor的execute方法提交来的。
* threadFactory 为线程池提供创建新线程的功能,这个我们一般使用默认即可
*
* 1.ArrayBlockingQueue:这个表示一个规定了大小的BlockingQueue,ArrayBlockingQueue的构造函数接受一个int类型的数据,
* 该数据表示BlockingQueue的大小,存储在ArrayBlockingQueue中的元素按照FIFO(先进先出)的方式来进行存取。
* 2.LinkedBlockingQueue:这个表示一个大小不确定的BlockingQueue,在LinkedBlockingQueue的构造方法中可以传
* 一个int类型的数据,这样创建出来的LinkedBlockingQueue是有大小的,也可以不传,不传的话,
* LinkedBlockingQueue的大小就为Integer.MAX_VALUE
* */
private fun getThreadPool(tag: String): ThreadPoolExecutor {
var threadPoolExecutor = threadPoolMap[tag]
if (threadPoolExecutor == null) {
threadPoolExecutor = ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
ArrayBlockingQueue<Runnable>(QUEUE_SIZE),
Executors.defaultThreadFactory(),
RejectedExecutionHandler { _, _ ->
Log.d("ThreadPoolManager", "$ThreadPoolManager RejectedExecutionHandler----")
}
)
//允许核心线程闲置超时时被回收
threadPoolExecutor.allowCoreThreadTimeOut(true)
threadPoolMap[tag] = threadPoolExecutor
}
return threadPoolExecutor
}
/**
* @param tag 针对每个TAG 获取对应的线程池
* @param runnable 对应的 runnable 任务
* */
fun removeTask(tag: String, runnable: Runnable) {
getThreadPool(tag)?.queue?.remove(runnable)
}
/**
* @param tag 针对每个TAG 获取对应的线程池
* @param runnable 对应的 runnable 任务
* */
fun addTask(tag: String, runnable: Runnable) {
getThreadPool(tag).execute(runnable)
}
/**
* @param tag 针对每个TAG 获取对应的线程池
* 取消 移除线程池
* */
//shutDown():关闭线程池后不影响已经提交的任务
//shutDownNow():关闭线程池后会尝试去终止正在执行任务的线程
fun exitThreadPool(tag: String) {
var threadPoolExecutor = threadPoolMap[tag]
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow()
threadPoolMap.remove(tag)
}
}
}
5、使用:
- 1、用一个hashMap 存储对应 TAG 的每一个线程池
- 2、添加线程任务 通过 addTask(tag: String, runnable: Runnable) 添加
- 3、移除线程任务 通过 removeTask(tag: String, runnable: Runnable) 删除
// 添加线程任务
ThreadPoolManager.Companion.getInstance().addTask("google", new Runnable() {
@Override
public void run() {
//执行子线程任务...
//Thread.currentThread().getName().equals("main") //判断是否是主线程
}
});
这个要先说明一点,系统的API ThreadPoolExecutor 并没有提供接口移除任务,removeTask()方法里面是通过ThreadPoolExecutor 获取其对应的队列 BlockingQueue,通过队列 移除 线程任务 Runnable。因为这个BlockingQueue 就是我们创建 ThreadPoolExecutor 实例时通过构造方法 传进去的 ArrayBlockingQueue ,我们后续通过线程池添加任务,都会放进这个队列的。线程池执行任务,都会从这个队列里面取出来。
6、场景一 核心线程数大于 任务数
我们把 核心线程数 CORE_POOL_SIZE 设置成20 ,最大线程数 MAXIMUM_POOL_SIZE 设置成30,等待队列维持128
然后添加10个任务。很明显,任务都是一下子执行完。
// 创建十个任务
for (i in 1..10) {
val runnable = Runnable {
val sdf = SimpleDateFormat("yyyy-MM-dd HH:mm:ss:ms")
val time = sdf.format(Date(java.lang.Long.parseLong(System.currentTimeMillis().toString())))
Logger.d("ThreadPoolManager Runnable ---- $i time: $time" )
}
ThreadPoolManager.getInstance().addTask("TAG", runnable)
}
看下log 而且是按顺序执行的,看下时间,几乎在同一时间执行的
图片.png
7、场景二 核心线程数 小于 任务数
我们把 核心线程数 CORE_POOL_SIZE 设置成3 ,最大线程数 MAXIMUM_POOL_SIZE 设置成5,等待队列维持128。为模拟线程执行的耗时操作,每个线程执行完功能后睡眠5秒。
// 创建十个任务
for (i in 1..10) {
val runnable = Runnable {
val sdf = SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒")
val time = sdf.format(Date(java.lang.Long.parseLong(System.currentTimeMillis().toString())))
Logger.d("ThreadPoolManager Runnable ---- 任务$i 执行时间是: $time")
//模拟耗时操作,睡眠5秒
Thread.sleep(5000)
}
ThreadPoolManager.getInstance().addTask("TAG", runnable)
}
看打印的结果:前3(1-3)个先执行,过5秒再执行3(4-6)个,再过5秒再执行三个(7-9),最后执行第10个
图片.png
分析线程池的流程:
- 1、因为任务数是10,最大线程数是5,核心线程只有3,等待队列维持128.
- 2、所以当向任务队列添加前3个任务时,都会先创建3个核心线程出来执行前面三个任务.所以任务1,2,3会最先执行。
- 3、当向线程池添加到第4个任务时,因为核心线程已经达到上限了,但任务队列里面(最大128)还没填满 ,所以不会开启新的非核心线程去执行任务,而是把任务放进等待队列里面等待,等有空闲的核心线程再去执行等待队列的任务。
- 4、所以都是3个任务3个任务的执行。
注意:特别重要-至于其他场景,是遵循以下规律的:
- 1、execute一个线程之后,如果线程池中的线程数未达到核心线程数,则会立马启用一个核心线程去执行
- 2、execute一个线程之后,如果线程池中的线程数已经达到核心线程数,且workQueue未满,则将新线程放入workQueue中等待执行
- 3、execute一个线程之后,如果线程池中的线程数已经达到核心线程数但未超过非核心线程数,且workQueue已满,则开启一个非核心线程来执行任务
- 4、execute一个线程之后,如果线程池中的线程数已经超过非核心线程数,则拒绝执行该任务
8、场景三 移除已经提交的任务
这个场景是,我提交了一个任务到线程池,但是在未执行时想取消这个任务。这个场景在RecycleView的每个item快速滑动时很常见。item进入窗口时,要执行一个异步任务,但还没执行到,这个item就快速滑出屏幕。这个时候,就可以通过线程池的队列里把这个任务移除掉。废话少说,上模拟代码。
我们把 核心线程数 CORE_POOL_SIZE 设置成3 ,最大线程数 MAXIMUM_POOL_SIZE 设置成5,等待队列维持128。
var runnable6: Runnable? = null
var runnable7: Runnable? = null
var runnable8: Runnable? = null
var runnable9: Runnable? = null
for (i in 1..10) {
val runnable = Runnable {
val sdf = SimpleDateFormat("yyyy年MM月dd日 HH时mm分ss秒")
val time = sdf.format(Date(java.lang.Long.parseLong(System.currentTimeMillis().toString())))
Logger.d("ThreadPoolManager Runnable ---- 任务$i 执行时间是: $time")
//模拟耗时操作,睡眠5秒
Thread.sleep(5000)
}
ThreadPoolManager.getInstance().addTask("TAG", runnable)
if (i == 6) {
runnable6 = runnable
}
if (i == 7) {
runnable7 = runnable
}
if (i == 8) {
runnable8 = runnable
}
if (i == 9) {
runnable9 = runnable
}
}
//添加后,立马移除
ThreadPoolManager.getInstance().removeTask("TAG", runnable6!!)
ThreadPoolManager.getInstance().removeTask("TAG", runnable7!!)
ThreadPoolManager.getInstance().removeTask("TAG", runnable8!!)
ThreadPoolManager.getInstance().removeTask("TAG", runnable9!!)
运行结果:
图片.png
分析线程池的流程:
- 1、因为任务数是10,最大线程数是5,核心线程只有3,等待队列维持128.
- 2、所以当向任务队列添加前3个任务时,都会先创建3个核心线程出来执行前面三个任务.所以任务1,2,3会最先执行。
- 3、当向线程池添加到第4-10个任务时,把任务放进等待队列里面等待。
- 4、所以通过当前线程池,获取对应队列 queue,然后remove掉这个任务。
这个removeTask()就是这么实现的:
/**
* @param tag 针对每个TAG 获取对应的线程池
* @param runnable 对应的 runnable 任务
* */
fun removeTask(tag: String, runnable: Runnable) {
getThreadPool(tag)?.queue?.remove(runnable)
}
这个queue,就是我们创建线程池实例时,通过构造放传进去的ArrayBlockingQueue(),这个队列用来存放等待执行的任务Runnable。通过这个队列删除了就不会执行。
这个源码通过源码亦可以验证的:
ThreadPoolExecutor的构造方法:
图片.png
通过queue获取的
图片.png
通过定位代码,明显是同一个变量。
三、Java的四种线程池封装
1、newCachedThreadPool:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:
/**
* newCachedThreadPool线程池
*/
fun cachedThreadPool() {
val newCachedThreadPool = Executors.newCachedThreadPool()
newCachedThreadPool.execute {
//执行子线程功能
}
}
底层源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
2、 newFixedThreadPool:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:
/**
* newFixedThreadPool线程池
*/
fun fixedThreadPool() {
val newFixedThreadPool = Executors.newFixedThreadPool(3)
newFixedThreadPool.execute {
//执行子线程功能
}
}
底层源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
3、newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:
/**
* newScheduledThreadPool线程池
*/
fun scheduledThreadPool() {
val newScheduledThreadPool = Executors.newScheduledThreadPool(3)
newScheduledThreadPool.execute {
//执行子线程功能
}
}
底层源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
4、newSingleThreadExecutor:创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:
/**
* newSingleThreadExecutor线程池
*/
fun singleThreadExecutor() {
val newSingleThreadExecutor = Executors.newSingleThreadExecutor()
newSingleThreadExecutor.execute {
//执行子线程功能
}
}
底层源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
参考:https://blog.csdn.net/Leo_Liang_jie/article/details/90263072









网友评论