美文网首页程序员
Java内置线程池ThreadPoolExecutor源码分析教

Java内置线程池ThreadPoolExecutor源码分析教

作者: java高级编程中心 | 来源:发表于2019-02-24 22:31 被阅读7次

背景

公司业务性能优化,使用java自带的Executors. newFixedThreadPool()方法生成线程池。但是其内部定义的 LinkedBlockingQueue容量是Integer. MAX_VALUE。考虑到如果数据库中待处理数据量很大有可能会在短时间内往 LinkedBlockingQueue中填充很多数据,导致内存溢出。于是看了一下线程池这块的源码,并在此记录。

小编整理了一些java进阶学习资料和面试题,需要资料的请加JAVA高阶学习Q群:664389243 这是小编创建的java高阶学习交流群,加群一起交流学习深造。群里也有小编整理的2019年最新最全的java高阶学习资料!

类图

Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;submit() 方法

ThreadPoolExecutor继承了类AbstractExecutorService。实现了execute(Runnable)方法。

Executors提供的集中工厂方法都是调用的ThreadPoolExecutor的构造方法。因为这个构造方法参数比较多 所以提供了几个经典的实现。

ExecutorService newCachedThreadPool = Executors.newFixedThreadPool();

ExecutorService newCachedThreadPool = Executors.newSingleThreadExecutor();

ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

ExecutorService newCachedThreadPool = Executors.newScheduledThreadPool();

本篇违章主要包括以下几点内容。这也是解决背景中提到的问题的主要历程。

1.ThreadPoolExecutor构造方法

2.ExecutorService submit() 方法的实现

2.Executor execute() 方法的实现

3.reject() 拒绝策略

ThreadPoolExecutor构造方法

构造方法中赋值的成员标量:

// 构造方法中用到的成员变量

private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)

private volatile int maximumPoolSize; //线程池最大能容忍的线程数

private volatile long keepAliveTime; //线程空闲之后存货时间 (线程数量大于corePoolSize之后)

private final BlockingQueue workQueue; //任务缓存队列,用来存放等待执行的任务

private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程

private volatile RejectedExecutionHandler handler; //任务拒绝策略

通过代码可以知道 Executors提供的集中工厂方法实际都是调用的同一个ThreadPoolExecutor的构造方法。当然我们也可以通过自己调用ThreadPoolExecutor构造方法 自己设置参数 从而获得很贴合我们业务的线程池。

AbstractExecutorService submit() 方法

/**

* @throws RejectedExecutionException {@inheritDoc}

* @throws NullPointerException {@inheritDoc}

*/

public Future submit(Callable task) {

if (task == null) throw new NullPointerException();

RunnableFuture ftask = newTaskFor(task);

execute(ftask);

return ftask;

}

其实是调用了execute() 方法,execute()方法 由ThreadPoolExecutor类实现。

ThreadPoolExecutor execute()方法

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 29位

private static final int COUNT_BITS = Integer.SIZE - 3;

// 0001 1111 1111 1111 1111 1111 1111 1111

private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl

// 高三位 代表 状态

private static int runStateOf(int c) { return c & ~CAPACITY; }

// 低三位 代表 数量

private static int workerCountOf(int c) { return c & CAPACITY; }

// 把状态和数量两个值 揉在一起

// private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

// 获取到当前有效的线程数和线程池的状态

int c = ctl.get();

// 1.获取当前正在运行线程数是否小于核心线程池,是则新创建一个线程执行任务,否则将任务放到任务队列中

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

// 2.当前核心线程池中全部线程都在运行workerCountOf(c) >= corePoolSize,所以此时将线程放到任务队列中

// 线程池是否处于运行状态,且是否任务插入任务队列成功。注意这块 && 是做了优化如果前面条件失败后面语句不会处理

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

//在此检查线程池是否处于运行状态,如果不是则使刚刚的任务出队。和上面一样 && 是做了优化如果前面条件失败后面语句不会处理

if (! isRunning(recheck) && remove(command))

reject(command);

// 如果没有执行的线程,就再开启一个线程(有可能没有核心线程)

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 3.插入队列不成功 offer() 方法失败是因为队列满了,此时就新创建线程去执行任务,创建失败抛出异常

else if (!addWorker(command, false))

reject(command);

}

// CAS修改clt的值+1,成功退出cas循环,失败继续

if (compareAndIncrementWorkerCount(c))

break retry;

//将新建的线程加入到线程池中

workers.add(w);

int s = workers.size();

//修正largestPoolSize的值

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

addWorker()方法 总结起来就两部分

1.CAS+失败重试操作来将线程数加1

2.新建一个线程并启用。

RejectedExecutionHandler拒绝策略

java 内置的四种拒绝策略。

public static class AbortPolicy implements RejectedExecutionHandler // 抛出java.util.concurrent.RejectedExecutionException异常

public static class CallerRunsPolicy implements RejectedExecutionHandler //直接在 execute 方法的调用线程中运行被拒绝的任务。如果执行程序已关闭,则会丢弃该任务

public static class DiscardPolicy implements RejectedExecutionHandler // 不做任何处理 直接丢弃

public static class DiscardOldestPolicy implements RejectedExecutionHandler // 丢弃老的

自定义拒绝策略:

new RejectedExecutionHandler() {

// 自定义拒绝策略

@Override

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

try {

// 如果LinkedBlockingQueue存满了,阻塞等待有空间后再加入元素。(put方法是阻塞的)

LOGGER.info("LinkedBlockingQueue has been full ");

// put() 方法是阻塞的,如果队列没有空间会一直等待。

executor.getQueue().put(r);

LOGGER.info("thread has been put in");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

总结一点:当用java内置的一些工具的时候,如果有不理解的一定要 深入去看源码。从根本上找解决思路。

小编整理了一些java进阶学习资料和面试题,需要资料的请加JAVA高阶学习Q群:664389243 这是小编创建的java高阶学习交流群,加群一起交流学习深造。群里也有小编整理的2019年最新最全的java高阶学习资料!

相关文章

网友评论

    本文标题:Java内置线程池ThreadPoolExecutor源码分析教

    本文链接:https://www.haomeiwen.com/subject/addiyqtx.html