0 类图

类图.png
一 AbstractExecutorService
1.1 提交线程执行任务
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
1.2 线程池执行任意任务
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
//限时设置
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));//提交线程池任务,并执行
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();//获取执行完成的线程池任务
if (f == null) {
if (ntasks > 0) {
--ntasks;//依次提交所有的待执行任务
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)//已无执行中任务
break;
else if (timed) {//有限时控制
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();//无数据则休眠等待
}
if (f != null) {//有任务完成
--active;
try {
return f.get();//有一个任务完成就返回任务执行结果
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);//清除
}
}
1.2.1 FutureTask
public void run() {
//更新线程状态
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//线程函数执行
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);//异常
}
if (ran)
set(result);//成功
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)//中断退出
handlePossibleCancellationInterrupt(s);
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更新状态
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();//通知完成
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//更新状态
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();//通知完成
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {//遍历等待线程任务完成的线程链表
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);//唤醒线程,继续处理
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();//子类实现执行done处理
callable = null; // to reduce footprint
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
//插入等待完成链表中,休眠等待线程任务完成。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);//中断则删除等待节点,退出
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();//初始化等待节点
else if (!queued)//插入等待队列中
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {//限时休眠等待
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else//休眠等待
LockSupport.park(this);
}
}
1.2.2 ExecutorCompletionService
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();//阻塞队列
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
//QueueingFuture封装线程池任务
executor.execute(new QueueingFuture(f));
return f;
}
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
//线程池任务FutureTask完成时调用,放入阻塞队列中
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
1.3 线程池执行所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {//所有任务依次提交执行
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();//遍历任务,若未完成则休眠等待完成
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;//返回任务状态集合
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
二 ThreadPoolExecutor
2.1 实例化
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize 线程池保留线程数量,配置allowCoreThreadTimeOut=true,则可能线程idle超时终止。
* @param maximumPoolSize 线程池最大线程数量
* @param keepAliveTime idle线程最大存活时间
* @param unit 存活时间单位
* @param workQueue 任务被线程执行前的存储队列
* @param threadFactory 线程工厂
* @param handler 容量满时的拒绝策略
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
2.2 任务提交执行
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
2.2.1 execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1. 线程数小于corePoolSize,则启动新线程执行任务
* 2. 任务入阻塞队列后,再次检查。
* 此时可能有任务执行完释放线程,则开始执行。
* 也可能线程池被shutdown则回退入队操作
* 3. 无法入队,则添加新线程执行任务,
* 添加失败可能是线程池被shutdown或线程数达到maximumPoolSize,
* 则使用拒绝策略拒绝任务
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//工作线程数与池大小比较
if (addWorker(command, true))
return;
c = ctl.get();
}//新增线程任务失败则二次检查,入任务队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//线程池被关闭,则删除入队的任务,并reject
if (! isRunning(recheck) && remove(command))
reject(command);
//已无工作线程,则添加一个null任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//入队失败,新建线程执行
reject(command);//失败则拒绝
}
//工作线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//工作状态首位为1,即是负数
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
2.2.2 addWorker()
- 带标签的break,终止整个标签下的for循环
- 带标签的continue,终止标签下的本次循环,继续下次循环。
- 线程池中工作线程的集合,使用锁
ReentrantLock mainLock控制并发
private final HashSet<Worker> workers = new HashSet<Worker>();
- addWorker()尝试添加工作线程
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
private boolean compare AndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;//shutdown状态,队列非空,则不添加null任务
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;//容量已超,则返回false,不添加
if (compareAndIncrementWorkerCount(c))
break retry;//尝试添加工作计数,成功跳出标签循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)//线程池状态变更,则标签循环重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//初始化worker,并使用线程工厂新建线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//锁控制
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//运行中线程池可添加任务,或shutdown线程池才可添加null任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 不可使用已运行的线程
throw new IllegalThreadStateException();
workers.add(w);//添加工作任务
int s = workers.size();
if (s > largestPoolSize)//更新池任务大小
largestPoolSize = s;
workerAdded = true;//添加成功标记
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//添加成功,则启动任务线程
workerStarted = true; //启动完成标记
}
}
} finally {
if (! workerStarted)//任务工作线程启动失败
addWorkerFailed(w);
}
return workerStarted;
}
- Worker继承AbstractQueuedSynchronizer,并发锁控制。实现独占锁。
-
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);//删除集合中工作任务
decrementWorkerCount();
tryTerminate();//终止线程
} finally {
mainLock.unlock();
}
}
//降低工作线程数
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
- 尝试终止线程池,每次线程池变更操作都会调用一次进行尝试。
1 运行中的不被终止
2 已进入终止处理的,不许再做处理
3 shutdown状态但任务队列不为空,等待任务队列中任务都执行后才可终止
4 shutdown状态,任务队列为空,有工作中线程,通知一个idle线程中断信号。
5 shutdown状态,任务队列为空,无工作中线程,进入终止流程。修改状态,避免并发终止操作。终止完成后通知所有等待线程池终止的线程。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
//运行中或已经进入终止阶段或shutdown但任务队列非空,则不再次尝试终止处理
return;
if (workerCountOf(c) != 0) { // 先向空闲线程发送中断信号
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {//修改线程池状态为终止中
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//修改线程池状态为已终止,
ctl.set(ctlOf(TERMINATED, 0));
//通知所有termination条件对象等待线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {// 遍历工作任务
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {//加锁
try {
t.interrupt();//通知中断
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)//仅通知一个任务
break;
}
} finally {
mainLock.unlock();
}
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
//已终止,则不许再等待
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
//限时等待终止条件对象通知。
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
2.2.3 remove()
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
2.2.4 reject()
final void reject(Runnable command) {
//调用拒绝策略,拒绝任务
handler.rejectedExecution(command, this);
}
2.3 线程池终止
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
- shutdown(),终止线程池,等待所有工作中线程及等待队列中任务执行完成。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();//中断所有空闲的任务线程
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();//尝试终止线程池
}
- shutdownNow,停止线程池,通知所有工作中任务线程中断,获取等待队列中的任务,返回给调用房
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();//通知所有任务线程中断
tasks = drainQueue();//获取等待队列中的任务,返回给调用房
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
2.4 拒绝策略
| 策略 |
功能 |
| CallerRunsPolicy |
直接在调用线程中执行任务 |
| AbortPolicy |
抛异常RejectedExecutionException终止调用线程 |
| DiscardPolicy |
忽略本次任务 |
| DiscardOldestPolicy |
丢弃线程池任务队列中第一个等待执行的任务,执行本次任务。 |
2.5 任务执行
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//有初始化任务firstTask,则执行初始化任务,
//否则task = getTask()),从任务阻塞队列中获取任务执行
//没有任务,则线程退出
while (task != null || (task = getTask()) != null) {
w.lock();//任务加锁
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();//线程池终止,则通知线程中断
try {
beforeExecute(wt, task);//模版模式方法调用
Throwable thrown = null;
try {
task.run();//任务执行
} catch (RuntimeException x) {
thrown = x; throw x;//抛异常,则该线程无法再被调用
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {//模版模式方法调用
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;//统计数据
w.unlock();
}
}
completedAbruptly = false;
} finally {//工作任务退出处理
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {//线程池终止状态,则不处理
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//allowCoreThreadTimeOut=true,则core线程有空闲超时,否则超过corePoolSize的线程空闲超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//线程数量超限,超时则返回null,任务线程退出
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {//限时等待从任务队列中获取待执行任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //异常终止的则需要减工作线程数
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);//加锁删除任务集合中的任务
} finally {
mainLock.unlock();
}
tryTerminate();//尝试一次线程池终止处理
int c = ctl.get();
if (runStateLessThan(c, STOP)) {//shutdown或running状态
if (!completedAbruptly) {//正常执行完成终止
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)//超过线程数限制,则线程退出,不处理
return; // replacement not needed
}
//添加空线程,保证core线程数量,等待执行阻塞队列中任务。
addWorker(null, false);
}
}
2.5 任务处理流程

处理流程.png
三 ScheduledThreadPoolExecutor
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;//任务执行完成后重新放入等待执行队列中
delayedExecute(t);
return t;
}
3.1 ScheduledFutureTask
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns; //任务执行延迟时间
this.period = period; //任务执行周期时间
this.sequenceNumber = sequencer.getAndIncrement();
}
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)//延迟任务执行一次
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();//更新下次周期的运行时间
reExecutePeriodic(outerTask);//周期任务重新添加
}
}
- 在DelayedWorkQueue阻塞队列中,按time计算在队列中的优先级
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
3.2 delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);//线程池停止,则按拒绝策略拒绝任务
else {
super.getQueue().add(task);//队列添加任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);//删除任务
else
ensurePrestart();//添加线程执行任务
}
}
3.3 DelayedWorkQueue
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)//空队列,则休眠等待条件对象
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)//延迟时间已经到达,则获取任务执行
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)//线程已经在等待了。
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {//限时等待任务时间达到
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();//条件对象通知
lock.unlock();
}
}
网友评论