美文网首页
OKHTTP基础篇之线程池ThreadPoolExecutor(

OKHTTP基础篇之线程池ThreadPoolExecutor(

作者: CircleLee | 来源:发表于2018-12-25 17:59 被阅读50次

在开发过程中,当我们需要使用线程的时候就常常会去new一个Thread,但是这样写会有什么问题呢?
假如并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
JAVA的线程池可以解决此问题,线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

使用线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池状态

@ReachabilitySensitive
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

RUNNING: 可以接收新任务,和处理阻塞队列任务;
SHUTDOWN: 不接收新任务,但是可以处理阻塞队列任务;
STOP: 既不接收新任务,也不处理阻塞队列任务,直接终止运行中的任务;
TIDYING: 所有任务都已经终止,有效线程数workcout为0,线程池进入TIDYING状态后会调用 terminated() 方法进入TERMINATED 状态;
TERMINATED:terminated()方法执行完成。

图1 线程池的状态转换过程

ThreadPoolExecutor构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize:核心线程池大小
maximumPoolSize:线程池最大容量大小
keepAliveTime:线程池空闲时,线程存活时间
unit: keepAliveTime的时间单位
workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;
threadFactory:线程工厂
handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:

  • AbortPolicy:直接抛出异常,这是默认策略;
  • CallerRunsPolicy:用调用者所在的线程来执行任务;
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • DiscardPolicy:直接丢弃任务。

execute方法

ThreadPoolExecutor被初始化后,通过execute方法提交线程任务

  public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {    //1
        if (addWorker(command, true))       
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {     //2
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))    //3
            reject(command);
        else if (workerCountOf(recheck) == 0)      //4
            addWorker(null, false);
    }
    else if (!addWorker(command, false))     //5
        reject(command);
  }

处理步骤:

  1. 判断当前的活动线程数是否小于核心线程大小。如果小于,则新建一个线程放入线程池中,并启动该任务;
  2. 如果当前活动的线程数不小于核心线程池大小,判断当前线程池是否是RUNNING状态。如果是, 则将任务添加到阻塞队列workQueue中;
  3. 重新获取当前线程,并判断是否处于RUNNING状态,如果不是RUNINING状态,则从阻塞队列中删除该任务,并通过handler使用拒绝策略对该任务进行处理,整个方法返回;
  4. 如果条件3判断失败 ,判断当前线程数是否为0。如果等于0,则执行addWorker方法。需要注意的是,这里的addWorker方法第一参数为null,表示在线程池中创建一个线程,但不去启动;第二个参数为false,表示将线程池的有限线程数量的上限设置为maximumPoolSize;
  5. 回到步骤2的判断,步骤2判断失败,调用addWorker方法,如果add失败则直接拒绝任务。

addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))   //1 core为true,比较核心线程池大小;false,比较最大线程池大小
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();   //2 启动线程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
  1. 当传入core为false时,检测当前线程数大小是否大于最大线程池大小,如果大于直接返回false。
  2. 启动线程。

总结执行过程:


图2 ThreadPoolExecutor执行过程
  1. 调用ThreadPoolExecutor的execute提交线程任务,检查当前线程大小,如果小于核心线程corePoolSize大小,则新创建线程执行任务;
  2. 如果大于核心线程corePoolSize大小,但是小于最大线程池maximumPoolSize大小,则将线程任务加入到BlockingQueue队列,等待处理;
  3. 如果大于最大线程池maximumPoolSize大小,则直接reject。

相关文章

网友评论

      本文标题:OKHTTP基础篇之线程池ThreadPoolExecutor(

      本文链接:https://www.haomeiwen.com/subject/bkimkqtx.html