美文网首页
20.多线程总结(七)-ThreadPoolExecutor线程

20.多线程总结(七)-ThreadPoolExecutor线程

作者: 任振铭 | 来源:发表于2020-04-20 16:48 被阅读0次

1.如何创建一个线程池?

        //核心线程池大小
        int corePoolSize = 10;
        //线程池最大容量
        int maximunPoolSize = 20;
        //当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
        long keepAliveTime = 1;
        //时间单位
        TimeUnit timeUnit = TimeUnit.MINUTES;
        //工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        //线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //拒绝策略
        RejectedExecutionHandler rejectHandler =  new ThreadPoolExecutor.AbortPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximunPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectHandler
        );

2.线程池运行机制

a.new线程池时,线程池工作队列中已经被添加的Runnable是否会立即被执行?

不会,除非new之后调用prestartAllCoreThreads启动所有核心线程

//核心线程池大小
        int corePoolSize = 10;
        //线程池最大容量
        int maximunPoolSize = 20;
        //当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
        long keepAliveTime = 1;
        //时间单位
        TimeUnit timeUnit = TimeUnit.MINUTES;
        //工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        //线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //拒绝策略
        RejectedExecutionHandler rejectHandler =  new ThreadPoolExecutor.AbortPolicy();

        //往工作队列添加任务
        for (int i = 0; i < 10; i++) {
            workQueue.put(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximunPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectHandler
        );
        //不调用不会执行
        //executor.prestartAllCoreThreads();
b.当添加的任务个数X小于核心线程数Y时,线程池会启动几个线程?

小于等于X,具体多少取决于每个任务执行的时间,我们取两种极端的情况来说明

第一

假如for循环添加了5个任务,核心线程数为10,这5个任务执行的时间相对都比较长,假设每个任务都需要6s执行完成,那么最终一定有5个线程被创建出来在运行,可以检验一下,为什么?任务数小于核心线程数的时候,在没有空闲线程的情况下会创建新的线程来执行任务

        //核心线程池大小
        int corePoolSize = 10;
        //线程池最大容量
        int maximunPoolSize = 20;
        //当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
        long keepAliveTime = 1;
        //时间单位
        TimeUnit timeUnit = TimeUnit.MINUTES;
        //工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        //线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //拒绝策略
        RejectedExecutionHandler rejectHandler =  new ThreadPoolExecutor.AbortPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximunPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectHandler
        );
        for (int i = 0; i < 5; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
                    try {
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"执行完成");
                }
            });
        }
第二

既然举得是极端的例子,那就可能不切实际,同样假如for循环添加了5个任务,核心线程数为10,我们假线程从创建执行到完成的时间无限接近于0,那么在主线程创建这5个线程的时候,首先创建第一个,然后在即将创建第2个线程的时候,发现第一个线程已经执行完成,线程池中有一个空闲线程,那么就不会再创建新的线程,而直接使用这个线程,这样以来,整个过程中只有一个活跃线程,虽然这不太可能发生,但也能说明一下问题,既创建的线程个数是和任务有关系的

c.当添加的任务个数X大于核心线程数Y,但是小于核心线程数和工作队列Z的和时(Y < X < Y+Z),线程池会启动几个线程?

最多启动Y个核心线程,有可能小于Y,同上取决于任务执行的时间。如下,核心线程数为10,工作队列大小为10,开启了15个任务,无法被立即执行的任务会放入队列等待

        //核心线程池大小
        int corePoolSize = 10;
        //线程池最大容量
        int maximunPoolSize = 20;
        //当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
        long keepAliveTime = 1;
        //时间单位
        TimeUnit timeUnit = TimeUnit.MINUTES;
        //工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        //线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //拒绝策略
        RejectedExecutionHandler rejectHandler =  new ThreadPoolExecutor.AbortPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximunPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectHandler
        );
        for (int i = 0; i <15 ; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
                }
            });
        }

打印结果:
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:8
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:9
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:10
线程池中活跃线程个数为:5
线程池中活跃线程个数为:5
线程池中活跃线程个数为:3
线程池中活跃线程个数为:3
线程池中活跃线程个数为:2

d.当添加的任务个数X大于核心线程数Y+ 工作队列长度Z的和(X>Y+Z)时,启动的线程个数:
(a). X(任务数)- Z(工作队列长度) <= M(线程池最大容量),此时会启动 X(任务数)- Z(工作队列长度) 个线程,如下,核心线程数10,队列长度20,线程池最大容量30,启动50个任务
//核心线程池大小
        int corePoolSize = 10;
        //线程池最大容量
        int maximunPoolSize = 30;
        //当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
        long keepAliveTime = 30;
        //时间单位
        TimeUnit timeUnit = TimeUnit.SECONDS;
        //工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
        //线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //拒绝策略
        RejectedExecutionHandler rejectHandler =  new ThreadPoolExecutor.AbortPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximunPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectHandler
        );
        for (int i = 0; i <50 ; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
                }
            });
        }

打印结果:
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:29
。。。
(b). X(任务数)- Z(工作队列长度) > M(线程池最大容量),此时会启动 M(线程池最大容量)个线程,超出的线程执行拒绝策略,如下,核心线程数10,队列长度20,线程池最大容量30,启动51个任务
//核心线程池大小
        int corePoolSize = 10;
        //线程池最大容量
        int maximunPoolSize = 30;
        //当线程数量大于核心线程数量时,多余的线程在终止之前等待新任务的最大时间
        long keepAliveTime = 30;
        //时间单位
        TimeUnit timeUnit = TimeUnit.SECONDS;
        //工作队列
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);
        //线程工厂
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        //拒绝策略
        RejectedExecutionHandler rejectHandler =  new ThreadPoolExecutor.AbortPolicy();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                corePoolSize,
                maximunPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectHandler
        );
        for (int i = 0; i <51 ; i++) {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程池中活跃线程个数为:"+executor.getActiveCount());
                }
            });
        }

打印结果:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@36baf30c rejected from java.util.concurrent.ThreadPoolExecutor@7a81197d[Running, pool size = 30, active threads = 30, queued tasks = 20, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at TestThreadPool.main(TestThreadPool.java:30)
线程池中活跃线程个数为:30
线程池中活跃线程个数为:30
线程池中活跃线程个数为:29
线程池中活跃线程个数为:27
线程池中活跃线程个数为:28
线程池中活跃线程个数为:28
线程池中活跃线程个数为:28
线程池中活跃线程个数为:27
。。。

3.线程池拒绝策略

我们代码中使用的拒绝策略是
RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
其实系统给我们提供了几种,都是通过继承RejectedExecutionHandler类实现的,当然也可以自定义,都是比较简单的,我们来看看系统提供的这几种,核心就在rejectedExecution方法

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

AbortPolicy:直接抛出异常,组织系统正常工作

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

CallerRunsPolicy:只要线程池没有shutDown,就在调用者线程中执行当前被丢弃的任务

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

DiscardPolicy:直接丢弃任务

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        }

DiscardOldestPolicy:丢弃最老的一个任务(任务队列中第一个),再尝试提交任务

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

要自定义只需要继承RejectedExecutionHandler,重写方法rejectedExecution,在里边实现自己的策略即可

Executors框架

1.Executor、Executors、ExecutorService,ThreadPoolExecutor的区别?

Executor是一个接口,而且是线程池的顶层接口,继承关系为ThreadPoolExecutor继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,ExecutorService接口又实现了Executor接口,Executor接口只提供了一个方法execute用于任务的执行,ExecutorService接口在Executor基础上进行扩展,提供了线程任务生命周期管理的方法
,AbstractExecutorService则是ExecutorService接口的一个抽象类,ThreadPoolExecutor则是AbstractExecutorService抽象类的一个具体子类

我们这样要说的是Executors,注意他和Executor 的区别,Executors是一个线程池框架,你可以把它看作一个工具类,因为它内部是通过封装了ThreadPoolExecutor来提供一些不同功能的线程池,主要的区别就是ThreadPoolExecutor的那些参数,核心线程数啊,最大线程数啊这些不同而已

2.Executors提供的几种线程池
a.newCachedThreadPool()

这个线程池没有核心线程,最大线程数为Integer.MAX_VALUE,表明在任务足够多的情况下,他可以创建无数的线程来执行。这一特点决定了它适用于一些数量多但是执行时间较短的任务,任务执行完成之后,新的任务可以重用空闲的线程,可以保证每一个任务都立即有线程来执行。如果执行耗时长的任务会导致极多的线程被创建出来,对内存的威胁会很大。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
b.newFixedThreadPool()

这是一个固定线程数量的线程池,他的核心线程数就等于最大线程数,所以无论创建多少任务,都最多只会有这么多个线程运行,多余的任务进入队列排队,队列装不下的执行拒绝策略

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
c.newScheduledThreadPool()

能延迟执行,定时执行的线程池

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
d.newWorkStealingPool()

工作窃取,使用多个队列来减少竞争

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
e.newSingleThreadExecutor()

只有一个线程的线程池,无论提交多少任务,都是一个一个排队执行

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
f.newSingleThreadScheduledExecutor()

单线程能延迟执行,定时执行的线程池

    public static ScheduledExecutorService newSingleThreadScheduledExecutor()  {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

线程池使用注意事项

1.避免使用Executors框架创建线程池

使用Executors创建的线程池需要对Executors框架创建的各个线程池特点和使用场景有一个深刻的认识,不然很容易发生OOM,比如newFixedThreadPoolnewSingleThreadExecutor允许的请求队列长度为Integer.MAX_VALUE,可能导致请求队列中堆积大量等待的任务,导致内存耗尽

如下例子,创建一个线程数为2 的固定大小线程池,然后不断的往里边添加任务,因为队列可以存放Integer.MAX_VALUE个任务,所以内存剧增,导致发生了oom(为了oom出现的更明显,我们配置一下VM Options)


屏幕快照 2020-04-22 下午8.35.58.png
屏幕快照 2020-04-22 下午8.36.47.png

在VM Options选项中添加这些配置

-Xms60m  (设置程序初始化的时候内存栈的大小为60M)
-Xmx60m  (设置你的应用程序(不是JVM)能够使用的最大内存数60M)
-XX:+HeapDumpOnOutOfMemoryError  (发生OOM时将栈信息dump到HeapDumpPath指定的路径中)
-XX:HeapDumpPath=/Users/renzm/Desktop/a/b

添加完配置之后记得点一下save configuration选项
发生OOM时就可以去HeapDumpPath指定的路径下找到异常文件java_pid25049.hprof了。这个文件怎么查看呢,可以在IBM官网下载一个HeapAnalyzer软件,他是一个jar包,通过命令行启动,java -jar .\ha456.jar,ha456是jar包名称,启动后打开java_pid25049.hprof文件,查看chart项,大致会是这个样子

屏幕快照 2020-04-22 下午8.47.18.png
从图中可以看出,有99%以上的内存都是被线程池的工作队列占用了,可以定位到oom发生的位置
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(2);
        while(true){
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

打印:
pool-1-thread-1
pool-1-thread-2java.lang.OutOfMemoryError: GC overhead limit exceeded

Dumping heap to /Users/renzm/Desktop/a/b/java_pid25049.hprof ...
pool-1-thread-1
Heap dump file created [100583477 bytes in 0.873 secs]
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.AbstractExecutorService.newTaskFor(AbstractExecutorService.java:87)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:111)
    at TestExecutors.main(TestExecutors.java:11)
pool-1-thread-2

newCachedThreadPoolnewScheduledThreadPool 允许创建的线程数量为Integer.MAX_VALUE,可能导致大量线程被创建,导致OOM

2.创建的线程池核心线程数量不要过大

核心线程数量过大会导致CPU时间片的频繁切换

为什么要用线程池?

a.降低资源消耗。 通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
b.提高响应速度。 当任务到达时,任务可以不需要的等到线程创建就能立即执行。
c.提高线程的可管理性。 线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

《阿里巴巴Java开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险**
Executors 返回线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致OOM。
CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致OOM。

线程池的工作原理:线程池可以减少创建和销毁线程的次数,从而减少系统资源的消耗,当一个任务提交到线程池时

a. 首先判断核心线程池中的线程是否已经满了,如果没满,则创建一个核心线程执行任务,否则进入下一步
b. 判断工作队列是否已满,没有满则加入工作队列,否则执行下一步
c. 判断线程数是否达到了最大值,如果不是,则创建非核心线程执行任务,否则执行饱和策略,默认抛出异常

线程池的种类

1.FixedThreadPool:可重用固定线程数的线程池,只有核心线程,没有非核心线程,核心线程不会被回收,有任务时,有空闲的核心线程就用核心线程执行,没有则加入队列排队
2.SingleThreadExecutor:单线程线程池,只有一个核心线程,没有非核心线程,当任务到达时,如果没有运行线程,则创建一个线程执行,如果正在运行则加入队列等待,可以保证所有任务在一个线程中按照顺序执行,和FixedThreadPool的区别只有数量
3.CachedThreadPool:按需创建的线程池,没有核心线程,非核心线程有Integer.MAX_VALUE个,每次提交
任务如果有空闲线程则由空闲线程执行,没有空闲线程则创建新的线程执行,适用于大量的需要立即处理的并且耗时较短的任务
4.ScheduledThreadPoolExecutor:继承自ThreadPoolExecutor,用于延时执行任务或定期执行任务,核心线程数固定,线程总数为Integer.MAX_VALUE

相关文章

网友评论

      本文标题:20.多线程总结(七)-ThreadPoolExecutor线程

      本文链接:https://www.haomeiwen.com/subject/eejivhtx.html