美文网首页
线程池执行过程

线程池执行过程

作者: zianL | 来源:发表于2021-06-06 09:47 被阅读0次

线程池参数:

1. corePoolSize

线程池的核心线程数量。初始是不创建线程的。当有任务提交到线程池时,判定如果已经创建的线程数量小于核心数量,且没有空闲线程时,则会新建一个线程去执行新提交的任务。如果已经达到核心线程数量, 则会加入到阻塞队列中。

2.maximumPoolSize

线程池的最大容量。当线程池的阻塞队列放满了, 并且线程数量还未达到线程池的最大线程数量, 则会创建新的线程,直到达到最大值

3.keepAliveTime

当阻塞队列里面的任务被执行完了, 且有空闲线程时,指定大于核心线程池数量的部分空闲线程的存活时间, 毕竟线程也是需要消耗资源的,及时回收很有必要。当线程空闲的时间超过这个时间后,会回收掉一部分空闲线程,使其线程池中的线程数量不大于核心线程的数量

4.unit 和keepAliveTIme

配套使用 这里需要指定时间的单位

5.workQueue

阻塞队列,当没有空闲线程时,多余的任务缓存的地方。

6.threadFactory

线程工厂,用来创建线程时,设定线程的一些参数

7.handler

当线程数量达到最大值时,且阻塞队列慢了, 后续在提交任务时,没有地方可以接受继续的提交的任务。这种情况下的一个拒绝策略。

runWorker 流程

runworker流程.jpg
final void runWorker(Worker w) {
      Thread wt = Thread.currentThread();
      Runnable task = w.firstTask;
      w.firstTask = null;
      w.unlock(); // allow interrupts
      boolean completedAbruptly = true;
      try {
          //循环获取线程任务 直到返回空 结束当前线程
          while (task != null || (task = getTask()) != null) {
              w.lock();
              // If pool is stopping, ensure thread is interrupted;
              // if not, ensure thread is not interrupted.  This
              // requires a recheck in second case to deal with
              // shutdownNow race while clearing interrupt
            //如果线程池是停止的 确保线程被终止,反之确保线程不被终止
              if ((runStateAtLeast(ctl.get(), STOP) ||
                   (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                  !wt.isInterrupted())
                  wt.interrupt();
              try {
                  beforeExecute(wt, task);
                  Throwable thrown = null;
                  try {
                      //执行线程的run方法
                      task.run();
                  } catch (RuntimeException x) {
                      thrown = x; throw x;
                  } catch (Error x) {
                      thrown = x; throw x;
                  } catch (Throwable x) {
                      thrown = x; throw new Error(x);
                  } finally {
                      afterExecute(task, thrown);
                  }
              } finally {
                  task = null;
                  w.completedTasks++;
                  w.unlock();
              }
          }
          completedAbruptly = false;
      } finally {
          processWorkerExit(w, completedAbruptly);
      }
  }

getTask() 获取线程任务

  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        //自旋获取
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //判断线程池状态是暂停,并且工作队列为空 则递减线程数并返回null  让runworker 方法跳出循环
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //allowCoreThreadTimeOut 默认 false
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //wc > maximumPoolSize 为true  time 为true 
            //这时 timedOut 为false时 任务为空,执行线程递减 返回null 让runworker 结束循环
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
//timed=true 多余的线程通过poll 获取线程任务  false 核心线程通过take()获取线程任务,当队列为空时线程可堵塞,直至有任务加入唤醒线程
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

队列take();

   public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
              //该线程进入等待状态,释放资源
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

workQueue.offer(command) 唤醒等待线程

 public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
   private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }
    public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

     private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
  final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒线程地方            
LockSupport.unpark(node.thread);
        return true;
    }

addWorker 流程

线程池addworker (1).jpg
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:  //循环标志
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            //判断线程池状态是否是暂停、停止 并且(任务为空,且工作队列为空 则返回false)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN && firstTask == null && ! 
 workQueue.isEmpty())) 
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //如果工作线程大于 核心线程数、最大线程数则返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //cas 自旋递减工作线程数成功 跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //状态不一致  跳出循环进行下一次循环
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // new 一个信 工作对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //判断 状态工作  或者是暂停
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        //如果当前线程是活跃状态 则抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                       //添加工作成功
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();  //执行worker run 方法  该方法执行runworker
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

      /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

相关文章

  • 线程池

    线程池种类 ThreadPoolExecutor 基础线程池 线程执行任务过程 当前执行线程数 < corePoo...

  • Java线程池的原理

    先上个网图说明下线程池的执行过程 来看下线程池的执行过程 1.当线程池数量小于核心线程数 直接创建核心线程 2.队...

  • 线程池

    线程池执行过程 线程池生命周期 线程池分类 阻塞队列 拒绝策略 - ThreadPoolExecutor.Abor...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • 线程池任务执行过程

    前言 JAVA通过多线程的方式实现并发,为了方便线程池的管理,JAVA采用线程池的方式对线线程的整个生命周期进行管...

  • 线程池——ThreadPool

    为什么要用线程池? 一个线程的执行过程可以简单概述为:创建线程——》执行任务——》销毁线程。我们只关心执行任务的环...

  • Java面试题——多线程

    Java面试题——多线程 1,什么是线程池? 线程池是多线程的一种处理方式,处理过程中将任务提交给线程池,任务执行...

  • 线程池 | 执行流程、拒绝策略

    线程池执行流程 想要真正的了解线程池的执行流程,就要先从线程池的执行方法execute()说起,execute()...

  • java面试之线程池相关

    379.什么是线程池? 什么是线程池?线程池是指在初始化一个多线程应用程序过程中创建一个线程集合,然后在需要执行新...

  • 线程池

    线程池执行过程 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个...

网友评论

      本文标题:线程池执行过程

      本文链接:https://www.haomeiwen.com/subject/scrosltx.html