美文网首页
线程池第一印象~

线程池第一印象~

作者: SonyaBaby | 来源:发表于2019-05-28 18:24 被阅读0次
进阶线程池啦~.png ThreadPoolExecutor 继承关系.png ThreadPoolExecutor 方法Structure.png AbstractExecutorService 方法Structure.png ExecutorService 方法Structure.png 最顶层接口 Executor.png

ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService {
...
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
    }

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
...
}
ThreadPoolExecutor 四个构造器.png

看源码可知前三个构造器最终调用的都是第四个进行初始化工作。

workQueue 等待队列

  • ArrayBlockingQueue 使用较少。必须指定 capacity,即有界队列
  • PriorityBlockingQueue 使用较少。默认大小 DEFAULT_INITIAL_CAPACITY = 11,最大MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8,即无界有序队列
  • LinkedBlockingQueue 默认大小 Integer.MAX_VALUE,即无界队列
  • SynchronousQueue 内部并没有数据缓存空间,一旦有了插入线程和移除线程,元素很快就从插入线程移交给移除线程(快速传递元素的方式),在多任务队列中是最快处理任务的方式,元素总是以最快的方式从生产者传递给消费者。典型应用是Executors.newCachedThreadPool(),这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。
  • 线程池的排队策略与所选的 BlockingQueue 有关。

handler 拒绝处理任务时使用的策略

  • ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常,会阻止正常工作。
  • ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常,系统正常工作。
  • ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务(即将被执行的),然后重新尝试提交当前任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy 由调用线程处理该任务

核心方法
execute() 是在Executor接口中的声明,通过这个方法可以向线程池提交一个任务,交由线程池去执行
submit() 是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果
shutdown() 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow() 立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

线程池状态

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    /**
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don't accept new tasks, but process queued tasks
     *   STOP:     Don't accept new tasks, don't process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *   
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
    **/
  • 创建线程池后,初始时,线程池处于RUNNING状态;
  • 调用 shutdown(),则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
  • 调用 shutdownNow(),则线程池处于STOP状态,此时线程池不能接受新的任务,并且尝试终止正在执行的任务;
  • 当线程池处于SHUTDOWN或STOP状态,队列和线程池中都为空的情况,即所有任务都已被终止,workerCount 标记为 0,则处于TIDYING状态
  • 当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁(处于TIDYING状态),任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

任务的执行相关重要参数

 // 任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
// 线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();

// Accessed only under mainLock.
// 用来存放工作集   
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用来记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
// 用来记录已经执行完毕的任务个数
private long completedTaskCount;

// 线程工厂类,用来创建线程
private volatile ThreadFactory threadFactory;
// 任务拒绝策略
private volatile RejectedExecutionHandler handler;
// 线程存活时间
private volatile long keepAliveTime;
// 是否允许为核心线程设置存活时间
private volatile boolean allowCoreThreadTimeOut;
// 核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int corePoolSize;

// //线程池最大能容忍的线程数. Note that the actual maximum is internally bounded by CAPACITY.
private volatile int maximumPoolSize;

// 默认的任务拒绝策略:丢弃任务并抛出RejectedExecutionException异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

corePoolSize是正常情况下线程池大小,maximumPoolSize是线程池的一种额外Support,即任务量突然过大时的额外可支持的开销
largestPoolSize只是用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
   
    /** 1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first  task.  
      * The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add  threads when it shouldn't, by returning false.
      */
    // 线程池中正在作业的线程数 < corePoolSize数
    if (workerCountOf(c) < corePoolSize) {
        // 是否可以继续向 corePool 中新增线程
        if (addWorker(command, true))
            return;
        // 再次获取当前线程池状态值
        c = ctl.get();
    }

    /** 2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method.
      * So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
      */
    // 线程池为可运行状态,同时Runnable command可以加入等待队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //  非Running状态,则从队列中去掉command并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);

       /** 3. If we cannot queue task, then we try to add a new thread.  
         * If it fails, we know we are shut down or saturated and so reject the task.
         */
         // 新增thread
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 不可以在拓展的线程池中运行该实例,执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

使用 ThreadPoolExecutor 创建线程池:

public class HelloThreadPool {
  public static void main(String[] args) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2,
      100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2));
    
    IntStream.range(0, 4).mapToObj(PrintTask::new).forEach(printTask -> {
      executor.execute(printTask);
      System.out.println("线程池中所有线程数目:" + executor.getPoolSize() + ",队列中待执行的任务数目:" +
        executor.getQueue().size() + ",已执行完的任务数目:" + executor.getCompletedTaskCount());
    });
    
    executor.shutdown();
  }
}

class PrintTask implements Runnable {
  private int taskIndex;
  
  PrintTask(int index) {
    this.taskIndex = index;
  }
  
  @Override
  public void run() {
    System.out.println(taskIndex + " is running...");
     try {
     TimeUnit.SECONDS.sleep(2);
     } catch (InterruptedException e) {
     e.printStackTrace();
     }
    System.out.println(taskIndex + " end");
  }
}

output:

0 is running...
线程池中所有线程数目:1,队列中待执行的任务数目:0,已执行完的任务数目:0
线程池中所有线程数目:1,队列中待执行的任务数目:1,已执行完的任务数目:0
线程池中所有线程数目:1,队列中待执行的任务数目:2,已执行完的任务数目:0
线程池中所有线程数目:2,队列中待执行的任务数目:2,已执行完的任务数目:0
3 is running...
3 end
0 end
1 is running...
2 is running...
2 end
1 end

当把要执行的实例变成 5 个就会出现 RejectedExecutionException 异常:

java.util.concurrent.RejectedExecutionException: 
Task threadPool.PrintTask@7699a589 rejected from 
java.util.concurrent.ThreadPoolExecutor@58372a00[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]

java doc中,并不提倡我们直接使用 ThreadPoolExecutor ,而是使用 Executors 类中提供的几个静态方法来创建线程池

  • Executors.newCachedThreadPool() 若线程池的当前规模超过了`corePoolSize`,就会回收部分空闲的线程(根据`keepAliveTime`来回收),当需求增加时,线程池又可以智能的添加新线程来处理任务。此线程池大小`Integer.MAX_VALUE`可以认为是不做限制(使用队列`SynchronousQueue`),线程池大小完全依赖于JVM能够创建的最大线程大小

  • Executors.newSingleThreadExecutor() 创建容量为1的线程池,`corePoolSize`和`maximumPoolSize`均为1,使用无界队列`LinkedBlockingQueue`

  • Executors.newFixedThreadPool(int) 创建容量为固定个数n的线程池。`corePoolSize`和`maximumPoolSize`均为n,使用无界队列`LinkedBlockingQueue`

  • Executors.newScheduledThreadPool(int) 创建一个指定corePoolSize的线程池,支持定时及周期性任务执行

  • 如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

参考链接:https://www.cnblogs.com/dolphin0520/p/3932921.html

相关文章

  • 线程池第一印象~

    ThreadPoolExecutor 看源码可知前三个构造器最终调用的都是第四个进行初始化工作。 workQueu...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

网友评论

      本文标题:线程池第一印象~

      本文链接:https://www.haomeiwen.com/subject/kosrtctx.html