美文网首页
juc10-ThreadPoolExecutor与Schedul

juc10-ThreadPoolExecutor与Schedul

作者: modou1618 | 来源:发表于2019-02-05 21:55 被阅读0次

0 类图

类图.png

一 AbstractExecutorService

1.1 提交线程执行任务

  • 纪录为FutureTask类对象
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

1.2 线程池执行任意任务

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                          boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (tasks == null)
        throw new NullPointerException();
    int ntasks = tasks.size();
    if (ntasks == 0)
        throw new IllegalArgumentException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
    ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
    try {
        // Record exceptions so that if we fail to obtain any
        // result, we can throw the last exception we got.
        ExecutionException ee = null;
//限时设置
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        Iterator<? extends Callable<T>> it = tasks.iterator();

        // Start one task for sure; the rest incrementally
        futures.add(ecs.submit(it.next()));//提交线程池任务,并执行
        --ntasks;
        int active = 1;

        for (;;) {
            Future<T> f = ecs.poll();//获取执行完成的线程池任务
            if (f == null) {
                if (ntasks > 0) {
                    --ntasks;//依次提交所有的待执行任务
                    futures.add(ecs.submit(it.next()));
                    ++active;
                }
                else if (active == 0)//已无执行中任务
                    break;
                else if (timed) {//有限时控制
                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                    if (f == null)
                        throw new TimeoutException();
                    nanos = deadline - System.nanoTime();
                }
                else
                    f = ecs.take();//无数据则休眠等待
            }
            if (f != null) {//有任务完成
                --active;
                try {
                    return f.get();//有一个任务完成就返回任务执行结果
                } catch (ExecutionException eex) {
                    ee = eex;
                } catch (RuntimeException rex) {
                    ee = new ExecutionException(rex);
                }
            }
        }

        if (ee == null)
            ee = new ExecutionException();
        throw ee;

    } finally {
        for (int i = 0, size = futures.size(); i < size; i++)
            futures.get(i).cancel(true);//清除
    }
}

1.2.1 FutureTask

  • 任务执行
public void run() {
//更新线程状态
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();//线程函数执行
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);//异常
            }
            if (ran)
                set(result);//成功
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)//中断退出
            handlePossibleCancellationInterrupt(s);
    }
}
  • 成功完成通知set
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更新状态
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();//通知完成
    }
}
  • 执行异常通知setException
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更新状态
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();//通知完成
    }
}
  • 线程任务完成处理,通知等待任务线程完成的线程。
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {//遍历等待线程任务完成的线程链表
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);//唤醒线程,继续处理
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();//子类实现执行done处理

    callable = null;        // to reduce footprint
}
  • get获取任务执行结果,若任务未完成则休眠等待
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
//插入等待完成链表中,休眠等待线程任务完成。
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);//中断则删除等待节点,退出
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();//初始化等待节点
        else if (!queued)//插入等待队列中
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {//限时休眠等待
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else//休眠等待
            LockSupport.park(this);
    }
}

1.2.2 ExecutorCompletionService

  • 实例化
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();//阻塞队列
    }
  • 线程池任务提交
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
//QueueingFuture封装线程池任务
        executor.execute(new QueueingFuture(f));
        return f;
    }
  • QueueingFuture
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        //线程池任务FutureTask完成时调用,放入阻塞队列中
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
  • 阻塞队列获取完成的线程池任务
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

1.3 线程池执行所有任务

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {//所有任务依次提交执行
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get();//遍历任务,若未完成则休眠等待完成
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;//返回任务状态集合
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

二 ThreadPoolExecutor

2.1 实例化

  • 初始化几个核心参数
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize 线程池保留线程数量,配置allowCoreThreadTimeOut=true,则可能线程idle超时终止。
 * @param maximumPoolSize 线程池最大线程数量
 * @param keepAliveTime idle线程最大存活时间
 * @param unit  存活时间单位
 * @param workQueue 任务被线程执行前的存储队列
 * @param threadFactory 线程工厂
 * @param handler 容量满时的拒绝策略
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2.2 任务提交执行

  • 工作线程状态,执行中状态为负数,终止状态为正数。
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

2.2.1 execute()

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * 1. 线程数小于corePoolSize,则启动新线程执行任务
     * 2. 任务入阻塞队列后,再次检查。
     *    此时可能有任务执行完释放线程,则开始执行。
     *    也可能线程池被shutdown则回退入队操作
     * 3. 无法入队,则添加新线程执行任务,
     *    添加失败可能是线程池被shutdown或线程数达到maximumPoolSize,
     *    则使用拒绝策略拒绝任务
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {//工作线程数与池大小比较
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }//新增线程任务失败则二次检查,入任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //线程池被关闭,则删除入队的任务,并reject
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //已无工作线程,则添加一个null任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))//入队失败,新建线程执行
        reject(command);//失败则拒绝
}
//工作线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
//工作状态首位为1,即是负数
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

2.2.2 addWorker()

  • 带标签的break,终止整个标签下的for循环
  • 带标签的continue,终止标签下的本次循环,继续下次循环。
  • 线程池中工作线程的集合,使用锁ReentrantLock mainLock控制并发
    private final HashSet<Worker> workers = new HashSet<Worker>();
  • addWorker()尝试添加工作线程
//获取线程池状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

private boolean compare AndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                        firstTask == null &&
                        ! workQueue.isEmpty()))
            return false;//shutdown状态,队列非空,则不添加null任务

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                return false;//容量已超,则返回false,不添加
            if (compareAndIncrementWorkerCount(c))
                break retry;//尝试添加工作计数,成功跳出标签循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)//线程池状态变更,则标签循环重试
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);//初始化worker,并使用线程工厂新建线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();//锁控制
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                //运行中线程池可添加任务,或shutdown线程池才可添加null任务
                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // 不可使用已运行的线程
                        throw new IllegalThreadStateException();
                    workers.add(w);//添加工作任务
                    int s = workers.size();
                    if (s > largestPoolSize)//更新池任务大小
                        largestPoolSize = s;
                    workerAdded = true;//添加成功标记
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();//添加成功,则启动任务线程
                workerStarted = true; //启动完成标记
            }
        }
    } finally {
        if (! workerStarted)//任务工作线程启动失败
            addWorkerFailed(w);
    }
    return workerStarted;
}
  • Worker继承AbstractQueuedSynchronizer,并发锁控制。实现独占锁。
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);//删除集合中工作任务
        decrementWorkerCount();
        tryTerminate();//终止线程
    } finally {
        mainLock.unlock();
    }
}
//降低工作线程数
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
  • 尝试终止线程池,每次线程池变更操作都会调用一次进行尝试。
    1 运行中的不被终止
    2 已进入终止处理的,不许再做处理
    3 shutdown状态但任务队列不为空,等待任务队列中任务都执行后才可终止
    4 shutdown状态,任务队列为空,有工作中线程,通知一个idle线程中断信号。
    5 shutdown状态,任务队列为空,无工作中线程,进入终止流程。修改状态,避免并发终止操作。终止完成后通知所有等待线程池终止的线程。
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
//运行中或已经进入终止阶段或shutdown但任务队列非空,则不再次尝试终止处理
            return;
        if (workerCountOf(c) != 0) { // 先向空闲线程发送中断信号
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {//修改线程池状态为终止中
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    //修改线程池状态为已终止,
                    ctl.set(ctlOf(TERMINATED, 0));
                    //通知所有termination条件对象等待线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {// 遍历工作任务
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {//加锁
                try {
                    t.interrupt();//通知中断
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)//仅通知一个任务
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
  • 等待条件对象通知线程池终止
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            //已终止,则不许再等待
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            //限时等待终止条件对象通知。
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

2.2.3 remove()

  • 删除工作任务队列中的对应任务
  • 尝试终止线程池
public boolean remove(Runnable task) {
    boolean removed = workQueue.remove(task);
    tryTerminate(); // In case SHUTDOWN and now empty
    return removed;
}

2.2.4 reject()

final void reject(Runnable command) {
    //调用拒绝策略,拒绝任务
    handler.rejectedExecution(command, this);
}

2.3 线程池终止

  • shutdown权限检查
private void checkShutdownAccess() {
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkPermission(shutdownPerm);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                security.checkAccess(w.thread);
        } finally {
            mainLock.unlock();
        }
    }
}
  • shutdown(),终止线程池,等待所有工作中线程及等待队列中任务执行完成。
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();//中断所有空闲的任务线程
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();//尝试终止线程池
}
  • shutdownNow,停止线程池,通知所有工作中任务线程中断,获取等待队列中的任务,返回给调用房
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();//通知所有任务线程中断
        tasks = drainQueue();//获取等待队列中的任务,返回给调用房
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

2.4 拒绝策略

策略 功能
CallerRunsPolicy 直接在调用线程中执行任务
AbortPolicy 抛异常RejectedExecutionException终止调用线程
DiscardPolicy 忽略本次任务
DiscardOldestPolicy 丢弃线程池任务队列中第一个等待执行的任务,执行本次任务。

2.5 任务执行

  • run函数
public void run() {
    runWorker(this);
}
  • runWorker线程执行提交的任务
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //有初始化任务firstTask,则执行初始化任务,
        //否则task = getTask()),从任务阻塞队列中获取任务执行
        //没有任务,则线程退出
        while (task != null || (task = getTask()) != null) {
            w.lock();//任务加锁
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();//线程池终止,则通知线程中断
            try {
                beforeExecute(wt, task);//模版模式方法调用
                Throwable thrown = null;
                try {
                    task.run();//任务执行
                } catch (RuntimeException x) {
                    thrown = x; throw x;//抛异常,则该线程无法再被调用
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {//模版模式方法调用
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;//统计数据
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {//工作任务退出处理
        processWorkerExit(w, completedAbruptly);
    }
}
  • getTask()
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//线程池终止状态,则不处理
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

//allowCoreThreadTimeOut=true,则core线程有空闲超时,否则超过corePoolSize的线程空闲超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
       //线程数量超限,超时则返回null,任务线程退出
        if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {//限时等待从任务队列中获取待执行任务
            Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) //异常终止的则需要减工作线程数
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);//加锁删除任务集合中的任务
    } finally {
        mainLock.unlock();
    }

    tryTerminate();//尝试一次线程池终止处理

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {//shutdown或running状态
        if (!completedAbruptly) {//正常执行完成终止
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)//超过线程数限制,则线程退出,不处理
                return; // replacement not needed
        }
//添加空线程,保证core线程数量,等待执行阻塞队列中任务。
        addWorker(null, false);
    }
}

2.5 任务处理流程

处理流程.png

三 ScheduledThreadPoolExecutor

  • 实例化,阻塞队列使用DelayedWorkQueue。类似于DelayedQueue,区别是自行实现了最小堆。
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue(), threadFactory);
}
  • 指定延迟时间调度一次
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                    triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
  • 指定周期执行的调度任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                    null,
                    triggerTime(initialDelay, unit),
                    unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;//任务执行完成后重新放入等待执行队列中
    delayedExecute(t);
    return t;
}
  • 普通任务执行,即延迟时间为0的一次调度任务。

3.1 ScheduledFutureTask

  • 实例化
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;            //任务执行延迟时间
    this.period = period;   //任务执行周期时间
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • 任务执行
public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)//延迟任务执行一次
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();//更新下次周期的运行时间
        reExecutePeriodic(outerTask);//周期任务重新添加
    }
}
  • 在DelayedWorkQueue阻塞队列中,按time计算在队列中的优先级
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

3.2 delayedExecute

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);//线程池停止,则按拒绝策略拒绝任务
    else {
        super.getQueue().add(task);//队列添加任务
        if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
            task.cancel(false);//删除任务
        else
            ensurePrestart();//添加线程执行任务
    }
}

3.3 DelayedWorkQueue

  • 等待延迟时间达到后,获取阻塞队列中的任务
public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null)//空队列,则休眠等待条件对象
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)//延迟时间已经到达,则获取任务执行
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                if (leader != null)//线程已经在等待了。
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {//限时等待任务时间达到
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();//条件对象通知
        lock.unlock();
    }
}

相关文章

网友评论

      本文标题:juc10-ThreadPoolExecutor与Schedul

      本文链接:https://www.haomeiwen.com/subject/tlbysqtx.html