1 场景
java中默认情况下,程序都是同步执行的。即在一个线程中执行,程序未执行完,不返回请求。
如下场景,均需要创建执行程序外额外的线程:
- 有些场景,需要
立即返回请求结果,核心程序在后台异步执行,此次情况需要使用异步线程; - 处理数据较多时,需要启用
多线程执行,提高执行速度;
2 直接创建线程
java中可借助如下类,直接创建线程:Thread、Runnable、Callable。
此种方式,不建议使用。频繁地创建线程,会占用大量的资源和时间,会大大降低系统的性能。
2.1 Thread创建线程
(1)类继承Thread类
public class ThreadClass extends Thread{
private String param;
public ThreadClass(String param){
this.param=param;
}
@Override
public void run() {
// 多线程方法
}
}
启动线程,代码如下:
new ThreadClass("my param").start();
(2)使用匿名类
new Thread(){
@Override
public void run() {
// 多线程方法
}
}.start();
2.2 Runnable创建线程
(1)类实现Runnable接口
public class RunnableClass implements Runnable {
private String param;
public RunnableClass(String param) {
this.param = param;
}
@Override
public void run() {
// 多线程方法
}
}
启动线程,代码如下:
RunnableClass runnableClass = new RunnableClass("thread param");
new Thread(runnableClass).start();
(2)使用匿名类
new Thread(new Runnable(){
@Override
public void run() {
// 多线程方法
}
}).start();
2.3 Callable创建线程
实现Callable接口创建的线程类,和Thread和Runnable的区别是:
- 可以
返回内容 - 可以
抛出异常
(1)类实现Callable接口
借助FutureTask执行线程,并获取返回结果:
public class CallableClass implements Callable<String> {
private String param;
public CallableClass(String param) {
this.param = param;
}
@Override
public String call() throws Exception {
// 自定义抛出异常,可在主线程中调用get方法时捕获
// throw new Exception("123");
// 线程返回值
return null;
}
}
启动线程,并获取结果,代码如下:
FutureTask<String> futureTask = new FutureTask<>(new CallableClass("param value"));
Thread myThread = new Thread(futureTask);
myThread.start();
try {
// 获取异步线程的返回值(调用此方法,会阻塞当前主线程)
System.out.println(futureTask.get());
} catch (InterruptedException e) {
// 线程被中断时抛出的异常
e.printStackTrace();
} catch (ExecutionException e) {
// Callable类中抛出的异常
e.printStackTrace();
}
(2)使用匿名类
FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
// 线程返回值
return null;
}
});
3 自定义线程池
3.1 关于线程池
(1)为什么使用线程池?
程序中不允许直接创建线程使用,建议使用线程池。避险new的线程太多,程序中线程数量不可控,导致程序出问题。
(2)创建几个线程池?
一个应有中,可以创建多个线程池,根据实际业务情况和服务器情况,创建对应的线程池,并调整成符合实际情况的参数配置。
建议相同场景的业务,使用同一个线程池,并进行文档记录。
3.1 工具类创建
不建议此种方式创建线程池。
jdk中的工具类java.util.concurrent.Executors,是java提供的创建线程池的工具类。
此工具类强烈建议不要用,主要是因为此工具类创建的线程池,要么是线程池的排队队列为无界队列(如Executors.newFixedThreadPool),要么是线程池的最大线程数为无界队列(如Executors.newCachedThreadPool)。
线程池中的最大线数或排队队列数,如果为无界队列,线程池将会不可控,系统将会出现严重的问题。
说明:
此处的无界队列,指的是Integer.MAX_VALUE,并不是绝对的无限制。
不建议此种方式创建线程池,此处也不再讲解。
3.2 自定义创建
3.2.1 创建说明
创建线程池,有很多种方式,此处借助于ThreadPoolExecutor来创建线程池。
建议使用ThreadPoolExecutor最完整的构造函数创建线程池:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
参数注释如下:
corePoolSize– the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize– the maximum number of threads to allow in the pool
keepAliveTime– when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit– the time unit for the keepAliveTime argument
workQueue– the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.
threadFactory– the factory to use when the executor creates a new thread
handler– the handler to use when execution is blocked because the thread bounds and queue capacities are reached
参数补充说明:
(1)当前线程数小于corePoolSize,有新提交任务,将创建新的线程。即使当前线程池中有空闲线程
(2)当前线程数等于corePoolSize,有新提交任务,会将新任务放到队列workQueue中,只有在workQueue满了后且“maximumPoolSize >corePoolSize”,才会扩充当前线程数
(3)提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理
(4)当前线程数量大于corePoolSize,且线程空闲时间大于keepAliveTime后,空闲线程将被关闭。
(5)设置allowCoreThreadTimeOut为true时,线程数量不超过corePoolSize,空闲时间超过keepAliveTime后也会被关闭。
3.2.2 创建线程池
(1)创建依赖类
/**
* 自定义线程工厂(主要用于设置线程名称等属性)
* <br>
* 如没特殊要求,可使用guava中的线程工厂创建者:com.google.common.util.concurrent.ThreadFactoryBuilder
* <br>
* 如:ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("业务描述字符-pool-%d").build();
*/
public class MyThreadFactory implements ThreadFactory {
private String threadName;
private volatile AtomicInteger count = new AtomicInteger(0);
public MyThreadFactory(String threadName) {
this.threadName = threadName;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
// 设置线程名
String name = this.getClass().getSimpleName() + "_" + threadName + "_" + count.getAndAdd(1);
thread.setName(name);
return thread;
}
}
/**
* 自定义拒绝策略(排队队列满了后,会执行此策略)
* 默认拒绝策略AbortPolicy(拒绝任务,抛出异常)
*/
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
////自定义拒绝策略 ---------------
// (1)抛出异常,不执行任务
/*throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());*/
// (2)写入队列更改为阻塞方法
try {
// “非阻塞方法”offer改成“阻塞方法”put
executor.getQueue().put(r);
} catch (InterruptedException e) {
e.printStackTrace();
}
//// 额外操作--------------------
// (1)记录拒绝数量,进行日志记录及预警(根据此提示调整线程池配置或优化代码速度)
// (2)加入补偿线程池去执行
// (3)再次加入到线程池尝试执行
//-----------------------------
}
}
(2)创建线程池
// 线程排队队列(队列大小为100)
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
// 线程工厂
ThreadFactory threadFactory = new MyThreadFactory("testThread");
// 拒绝策略
RejectedExecutionHandler rejectedExecutionHandler = new MyRejectedExecutionHandler();
// 创建线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 50, 15, TimeUnit.SECONDS, workQueue, threadFactory, rejectedExecutionHandler);
3.2.3 使用线程池
(1)调用Runnable
poolExecutor.execute(new Runnable() {
@Override
public void run() {
// 输出内容:MyThreadFactory_testThread_0
System.out.println(Thread.currentThread().getName());
}
});
(2)调用callable
Future<String> future = poolExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
try {
// 输出内容:MyThreadFactory_testThread_0
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
3.2.6 监视线程池
线程池的状态属性,可以作为检测线程池状态使用。
| 属性 | 说明 |
|---|---|
| taskCount | 返回已计划执行的任务的大致总数。线程的状态可能只在近似值期间动态地返回,因为任务的状态可能只在近似值期间发生变化。 |
| completedTaskCount | 返回已完成执行的任务的大致总数。由于任务和线程的状态在计算过程中可能会动态变化,因此返回的值只是一个近似值,但在连续调用期间不会减少。 |
| largestPoolSize | 返回池中同时存在的最大线程数。 |
| poolSize | 返回池中同时存在的最大线程数。 |
| activeCount | 返回正在积极执行任务的大约线程数。 |
可以启动一个单独的线程,将线程池对象作为属性传递进去,根据实际情况,定时检测线程池的相关属性,并进行记录。
此线程也可作为被检测线程池的一个线程任务使用。
3.2.5 关闭线程池
线程池不用时,可以进行关闭,实际项目中关闭线程池的情况很少,此节仅做一个记录。
可以调用shutdown和shutdownNow关闭线程池。
(1)关闭机制
调用线程的interrupt方法来关闭线程。如线程无法响应,调用interrupt方法无法关闭线程,则线程池一直无法关闭。
(2)shutdown和shutdownNow区别
shutdown:拒绝新任务,正在执行的任务,和队列中的任务都会执行
shutdownNow:拒绝新任务、停止正在执行的任务、停止队列中任务
调用任何一个方法,isShutdown返回结果都为true。
所有任务都关闭后,线程池才真的被关闭,线程池真被关闭后,isTerminaed 返回为true。
3.3 应用DEMO
此处举例一个场景:一个业务操作,执行代码复杂且处理数据较多,需要启动多个线程进行操作,全部执行完毕后,进行汇总操作。
我们手动创建一个线程池,专门执行此业务操作,线程执行过程中,主线程阻塞。线程池内任务执行完毕后,关闭线程池,释放资源,主线程继续执行后面的汇总相关操作。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.*;
public class ThreadPoolDemoTest {
public static void main(String[] args) {
// ---------- 【一、定义线程池】 ----------
// 定义“线程工厂”
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("业务描述字符-pool-%d").build();
// 定义“阻塞队列”
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>(3000);
// 定义“拒绝策略”
RejectedExecutionHandler rejectedExecutionHandler = (runnable, executor) -> {
try {
// 任务被拒绝, 主线程阻塞
executor.getQueue().put(runnable);
} catch (Exception e) {
e.printStackTrace();
}
};
// 定义“线程池”
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(16, 32, 10L, TimeUnit.SECONDS, linkedBlockingQueue, threadFactory, rejectedExecutionHandler);
// ---------- 【二、执行业务代码】 ----------
// TODO 拆分业务数据,放入到线程池中执行
try {
for (int i = 0; i < 10; i++) {
threadPoolExecutor.execute(() -> {
// 执行业务方法......
try {
System.out.println("执行业务方法......");
Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
// TODO 自定义异常处理
}
});
}
// ---------- 【无、执行与线程池内任务无关的代码】 ----------
// TODO 根据实际情况,可执行与线程池内业务代码无关的代码
} catch (Exception e) {
// TODO 自定义异常处理
} finally {
// ---------- 【四、阻塞主线程,等待执行完成】 ----------
try {
// (1)不能再往线程池中添加任何任务,否则抛出异常;已添加到线程池中的任务都已经处理完成,才会退出
threadPoolExecutor.shutdown();
// (2)判断线程池中任务是否结束
// 停止方式1:挂起主线程,每隔一段时间查看线程池是否关闭。如未关闭,一段时间后继续查看,直到所有任务执行完毕
while (!threadPoolExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
// 继续等待,直到线程池关闭
}
/*
// 停止方式2:挂起主线程,隔一段时间查看线程池是否关闭。如未关闭,则手动关闭线程池
if(threadPoolExecutor.awaitTermination(1, TimeUnit.MINUTES)){
// 试图停止所有正在执行的线程,不再处理还在池队列中等待的任务
threadPoolExecutor.shutdownNow();
}
*/
} catch (Exception e) {
// TODO 自定义异常处理
}
}
// ---------- 【五、线程池内任务执行完毕,主线程阻塞结束】 ----------
// TODO:线程内所有任务,执行完毕后。执行其他方法
}
}
4 spring线程池
如果java中集成了spring环境,建议使用spring创建线程池。
4.1 maven依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.2.RELEASE</version>
</dependency>
4.2 定义线程池
/**
* 通用线程池
* @return org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
*/
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数(默认为1)
executor.setCorePoolSize(5);
// 最大线程数(默认为:Integer.MAX_VALUE)
executor.setMaxPoolSize(50);
// 队列最大长度(默认为:Integer.MAX_VALUE)
executor.setQueueCapacity(1000);
// 线程池内线程允许的空闲时间/秒(默认60秒)
executor.setKeepAliveSeconds(10);
// 线程池拒绝策略(默认AbortPolicy:拒绝任务,抛出异常)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 设置线程名前缀(用于定制线程名)
executor.setThreadNamePrefix("common thread");
// 设置线程分组名(用于定制分组信息)
//executor.setThreadGroupName("common thread group");
// 设置创建线程的优先级(默认为Thread.NORM_PRIORITY)
//executor.setThreadPriority(Thread.NORM_PRIORITY);
// 是否允许核心线程数超时(默认为false,设置为true后,即使队列未满,也可实现线程的动态增加和减少)
//executor.setAllowCoreThreadTimeOut(true);
return executor;
}
4.3 调用线程池
(1)无返回值
threadPoolTaskExecutor.execute(new Runnable() {
@Override
public void run() {
// 输出:common thread1
System.out.println(Thread.currentThread().getName());
}
});
(2)有返回值
Future<String> future = threadPoolTaskExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Thread.currentThread().getName();
}
});
try {
// 输出:common thread1
System.out.println(future.get());
} catch (InterruptedException e) {
// 线程被中断时抛出的异常
e.printStackTrace();
} catch (ExecutionException e) {
// Callable类中抛出的异常
e.printStackTrace();
}
4.4 注解调用
调用spring中的线程池异步执行方法,也可通过spring中的注解@Async来实现,此注解可以加到方法上或者类上(加类上后,类上面的所有代理方法,均为异步线程方法)。
4.4.1 开启注解
(1)springBoot开启注解
通过在启动类上,加上@EnableAsync注解来开启@Async的异步线程方法。
(2)springMvc开启注解
sprignMvc通过加上如下配置,来开启@Async的异步线程方法:
<!-- executor为默认的spring线程池 -->
<task:annotation-driven executor="threadPoolTaskExecutor"/>
4.4.2 使用异步方法
@Async
public void testAsyncMethod() {
// 输出:common thread1
System.out.println(Thread.currentThread().getName());
}
4.4.3 多个线程池的情况
spring中也可以定义多个线程池,每个线程池的类型均为ThreadPoolTaskExecutor,每个线程池的name不同。
@Async注解不加参数时,使用的为默认的线程池,如需要指定线程池,可根据bean的name来指定,如下:
(1)定义非默认线程池
@Bean
public ThreadPoolTaskExecutor otherThreadPoolTaskExecutor() {
......
}
(2)调用非默认线程池
@Async(value = "otherThreadPoolTaskExecutor")
public void testAsyncMethod() {
System.out.println(Thread.currentThread().getName());
}
注解的value注释说明:
指定的异步操作的限定符值。
可用于确定在执行异步操作时要使用的
目标执行器,它与特定执行器或TaskExecutor bean定义的限定符值(或bean名称)匹配。当在类级别@Async注释上指定时,指示给定的执行器应用于类中的所有方法。方法级使用Async#value总是覆盖类级别设置的任何值。








网友评论