美文网首页
Java并发编程(四) - 线程池(上)

Java并发编程(四) - 线程池(上)

作者: ElliotG | 来源:发表于2018-10-27 17:47 被阅读0次

0. 为什么需要线程池

构造一个新的线程开销有点大,虽然线程与进程相比,是比较轻量的,但是线程的创建和关闭仍然需要花费时间,因为这涉及与操作系统的交互。而且,当线程数量过大时,会耗尽CPU和内存资源。

 

1. 什么是线程池

线程池是为了避免频繁地创建和销毁线程,对线程进行复用的。
就像数据库连接池维护数据库连接一样。
因此, 在使用线程池以后, 创建线程变成了从线程池获得空闲线程,关闭线程变成了向线程池归还线程。

 

2. 线程池的核心类

在java.util.concurrent包中, 有线程池管理的核心类。

  • ThreadPoolExecutor
    表示一个线程池。

  • Executors(执行器)
    Executors是创建线程池的工厂类,扮演着线程池工厂的角色。
    通过Executors可以取得一个拥有特定功能的线程池。

Executors的方法:

方法 描述
newCachedThreadPool() 返回一个可根据实际情况调整线程数量的线程池(无界线程池)
newFixedThreadPool(int nThreads) 返回一个固定线程数量的线程池(有界线程池)
newSingleThreadExecutor() 返回一个只有一个线程的线程池
newScheduledThreadPool() 用于调度执行的固定线程池
newWorkStealingPool() 一种适合"fork-join"任务的线程池,其中复杂的任务会分解成更简单的任务,空闲线程会"密取"较简单的任务
  • 无界线程池
    示例代码:
public class CachedThreadPoolDemo1 {
    public static void main(String[] args) {

        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(() -> {
            try {
                System.out.println("Runnable1 begin "
                        + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println("A");
                System.out.println("Runnable1   end "
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        });

        executorService.execute(() -> {
            try {
                System.out.println("Runnable2 begin "
                        + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println("B");
                System.out.println("Runnable2   end "
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        });

    }
}

运行结果:
Runnable1 begin 1594986808102
Runnable2 begin 1594986808103
A
B
Runnable2 end 1594986809122
Runnable1 end 1594986809122

  • 有界线程池
    示例代码:
public class NewFixedThreadPoolDemo {

    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        int threadNum = 5;
        int actualThreadNum = 10;
        ExecutorService es = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < actualThreadNum; i++) {
            es.submit(task);
        }
    }
}

运行结果:
1591878949865:Thread ID:16
1591878949865:Thread ID:15
1591878949865:Thread ID:13
1591878949865:Thread ID:17
1591878949865:Thread ID:14
1591878950869:Thread ID:15
1591878950869:Thread ID:14
1591878950869:Thread ID:13
1591878950869:Thread ID:17
1591878950869:Thread ID:16

从上面结果可以看出,线程的ID分了两批,而且前后时间也是分两批。
这正好就证明了,我们创建了一个5个线程的线程池,而总共10个线程,除以2就是两批。

 

3. 计划任务

如果我们需要使用线程池来周期性地执行某个任务,我们需要使用newSchedulesThreadPool()方法。
它返回一个ScheduledExecutorService,这个service并不会立刻安排执行任务,实际上它是计划任务,会在指定的时间,对任务进行调度。

下面的代码每隔2秒打印当前时间
ScheduledExecutorServiceDemo

public class ScheduledExecutorServiceDemo {

    public static void main(String[] args) {
        int threadSize = 10;
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(threadSize);
        ses.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(System.currentTimeMillis() / 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

运行结果:
1591882377
1591882379
1591882381
1591882383
1591882385
1591882387

每隔2秒执行

 

3. 核心线程池的内部实现

事实上,newCachedThreadPool, newFixedThreadPool,newSingleThreadExecutor这3个方法都是调用的ThreadPoolExecutor的构造函数。

ThreadPoolExecutor构造函数:

public ThreadPoolExecutor(
    int corePoolSize,  // 线程池中线程数量
    int maxiumPoolSize,    // 线程池中最大线程数量
    keepAliveTime,  // 当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间
    unit,  // keepAliveTime的单位
    workQueue,  // 任务队列,被提交但尚未被执行的任务
    threadFactory, // 线程工厂,用于创建线程,一般用默认的即可
    handler  // 拒绝策略,当任务太多来不及处理,如何拒绝任务
);

三种线程池的实现方式:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads,
        nThreads, 
        0L, 
        TimeUnit.MILLISECONDS, 
        new LinkedBlockingQueue<Runnable>()
    );
}

public static ExecutorService newSingleThreadExecutor(int nThreads) {
    return new FinalizableDelegatedExecutorService(
        new ThreadPoolExecutor(1,
            1, 
            0L, 
            TimeUnit.MILLISECONDS, 
            new LinkedBlockingQueue<Runnable>()
        )
    );
}

public static ExecutorService newCachedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(0,
        Integer.MAX_VALUE, 
        60L, 
        TimeUnit.SECONDS, 
        new SynchronousQueue<Runnable>()
    );
}

newCachedThreadPool方法返回corePoolSize为0,maximumPoolSize无穷大的线程池。
这意味着在没有任务时,该线程池内无线程;而当任务被提交时,该线程池会使用空闲的线程执行任务;
若无空闲线程,则将任务加入SynchronousQueue队列。

 

4. ThreadPoolExecutor参数解释

  • corePoolSize
    指定了线程池中的线程数量

  • maximumPoolSize
    指定了线程池中的最大线程数量

  • keepAliveTime
    当线程池线程数量超过corePoolSize时, 多余的空闲线程的存活时间

  • unit
    keepAliveTime的单位(eg: TimeUnit.SECONDS)

  • workQueue
    任务队列,被提交但尚未被执行的任务

  • threadFactory
    线程工厂,用于创建线程,一般用默认的即可

  • handler
    拒绝策略,当任务太多来不及处理,如何拒绝任务

相关文章

网友评论

      本文标题:Java并发编程(四) - 线程池(上)

      本文链接:https://www.haomeiwen.com/subject/hpartqtx.html