线程池

作者: arkliu | 来源:发表于2022-12-16 09:08 被阅读0次

ThreadPoolExecutor参数解释

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
  • corePoolSize 核心线程数
  • maximumPoolSize 最大线程数
  • keepAliveTime 最大空闲时间
  • unit 时间单位
  • workQueue 阻塞队列
  • threadFactory 创建线程的工厂类
  • handler 饱和处理机制

线程池的执行流程

image.png

ThreadPoolExecutor demo

package com.test.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExePollTest {

    public static void main(String[] args) {
        // 自定义线程池
        // 最大线程该如何定义:
        // 1. cpu密集型:cpu几核就是几,可以保持cpu效率最高
        System.out.println(Runtime.getRuntime().availableProcessors());
        // 2. io密集型:判断程序中,十分消耗io的线程的两倍
        ExecutorService service = new ThreadPoolExecutor(
                    2,
                    5,
                    3,
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<>(3),
                    Executors.defaultThreadFactory(),
                    new ThreadPoolExecutor.DiscardOldestPolicy()
                );
        try {
            // 最大承载线程数:队列大小+最大线程数量
            // AbortPolicy策略:超过RejectedExecutionException
            // CallerRunsPolicy策略: 超过最大承载线程数的部分,由调用该线程的线程执行
            // DiscardOldestPolicy策略: 超过最大承载线程数,不会抛出异常,丢掉任务
            // DiscardOldestPolicy策略: 超过最大承载线程数,尝试和最早的线程竞争,不会抛出异常
            for (int i = 1; i <= 9 ; i++) {
                // 使用了线程池之后,使用线程池来创建线程
                service.execute(new Runnable() {
                    
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName()+"  ok");
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            service.shutdown();
        }
    }
}

java内置线程池

执行ExecutorService

execute(Runnable)

ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
fixedExecutor.execute(new MyRunnable(1));
fixedExecutor.shutdown();

submit(Runnable)

ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
Future future = fixedExecutor.submit(new MyRunnable(1));
try {
    //如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
    System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
fixedExecutor.shutdown();

submit(Callable)

ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
Future<String> future = fixedExecutor.submit(new Callable<String>() {

    @Override
    public String call() throws Exception {
        return "hello world..";
    }
});
try {
    //如果任务执行完成,future.get()方法会返回一个null,future.get()可能阻塞
    System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
fixedExecutor.shutdown();
image.png

invokeAny

invokeAny会返回所有Callable任务中第一个得到执行完的Callable的结果作为返回值

// 1. 创建3个固定线程的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
// 2. 创建listCallable 并初始化三个Callable
List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("first runs..");
        return "first";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("second runs..");
        return "second";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("third runs..");
        return "third";
    }
});
String future = null;
try {
    //3. 执行invokeAny,会返回上面listCallable的任意一个Callable的返回值
    future = fixedExecutor.invokeAny(listCallable);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
System.out.println(future);
fixedExecutor.shutdown();
image.png

invokeAll

invokeAll会返回一个Future的List,其中对应着每个Callable任务执行后的Future对象

// 1. 创建3个固定线程的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
// 2. 创建listCallable 并初始化三个Callable
List<Callable<String>>listCallable = new ArrayList<Callable<String>>();
listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("first runs..");
        return "first";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        TimeUnit.SECONDS.sleep(2);
        System.out.println("second runs..");
        return "second";
    }
});

listCallable.add(new Callable<String>() {

    @Override
    public String call() throws Exception {
        System.out.println("third runs..");
        return "third";
    }
});
List<Future<String>> futures = null;
try {
    futures = fixedExecutor.invokeAll(listCallable);
    
    for (Future<String> future : futures) {
        System.out.println("result:"+future.get());
    }
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
fixedExecutor.shutdown();
image.png

newCachedThreadPool

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。看下面栗子:

新建一个MyRunnable

class MyRunnable implements Runnable {
    private int id;
    
    public MyRunnable(int id) {
        this.id = id;
    }
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" runs..."+id);
    }
    
    @Override
    public String toString() {
        return ""+id;
    }
}

测试

public static void testCachedThreadPool01() {
        //1. 使用工程类获取线程池对象
        ExecutorService cacheExecutor = Executors.newCachedThreadPool();
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png
// 使用工厂类获取线程池对象
    public static void testCachedThreadPool02() {
        ExecutorService cacheExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程"+(n++));
            }
        });
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png

newFixedThreadPool

最多n个线程将处于活动状态。如果提交了n个以上的线程,那么它们将保持在队列中,直到线程可用

public static void testnewFixedThreadPool01() {
        // 最多创建3个线程
        ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            fixedExecutor.submit(new MyRunnable(i));
        }
    }
image.png

工厂类获取线程池对象

// 使用工厂类获取线程池对象
    public static void testnewFixedThreadPool02() {
        ExecutorService cacheExecutor = Executors.newFixedThreadPool(3, new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程"+(n++));
            }
        });
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png

newSingleThreadExecutor

创建一个核心线程,并且最大线程也是1个,同时它具有一个无边界的阻塞队列LinkedBlockingQueue

public static void testnewSingleThreadExecutor01() {
        ExecutorService fixedExecutor = Executors.newSingleThreadExecutor();
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            fixedExecutor.submit(new MyRunnable(i));
        }
    }
image.png
// 使用工厂类获取线程池对象
    public static void testnewSingleThreadExecutor02() {
        ExecutorService cacheExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            int n = 1;
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "线程"+(n++));
            }
        });
        //2. 提交任务
        for (int i = 1; i <= 10; i++) {
            cacheExecutor.execute(new MyRunnable(i));
        }
    }
image.png

shutdown方法

shutdown将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断,并抛出RejectedExecutionException异常

ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
singleExecutor.execute(()->{
    System.out.println("thread1 runs..");
});
singleExecutor.shutdown();
// 上面已经shutdown了,后续加入进来执行的线程,将抛出RejectedExecutionException
singleExecutor.execute(()->{
    System.out.println("thread2 runs..");
});
image.png

shutdownNow

shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回

ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 10; i++) {
    singleExecutor.submit(new MyRunnable(i));
}
List<Runnable>list = singleExecutor.shutdownNow();
for (Runnable runnable : list) {
    System.out.println("线程:"+runnable+" 被取消了");
}
image.png

延迟或定期执行任务

延迟两秒执行任务

ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3);
        scheduledExecutor.schedule(()->{
            System.out.println("scheduledExecutor runs..");
        }, 2, TimeUnit.SECONDS);

间隔执行

ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(3, new ThreadFactory() {
    int n = 1;
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "线程:"+(n++));
    }
});
scheduledExecutor.scheduleAtFixedRate(new MyRunnable(1), 2, 2, TimeUnit.SECONDS);
2.gif

相关文章

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

  • 线程池

    JDK线程池 为什么要用线程池 线程池为什么这么设计 线程池原理 核心线程是否能被回收 如何回收空闲线程 Tomc...

网友评论

      本文标题:线程池

      本文链接:https://www.haomeiwen.com/subject/dzesxdtx.html