Spring为任务调度与异步方法执行提供了注解支持。通过在方法上设置@Async注解,可使得方法被异步调用。也就是说调用者会在调用时立即返回,而被调用方法的实际执行是交给Spring的TaskExecutor来完成。
Async示例
定义接口
public interface IDemoService {
public String syncServiceInvoke() throws Exception;
public String asynServiceInvoke() throws Exception;
}
接口实现
@Service
public class IDemoServiceImpl implements IDemoService{
// 异步操作必须和被调用的线程独立开
@Resource
private AsynDemoServiceProxy asynDemoServiceProxy;
public String syncServiceInvoke() throws Exception {
long start = System.currentTimeMillis();
String str1 = asynDemoServiceProxy.asynMethod().get();
String str = syncMethod();
String str2 = asynDemoServiceProxy.asynMethod().get();
return "同步调用: 方法1:" + str1 + "方法2:" + str + "方法3:" + str2 + ",总共耗时:" + (System.currentTimeMillis() - start) + "ms";
}
public String asynServiceInvoke() throws Exception {
long start = System.currentTimeMillis();
Future<String> str1 = asynDemoServiceProxy.asynMethod();
String str = syncMethod();
Future<String> str2 = asynDemoServiceProxy.asynMethod();
return "异步调用: 方法1:" + str1.get() + "方法2:" + str + "方法3:" + str2.get() + ",总共耗时:" + (System.currentTimeMillis() - start) + "ms";
}
public String syncMethod(){
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "耗时:2s";
}
}
示例中分别实现了两个方法: 一个同步调用, 一个异步调用(AsyncDemoServiceProxy类中定义了方法的异步实现,这里在并行结果等待场景下,异步逻辑不能和当前主线程处于同一线程中, 负责异步会失效), 代码如下
@Component
@EnableAsync
public class AsyncDemoServiceProxy {
@Async
public Future<String> asynMethod(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<>("耗时:3s");
}
}
RestAPI定义
@Controller
@RequestMapping(value = "/api/demo")
public class AsyncDemoController extends BaseMultiController {
private static final Logger LOG = LoggerFactory.getLogger(LeopardDemoController.class);
@ResponseBody
@RequestMapping(value = "/invoke/sync", method = RequestMethod.GET)
public APIResult sync() throws Exception{
return new APIResult(ResultEnum.SUCCESS, demoService.syncServiceInvoke());
}
@ResponseBody
@RequestMapping(value = "/invoke/asyn", method = RequestMethod.GET)
public APIResult asyn() throws Exception{
return new APIResult(ResultEnum.SUCCESS, demoService.asynServiceInvoke());
}
}
开启@Async支持
SpringMvc处理器xml中配置如下内容
<!-- 支持 @Async 注解 -->
<task:annotation-driven executor="myExecutor"/>
<task:executor id="myExecutor" pool-size="5-20" queue-capacity="100"/>
使用SpringBoot的项目只需要在启动类上注解开启即可
@EnableAsync
@SpringBootApplication
public class AsyncApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncRestRetryDemoApplication.class, args);
}
}
示例测试
同步调用测试
@Test
public void sync() throws Exception {
String uri = "/api/demo/invoke/sync";
MvcResult mvcResult = this.mockMvc.perform(MockMvcRequestBuilders.get(uri)
.cookie(new Cookie("token", "61ADD4B8ED86C7E162"))
.contentType(MediaType.APPLICATION_JSON_UTF8)
.accept(MediaType.APPLICATION_JSON_UTF8))
.andExpect(handler().handlerType(AsyncDemoController.class))
.andExpect(status().isOk())
.andDo(print())
.andReturn();
println(mvcResult.getResponse().getContentAsString());
}
程序执行结果(约8ms):
MockHttpServletRequest:
HTTP Method = GET
Request URI = /api/demo/invoke/sync
Parameters = {}
Headers = {Content-Type=[application/json;charset=UTF-8], Accept=[application/json;charset=UTF-8]}
Handler:
Type = com.tutorial.spring.async.api.web.controller.AsyncDemoController
Method = public com.tutorial.spring.async.api.result.APIResult com.tutorial.spring.async.api.web.controller.AsyncDemoController.sync() throws java.lang.Exception
Async:
Async started = false
Async result = null
Resolved Exception:
Type = null
ModelAndView:
View name = null
View = null
Model = null
FlashMap:
Attributes = null
MockHttpServletResponse:
Status = 200
Error message = null
Headers = {M-Appkey=[com.tutorial.spring.async.api], M-SpanName=[AsyncDemoController.sync], M-Host=[172.30.12.197], Content-Type=[application/json;charset=UTF-8], Content-Length=[126]}
Content type = application/json;charset=UTF-8
Body = {"data":"同步调用: 方法1:耗时:3s方法2:耗时:2s方法3:耗时:3s,总共耗时:8019ms","message":"成功","status":0}
Forwarded URL = null
Redirected URL = null
Cookies = []
{"data":"同步调用: 方法1:耗时:3s方法2:耗时:2s方法3:耗时:3s,总共耗时:8019ms","message":"成功","status":0}
异步调用测试
@Test
public void async() throws Exception {
String uri = "/api/demo/invoke/asyn";
MvcResult mvcResult = this.mockMvc.perform(MockMvcRequestBuilders.get(uri, 1, 1, 15)
.cookie(new Cookie("token", "61ADD4B8ED86C7E162"))
.contentType(MediaType.APPLICATION_JSON_UTF8)
.accept(MediaType.APPLICATION_JSON_UTF8))
.andExpect(handler().handlerType(AsyncDemoController.class))
.andExpect(status().isOk())
.andDo(print())
.andReturn();
println(mvcResult.getResponse().getContentAsString());
}
程序执行结果(约5ms):
MockHttpServletRequest:
HTTP Method = GET
Request URI = /api/demo/invoke/asyn
Parameters = {}
Headers = {Content-Type=[application/json;charset=UTF-8], Accept=[application/json;charset=UTF-8]}
Handler:
Type = com.tutorial.spring.async.api.web.controller.AsyncDemoController
Method = public com.tutorial.spring.async.api.result.APIResult com.tutorial.spring.async.api.web.controller.AsyncDemoController.asyn() throws java.lang.Exception
Async:
Async started = false
Async result = null
Resolved Exception:
Type = null
ModelAndView:
View name = null
View = null
Model = null
FlashMap:
Attributes = null
MockHttpServletResponse:
Status = 200
Error message = null
Headers = {M-Appkey=[com.tutorial.spring.async.api], M-SpanName=[AsyncDemoController.asyn], M-Host=[172.30.12.197], Content-Type=[application/json;charset=UTF-8], Content-Length=[126]}
Content type = application/json;charset=UTF-8
Body = {"data":"异步调用: 方法1:耗时:3s方法2:耗时:2s方法3:耗时:3s,总共耗时:5011ms","message":"成功","status":0}
Forwarded URL = null
Redirected URL = null
Cookies = []
{"data":"异步调用: 方法1:耗时:3s方法2:耗时:2s方法3:耗时:3s,总共耗时:5011ms","message":"成功","status":0}
可以明显的看到, 其中串行的一个3ms的耗时被优化.相比于我们自己去定义一个线程体, 然后调用,这样的方式更优雅简单,也利于维护和调整.
异步调用的异常处理
Async在异步调用时发生异常是无法被外部调用者捕获的,默认的配置是没有处理异步调用中的异常,但是可以自定义异常处理器来处理异常。
定义自己的异常处理器:
@Configuration
public class AsyncExceptionConfig implements AsyncConfigurer {
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SpringAsyncExceptionHandler();
}
class SpringAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
System.out.println("Async异常解决");
}
}
}
自定义线程池
Async默认使用的SimpleAsyncTaskExecutor线程池,在没有开启限流机制时,默认每一个任务开启一个线程。当遇到高并发或者恶意攻击时会导致系统OOM崩溃。但是Async支持自定义线程池。
定义线程池
@Bean("threadPoolTaskExecutor")
public Executor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("testExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
使用自定义线程池
@Async(value = "threadPoolTaskExecutor")
public Future<Long> count(int num) {
long sum = 0;
for (int i = 1; i <= num; ++i) {
sum += i;
}
return new AsyncResult<>(sum);
}












网友评论