业务场景
简单介绍一下为什么要做熔断:
- 业务场景中需要并发请求一批服务,每个服务都有不稳定性,有的时候部分服务会出现异常或者超时的情况。那么这些出现问题的服务将会影响整个服务的响应时间。
- 正所谓,一颗老鼠屎搞坏了一锅粥。此时,如果我们能够自动对那些不稳定的服务进行熔断,那么整体的响应时间肯定不会受到影响。
- 当然,
初步设计
- 刚开始进行设计的时候,对每一个服务都唯一进行了标记,并将这个标记转为了整数。再把所有的服务放入到LongAdder[]中。
- 由于服务是异步的,所以这里需要装饰一下结果处理者AsyncHandler,当该对象的方法被回调时才会真正触发熔断数据的处理。
public class FuseActor implements Actor {
//每个服务允许失败的次数
private final static int ALLOWED_FAILURE_COUNT = 5;
//定时任务,刷新服务resetFlag的时间
private final static int REFRESH_TIME = 2;
private LongAdder[] longAdders = new LongAdder[4000];
@Override
public Object execute(String serviceName, ProceedingJoinPoint joinPoint){
Object[] args = joinPoint.getArgs();
AsyncHandler<Response> olderAsyncHandler = (AsyncHandler<Response>) args[1];
//判断每个服务的失败次数,是否大于允许的失败次数
if (longAdders[regTagToInt((serviceName).getTag())].sum() >= ALLOWED_FAILURE_COUNT){
Response response = QueryResponse.createResponse(WebBasicCodeEnum.ALREADY_AT_PROTECTION_MODE, Response.class);
response.setStartTime(System.currentTimeMillis());
olderAsyncHandler.handleResult(response);
return null;
}else {
AsyncHandler<Response> newAsyncHandler = response -> {
if (!response.getCode().equals(IErrorCode.OK.getCode())){
longAdders[ServiceNameToInt(name)].increment();
}
olderAsyncHandler.handleResult(response);
};
args[1] = newAsyncHandler;
try {
return joinPoint.proceed(args);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return null;
}
}
private int ServiceNameToInt(String tag){
....
return Integer.parseInt(...);
}
}
- 通过上述的代码,可以知道,每过一段时间,所有的熔断的服务都会被释放,必然会导致这个时间点的整体服务响应时间变长。
- 所有的熔断的服务定时释放,是不能得知服务是否已经可用了,如果把不可用的服务释放了,仍然要经历一段时间的失败才会进入熔断,所以这样的设计不合理。
- 如果一个服务多次进入熔断状态,那么对应的释放时间是不是也需要随之增长呢?
重构与演变
- 通过上述的分析,上边提到的熔断机制只能起到一小部分的作用,还是存在一定的限制。通过阅读Hystrix的源码,借鉴可Hystrix的设计,自己搞了一个稍有味道的设计。
- 首先,对于面向对象编程的语言,我们要做更加的面向对象。这里,将每个服务的状态抽成一个类,来具体的描述服务的状态。而不能像上边一样,简单的通过一个数组中的一个LongAdder来描述一个服务的状态。
public class FuseActor implements Actor {
@Override
public ActorResult execute(String serviceName, ProceedingJoinPoint joinPoint){
Object[] args = joinPoint.getArgs();
AsyncHandler<Response> olderAsyncHandler = (AsyncHandler<Response>) args[1];
//这里通过serviceName创建对应的熔断器对象。
FuseBreaker fuseBreaker = RegFuseBreaker.create(serviceName);
//判断服务对应的熔断器是否允许请求
if (!regFuseBreaker.allowRequest()){
log.info("not allow request : {}",queryCode.getName());
Response response = QueryResponse.createResponse(WebBasicCodeEnum.ALREADY_AT_PROTECTION_MODE, Response.class);
response.setStartTime(System.currentTimeMillis());
olderAsyncHandler.handleResult(response);
return ActorResult.builder().effective(true).data(response).build();
}else {
//这里还是在装饰方法,在方法执行的时候对regFuseBreaker进行数据的处理。
AsyncHandler<Response> newAsyncHandler = response -> {
if (!response.getCode().equals(IErrorCode.OK.getCode())){
fuseBreaker.markNonSuccess();
}else {
fuseBreaker.markSuccess();
}
olderAsyncHandler.handleResult(response);
};
args[1] = newAsyncHandler;
try {
return ActorResult.builder().effective(false).data(joinPoint.proceed(args)).build();
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return ActorResult.builder().effective(false).data(null).build();
}
}
}
关键在于FuseBreaker的实现:
public class FuseBreaker {
private String name;
private final long SLEEP_WINDOW_IN_MILLISECONDS = 5000;
private final double ALLOW_REQUEST_THRESHOLD = 0.51;
private final AtomicLong failureCount = new AtomicLong();
private final AtomicLong successCount = new AtomicLong();
private final AtomicLong fuseOpenTime = new AtomicLong();
private final AtomicInteger halfOpenCount = new AtomicInteger(0);
private static final int MAX_HALF_OPEN_COUNT = 10;
private PublishSubject<Integer> failureSubject = PublishSubject.create();
private PublishSubject<Integer> successSubject = PublishSubject.create();
//final static 保证该map是类变量,并且仅会初始化一次
private final static ConcurrentMap<String,FuseBreaker> iQueryCodeRegFuseBreakerMap = new ConcurrentHashMap<>();
private FuseBreaker(String name) {
//看一下构造函数的实现,name我就不提了
this.name = name;
//这里引入的Rxjava的PublishSubject,通过该对象的一些方法,来实现的定时窗口的失败和成功次数的统计。
failureSubject.window(1,TimeUnit.SECONDS)//这里每一秒钟作为一个窗口数据源,向后发射
.flatMap(o -> o.reduce((integer, integer2) -> integer + integer2).toObservable())//对上边发射的一个数据源进行求和操作,并继续向后发送。其实就是统计上边一秒内发送的数据的和。
.window((count)15,(skip)1)//这里是上边的数据源,每15个作为一个新的窗口来向后发射。每次向后平移一个窗口
.flatMap(integerObservable -> integerObservable.scan((integer, integer2) -> integer + integer2).skip(14))//对上边每次发射的数据,一次求和
.subscribe(failureCount::set);
successSubject.window(1,TimeUnit.SECONDS)
.flatMap(o -> o.reduce((integer, integer2) -> integer + integer2).toObservable())
.window(15,1)
.flatMap(integerObservable -> integerObservable.scan((integer, integer2) -> integer + integer2).skip(14))//scan操作对第一次发送的数据应用一个函数,并将结果和下一次发送的数据同时应用该函数。最终将所有数据向后发射。skip操作是对一串数据流跳过前多少个数据。
.subscribe(successCount::set);
}
//通过ConcurrentHashMap来存储并且懒加载对象。
public static FuseBreaker create(String serviceName){
return iQueryCodeRegFuseBreakerMap.getOrDefault(serviceName, defaultBreaker(serviceName));
}
private static FuseBreaker defaultBreaker(String serviceName){
FuseBreaker res = new FuseBreaker(serviceName);
FuseBreaker absent = iQueryCodeRegFuseBreakerMap.putIfAbsent(serviceName, res);
return absent == null ? res : absent;
}
//熔断器的三个状态
enum Status {
CLOSED,HALF_OPEN,OPEN
}
private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
public void markSuccess(){
if (status.compareAndSet(Status.HALF_OPEN,Status.CLOSED)){
successCount.set(0);
failureCount.set(0);
halfOpenCount.set(0);
}else if (status.get() == Status.CLOSED){
successSubject.onNext(1);
}
}
public void markNonSuccess(){
if (status.compareAndSet(Status.HALF_OPEN,Status.OPEN)){
fuseOpenTime.set(System.currentTimeMillis());
}else if (status.get() == Status.CLOSED){
failureSubject.onNext(1);
if (failureCount.get() / (failureCount.get() + successCount.get() + 1.0) > ALLOW_REQUEST_THRESHOLD
&& status.compareAndSet(Status.CLOSED,Status.OPEN)){
fuseOpenTime.set(System.currentTimeMillis());
}
}
}
public boolean allowRequest() {
log.info("current reg is :{} , status : {} , rate : {} , failureCount : {}, successCount : {}, sleepTime : {} , halfOpenCount :{}",
name, status.get(), failureCount.get() / (failureCount.get() + successCount.get() + 1.0), failureCount, successCount,
SLEEP_WINDOW_IN_MILLISECONDS * (1 << halfOpenCount.get()) - System.currentTimeMillis() + fuseOpenTime.get(), halfOpenCount);
if (status.get() == Status.CLOSED){
return true;
}else {
if (status.get() == Status.HALF_OPEN){
return false;
}else {
if (isAfterSleepWindow() && status.compareAndSet(Status.OPEN, Status.HALF_OPEN)){
if (halfOpenCount.get() <= MAX_HALF_OPEN_COUNT){
halfOpenCount.incrementAndGet();
}
return true;
}
}
}
return false;
}
private boolean isAfterSleepWindow() {
long fuseOpenTime = this.fuseOpenTime.get();
long currentTime = System.currentTimeMillis();
return SLEEP_WINDOW_IN_MILLISECONDS * (1 << halfOpenCount.get()) < currentTime - fuseOpenTime;
}
}
- 通过上述的代码,可以看到:
熔断器有CLOSED,HALF_OPEN,OPEN三个状态
- 从半开到关闭,或者到打开,我们只允许请求一次,这样,我们能保证服务可用时,熔断将会关闭,所有请求继续。服务不可用时,熔断器打开,继续熔断,并且增加熔断的时间。可以看到isAfterSleepWindow()方法中,进入半开状态的次数,将是熔断时间窗口的系数。
测试环节
通过压测,得出下列数据,对比效果,一目了然:
-
不加熔断机制的120s请求:完成总数为21次,平均响应时间为5.47s。
-
添加熔断之后的120s请求:完成总数37次,平均响应时间3.07s











网友评论