前言
项目中常常与第三方系统交互的场景,但是往往我们不能保证每一次网络请求都是正常的,但是常规的做法是对发起http请求的方法进行异常捕获,在处理异常中进行一次重试,若还是失败就丢弃该请求,很明显这么不能满足我们业务场景;
方案
我们基于异常捕获重试的方案进行进一步改进,获取请求参数、执行的方法、该方法所属类 封装成一个对象,放入队列。由异步线程去消费这个对象,从这么对象中拿到类、和方法名反射获取methor ,构造请求参数,重新执行该方法直至成功为止;
package com.gzcb.creditcard.quartz.utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
/**
* @description: 异常补偿队列
* @author: libingdao
* @create: 2019-11-04 15:06
**/
@Component
@Slf4j
public class ExceptionQueue {
private static final PriorityBlockingQueue<ExceptionCompensateDTO> message = new PriorityBlockingQueue();
private final ExecutorService pools = Executors.newCachedThreadPool();
private final ExecutorService subpools = Executors.newFixedThreadPool(2);
/**
* 是否启动消费者
*/
private volatile boolean enable = true;
@Async
public void consumer(){
while (!message.isEmpty()) {
ExceptionCompensateDTO poll = message.poll();
pools.submit(()->{
//出队并删除,假如执行失败重新入队
if(!StringUtils.isEmpty(poll)){
log.error("业务执行: {}",poll);
ClassMethod.invokeMethod(poll.getObj(),poll.getMethodName(),poll.getArgs());
}
});
}
enable = true;
log.error("子线程执行结束");
}
public void add(ExceptionCompensateDTO exceptionCompensateDTO){
log.info("exceptionCompensateDTO: {}",exceptionCompensateDTO);
if(!StringUtils.isEmpty(exceptionCompensateDTO)){
message.add(exceptionCompensateDTO);
}
log.debug("enable:{}",enable);
if(enable){
enable = false;
subpools.submit(()->{
log.debug("启动子线程");
consumer();
});
}
}
}
ExceptionQueue 控制消息的出入队列
package com.gzcb.creditcard.quartz.utils;
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Method;
/**
* @description:
* @author: libingdao
* @create: 2019-11-07 11:06
**/
public class ClassMethod {
/**
* @param obj 调用此方法的对象
* @param methodName 方法名称
* @param args 调用的这个方法的参数参数列表
*/
public static void invokeMethod(Object obj, String methodName, Object[] args) {
Class argsClass[] = null;
//1.参数存在
if (args != null) {
int len = args.length;
argsClass = new Class[len];
//2.根据参数得到相应的 Class的类对象
for (int i = 0; i < len; ++i) {
argsClass [i] = args[i].getClass();
}
}
// 找到方法
Method method = ReflectionUtils.findMethod(obj.getClass(), methodName, argsClass);
// 执行方法
ReflectionUtils.invokeMethod(method, obj, args);
}
}
ClassMethod 反射获取被执行者对象和方法、参数然后执行;
package com.gzcb.creditcard.quartz.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @description:
* @author: libingdao
* @create: 2019-11-07 11:04
**/
@Component
public class DeclareMethodA {
public DeclareMethodA(){}
@Autowired Test test;
public void showA(Integer a,String name,NewClass nc){
System.out.println("Integer="+a+" name="+name+" NewClass="+nc);
test.xx();
}
public static class NewClass{
}
}
DeclareMethodA 被执行对象
package com.gzcb.creditcard.quartz.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @description:
* @author: libingdao
* @create: 2019-11-07 11:04
**/
@Component
public class DeclareMethodB {
public DeclareMethodB(){}
@Autowired Test test;
public void show11(Integer a,String name){
System.out.println("Integer11="+a+" name11="+name);
test.xx();
}
public static class NewClass{
}
}
DeclareMethodB被执行对象
package com.gzcb.creditcard.quartz.utils;
import org.springframework.stereotype.Component;
/**
* @description:
* @author: libingdao
* @create: 2019-11-07 11:11
**/
@Component
public class Test {
public void xx(){
System.out.println("test.xx");
}
}
Test 对象主要是验证能否调通spring bean
package com.gzcb.creditcard.quartz.utils;
import lombok.Data;
/**
* @description:
* @author: libingdao
* @create: 2019-11-04 15:13
**/
@Data
public class ExceptionCompensateDTO implements Comparable{
public Object obj;
public String methodName;
public Object[] args;
public ExceptionCompensateDTO(Object obj, String methodName, Object[] args) {
this.obj = obj;
this.methodName = methodName;
this.args = args;
}
@Override
public int compareTo(Object o) {
return 0;
}
}
package com.gzcb.creditcard.quartz.sms;
import com.alibaba.fastjson.JSONObject;
import com.gzcb.creditcard.QuartzApplication;
import com.gzcb.creditcard.config.sms.bean.SmsTemplateBody;
import com.gzcb.creditcard.config.sms.bean.SmsTemplateQuery;
import com.gzcb.creditcard.quartz.utils.*;
import com.gzcb.creditcard.service.SMSService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = QuartzApplication.class)
public class SmsTemplateServiceImplTest {
@Autowired
DeclareMethodA declareMethod;
@Autowired
DeclareMethodB declareMethod11;
@Autowired ExceptionQueueTesy exceptionQueueTesy;
static ExecutorService mainpools = Executors.newCachedThreadPool();
@Test
public void exceptionQueueTesy(){
for (Integer i = 0;i<1000;i++) {
Integer ss = i;
mainpools.submit(()->{
for (Integer j = 0;j<100;j++) {
ExceptionCompensateDTO dto =
new ExceptionCompensateDTO(declareMethod,"show",new Object[]{ss+j,"今天天气不错"+ss+j,new DeclareMethod.NewClass()});
exceptionQueueTesy.add(dto);
}
});
mainpools.submit(()->{
for (Integer j = 0;j<100;j++) {
ExceptionCompensateDTO dto =
new ExceptionCompensateDTO(declareMethod11,"show11",new Object[]{ss+j,"今天天气很糟糕"+ss+j});
exceptionQueueTesy.add(dto);
}
});
}
try {
Thread.sleep(2000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("主线程执行结束");
}
}
测试模拟多线程请求
网友评论