美文网首页
熔断机制的一次实现与演变

熔断机制的一次实现与演变

作者: 薛云龙 | 来源:发表于2018-02-28 16:59 被阅读28次

业务场景

简单介绍一下为什么要做熔断:

  • 业务场景中需要并发请求一批服务,每个服务都有不稳定性,有的时候部分服务会出现异常或者超时的情况。那么这些出现问题的服务将会影响整个服务的响应时间。
  • 正所谓,一颗老鼠屎搞坏了一锅粥。此时,如果我们能够自动对那些不稳定的服务进行熔断,那么整体的响应时间肯定不会受到影响。
  • 当然,

初步设计

  • 刚开始进行设计的时候,对每一个服务都唯一进行了标记,并将这个标记转为了整数。再把所有的服务放入到LongAdder[]中。
  • 由于服务是异步的,所以这里需要装饰一下结果处理者AsyncHandler,当该对象的方法被回调时才会真正触发熔断数据的处理。
public class FuseActor implements Actor {
    //每个服务允许失败的次数
    private final static int ALLOWED_FAILURE_COUNT = 5;
    //定时任务,刷新服务resetFlag的时间
    private final static int REFRESH_TIME = 2;
    private LongAdder[] longAdders = new LongAdder[4000];

    @Override
    public Object execute(String serviceName, ProceedingJoinPoint joinPoint){

        Object[] args = joinPoint.getArgs();
        AsyncHandler<Response> olderAsyncHandler = (AsyncHandler<Response>) args[1];
        //判断每个服务的失败次数,是否大于允许的失败次数
        if (longAdders[regTagToInt((serviceName).getTag())].sum() >= ALLOWED_FAILURE_COUNT){
           Response response = QueryResponse.createResponse(WebBasicCodeEnum.ALREADY_AT_PROTECTION_MODE, Response.class);
            response.setStartTime(System.currentTimeMillis());
            olderAsyncHandler.handleResult(response);
            return null;
        }else {
            AsyncHandler<Response> newAsyncHandler = response -> {
                if (!response.getCode().equals(IErrorCode.OK.getCode())){
                    longAdders[ServiceNameToInt(name)].increment();
                }
                olderAsyncHandler.handleResult(response);
            };
            args[1] = newAsyncHandler;
            try {
                return joinPoint.proceed(args);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return null;
        }
}
    private int ServiceNameToInt(String tag){
          ....
        return Integer.parseInt(...);
    }
}
  • 通过上述的代码,可以知道,每过一段时间,所有的熔断的服务都会被释放,必然会导致这个时间点的整体服务响应时间变长。
  • 所有的熔断的服务定时释放,是不能得知服务是否已经可用了,如果把不可用的服务释放了,仍然要经历一段时间的失败才会进入熔断,所以这样的设计不合理。
  • 如果一个服务多次进入熔断状态,那么对应的释放时间是不是也需要随之增长呢?

重构与演变

  • 通过上述的分析,上边提到的熔断机制只能起到一小部分的作用,还是存在一定的限制。通过阅读Hystrix的源码,借鉴可Hystrix的设计,自己搞了一个稍有味道的设计。
  • 首先,对于面向对象编程的语言,我们要做更加的面向对象。这里,将每个服务的状态抽成一个类,来具体的描述服务的状态。而不能像上边一样,简单的通过一个数组中的一个LongAdder来描述一个服务的状态。
public class FuseActor implements Actor { 

    @Override
    public ActorResult execute(String serviceName, ProceedingJoinPoint joinPoint){

        Object[] args = joinPoint.getArgs();
        AsyncHandler<Response> olderAsyncHandler = (AsyncHandler<Response>) args[1];
        //这里通过serviceName创建对应的熔断器对象。
        FuseBreaker fuseBreaker = RegFuseBreaker.create(serviceName);
        //判断服务对应的熔断器是否允许请求
        if (!regFuseBreaker.allowRequest()){
            log.info("not allow request : {}",queryCode.getName());
            Response response = QueryResponse.createResponse(WebBasicCodeEnum.ALREADY_AT_PROTECTION_MODE, Response.class);
            response.setStartTime(System.currentTimeMillis());
            olderAsyncHandler.handleResult(response);
            return ActorResult.builder().effective(true).data(response).build();
        }else {
            //这里还是在装饰方法,在方法执行的时候对regFuseBreaker进行数据的处理。
            AsyncHandler<Response> newAsyncHandler = response -> {
                if (!response.getCode().equals(IErrorCode.OK.getCode())){
                        fuseBreaker.markNonSuccess();
                    }else {
                        fuseBreaker.markSuccess();
                }
                olderAsyncHandler.handleResult(response);
            };
            args[1] = newAsyncHandler;

            try {
                return ActorResult.builder().effective(false).data(joinPoint.proceed(args)).build();
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return ActorResult.builder().effective(false).data(null).build();
        }
    }
}

关键在于FuseBreaker的实现:

public class FuseBreaker {

    private String name;

    private final long SLEEP_WINDOW_IN_MILLISECONDS = 5000;

    private final double ALLOW_REQUEST_THRESHOLD = 0.51;

    private final AtomicLong failureCount = new AtomicLong();

    private final AtomicLong successCount = new AtomicLong();

    private final AtomicLong fuseOpenTime = new AtomicLong();

    private final AtomicInteger halfOpenCount = new AtomicInteger(0);

    private static final int MAX_HALF_OPEN_COUNT = 10;

    private  PublishSubject<Integer> failureSubject = PublishSubject.create();

    private  PublishSubject<Integer> successSubject = PublishSubject.create();
    //final static 保证该map是类变量,并且仅会初始化一次
    private final static ConcurrentMap<String,FuseBreaker> iQueryCodeRegFuseBreakerMap = new ConcurrentHashMap<>();

    private FuseBreaker(String name) {
        //看一下构造函数的实现,name我就不提了
        this.name = name;
        //这里引入的Rxjava的PublishSubject,通过该对象的一些方法,来实现的定时窗口的失败和成功次数的统计。
        failureSubject.window(1,TimeUnit.SECONDS)//这里每一秒钟作为一个窗口数据源,向后发射
                .flatMap(o -> o.reduce((integer, integer2) -> integer + integer2).toObservable())//对上边发射的一个数据源进行求和操作,并继续向后发送。其实就是统计上边一秒内发送的数据的和。
                .window((count)15,(skip)1)//这里是上边的数据源,每15个作为一个新的窗口来向后发射。每次向后平移一个窗口
                .flatMap(integerObservable -> integerObservable.scan((integer, integer2) -> integer + integer2).skip(14))//对上边每次发射的数据,一次求和
                .subscribe(failureCount::set);
        successSubject.window(1,TimeUnit.SECONDS)
                .flatMap(o -> o.reduce((integer, integer2) -> integer + integer2).toObservable())
                .window(15,1)
                .flatMap(integerObservable -> integerObservable.scan((integer, integer2) -> integer + integer2).skip(14))//scan操作对第一次发送的数据应用一个函数,并将结果和下一次发送的数据同时应用该函数。最终将所有数据向后发射。skip操作是对一串数据流跳过前多少个数据。
                .subscribe(successCount::set);
    }
    //通过ConcurrentHashMap来存储并且懒加载对象。
    public static FuseBreaker create(String serviceName){
        return iQueryCodeRegFuseBreakerMap.getOrDefault(serviceName, defaultBreaker(serviceName));
    }

    private static FuseBreaker defaultBreaker(String serviceName){
        FuseBreaker res = new FuseBreaker(serviceName);
        FuseBreaker absent = iQueryCodeRegFuseBreakerMap.putIfAbsent(serviceName, res);
        return absent == null ? res : absent;
    }
    //熔断器的三个状态
    enum Status {
        CLOSED,HALF_OPEN,OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);

    public void markSuccess(){
        if (status.compareAndSet(Status.HALF_OPEN,Status.CLOSED)){
            successCount.set(0);
            failureCount.set(0);
            halfOpenCount.set(0);
        }else if (status.get() == Status.CLOSED){
            successSubject.onNext(1);
        }
    }

    public void markNonSuccess(){
        if (status.compareAndSet(Status.HALF_OPEN,Status.OPEN)){
            fuseOpenTime.set(System.currentTimeMillis());
        }else if (status.get() == Status.CLOSED){
            failureSubject.onNext(1);
            if (failureCount.get() / (failureCount.get() + successCount.get() + 1.0) > ALLOW_REQUEST_THRESHOLD
                    && status.compareAndSet(Status.CLOSED,Status.OPEN)){
                fuseOpenTime.set(System.currentTimeMillis());
            }
        }
    }

    public boolean allowRequest() {
        log.info("current reg is :{} , status : {} , rate : {} , failureCount : {}, successCount : {}, sleepTime : {} , halfOpenCount :{}",
            name, status.get(), failureCount.get() / (failureCount.get() + successCount.get() + 1.0), failureCount, successCount,
                    SLEEP_WINDOW_IN_MILLISECONDS * (1 << halfOpenCount.get()) - System.currentTimeMillis() + fuseOpenTime.get(), halfOpenCount);

            if (status.get() == Status.CLOSED){
                return true;
            }else {
                if (status.get() == Status.HALF_OPEN){
                return false;
            }else {
                if (isAfterSleepWindow() && status.compareAndSet(Status.OPEN, Status.HALF_OPEN)){
                    if (halfOpenCount.get() <= MAX_HALF_OPEN_COUNT){
                        halfOpenCount.incrementAndGet();
                    }
                    return true;
                }
            }
        }
        return false;
    }

    private boolean isAfterSleepWindow() {
        long fuseOpenTime = this.fuseOpenTime.get();
        long currentTime = System.currentTimeMillis();
        return SLEEP_WINDOW_IN_MILLISECONDS * (1 << halfOpenCount.get()) < currentTime - fuseOpenTime;
    }
}
  • 通过上述的代码,可以看到:
    熔断器有CLOSED,HALF_OPEN,OPEN三个状态
熔断状态
  • 从半开到关闭,或者到打开,我们只允许请求一次,这样,我们能保证服务可用时,熔断将会关闭,所有请求继续。服务不可用时,熔断器打开,继续熔断,并且增加熔断的时间。可以看到isAfterSleepWindow()方法中,进入半开状态的次数,将是熔断时间窗口的系数。

测试环节

通过压测,得出下列数据,对比效果,一目了然:

  • 不加熔断机制的120s请求:完成总数为21次,平均响应时间为5.47s。


  • 添加熔断之后的120s请求:完成总数37次,平均响应时间3.07s


相关文章

  • 熔断机制的一次实现与演变

    业务场景 简单介绍一下为什么要做熔断: 业务场景中需要并发请求一批服务,每个服务都有不稳定性,有的时候部分服务会出...

  • 服务熔断Spring Cloud Hystrix

    Spring Cloud Hystrix 对于熔断机制的实现,Hystrix设计了三种状态: 1.熔断关闭状态(C...

  • Hystrix实现熔断机制

    前言 当服务规模足够大的时候,会出现多个服务之间调用的情况这样会产生一种问题,即某个服务不可用的时候,调用方线程就...

  • 服务降级熔断 - 熔断降级

    Hystrix实现原理-熔断机制 熔断是参考电路而产生的一种保护性机制,即系统中如果存在某个服务失败率过高时,将开...

  • 近期新闻关键词名解

    熔断机制 熔断机制(Circuit Breaker),也叫自动停盘机制,是指当股指波幅达到规定的熔断点时,交易所为...

  • 2019-05-15(二零二)

    离别 总是让人伤感 也许 离开 是因为 心灰意冷了吧 香帅的北大金融学 夭折的熔断机制。 什么熔断机制? 熔断机制...

  • Spring Cloud(5) Feign - 熔断机制

    目标 模拟实现熔断机制 account增加Hystrix依赖 account 增加fallBack处理 配置开启h...

  • 基于rxjava的生产消费模型

    一、前言 最近在看springcloud的熔断机制的实现,发现底层使用的rxjava实现,就看了下rxjava的使...

  • SpringClound-熔断-Hystrix

    具体的服务搭建参考上一篇文章 1.新建熔断处理类文件 需要实现熔断机制的rpc远程接口需要实现消费者服务中的rpc...

  • 投资黄金out了, 看数据投资钻石才是王道

    这几天,跌宕起伏的股市可以说是再一次虐惨了中国股民。熔断机制横空救市效果却不尽人意,接连的第二次熔断机制更是创下全...

网友评论

      本文标题:熔断机制的一次实现与演变

      本文链接:https://www.haomeiwen.com/subject/chnbxftx.html