美文网首页Sentinel
Sentinel流量控制

Sentinel流量控制

作者: 晴天哥_王志 | 来源:发表于2021-06-20 11:33 被阅读0次

系列

开篇

  • 流量控制(flow control),其原理是监控应用流量的 QPS 或并发线程数等指标,当达到指定的阈值时对流量进行控制,以避免被瞬时的流量高峰冲垮,从而保障应用的高可用性。
  • FlowSlot 会根据预设的规则,结合前面 NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot 统计出来的实时信息进行流量控制。
  • 流量控制的详细介绍可以参考Sentinel流量控制的介绍

流控规则介绍

  • 流控规则在管理后台的配置参数如上图所示。
  • 流控规则核心属性如上图所示。

流控规则配置

    private static void initFlowRule(int interfaceFlowLimit, boolean method) {
        FlowRule flowRule = new FlowRule(INTERFACE_RES_KEY)
                .setCount(interfaceFlowLimit)
                .setGrade(RuleConstant.FLOW_GRADE_QPS);
        List<FlowRule> list = new ArrayList<>();
        if (method) {
            FlowRule flowRule1 = new FlowRule(RES_KEY)
                    .setCount(5)
                    .setGrade(RuleConstant.FLOW_GRADE_QPS);
            list.add(flowRule1);
        }
        list.add(flowRule);
        FlowRuleManager.loadRules(list);
    }
  • 通过配置流控规则并且通过FlowRuleManager生效限流规则。

流控规则定义

public interface Rule {
    String getResource();
}

public abstract class AbstractRule implements Rule {
    // 资源名
    private String resource;
    // 流控对应的调用来源
    private String limitApp;
}

public class FlowRule extends AbstractRule {

    // 流控类型:0=线程,1=QPS FLOW_GRADE_THREAD = 0 FLOW_GRADE_QPS = 1;
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    // 流控阈值,配置的是qps类型则代表qps的值;配置的是线程数类型则代表线程数
    private double count;
     // 流控限流策略
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    // 关联流控的资源
    private String refResource;
    // 流控效果控制 0. default(reject directly), 1. warm up,  2. rate limiter, 3. warm up + rate limiter
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    // 对应流控效果为Warm Up情况下,出现的预热时长
    private int warmUpPeriodSec = 10;
    // 对应流控效果为排队等待情况下,出现的超时时间
    private int maxQueueingTimeMs = 500;
    // 对应新增流控规则页面的是否集群
    private boolean clusterMode;
    // 集群流控的相关配置
    private ClusterFlowConfig clusterConfig;
    // 流量整形的实现,不同流控效果有不同算法
    private TrafficShapingController controller;
}

一条限流规则主要由下面几个因素组成,我们可以组合这些元素来实现不同的限流效果:

  • resource:资源名,即限流规则的作用对象
  • count: 限流阈值
  • grade: 限流阈值类型(QPS 或并发线程数)
  • limitApp: 流控针对的调用来源,若为 default 则不区分调用来源
  • strategy: 调用关系限流策略
  • controlBehavior: 流量控制效果(直接拒绝、Warm Up、匀速排队)

流控流程

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        // 校验是否限流
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}


public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        // 获取匹配的规则
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // 检查规则能否通过
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, 
                               int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        // 集群模式下的规则检测
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        // 单机模式下的规则检测
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // 选择流量统计的节点进行限流计算
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        // rule.getRater返回TrafficShapingController对象,
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
  • 流控的核心逻辑在FlowSlot进行处理,通过FlowRuleChecker进行限流规则生效判断
  • FlowRuleChecker的checkFlow会遍历FlowRule进行canPassCheck判断
  • canPassCheck单机模式执行passLocalCheck,集群模式执行passClusterCheck
  • passLocalCheck根据流控效果策略和获取的统计节点判断是否限流
  • selectNodeByRequesterAndStrategy返回ClusterBuilderSlot阶段生成的ClusterNode
  • TrafficShapingController在默认模式下返回流控效果策略DefaultController。
  • DefaultController负责实现流量控制。

流控效果策略

public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;
    private double count;
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 获取当前已使用的token
        int curCount = avgUsedTokens(node);
        // 当前已使用token + 获取的token 大于token数量的场景
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
}
  • 默认流控策略比较已使用token和此次消耗token是否大于token数量判断限流

流控策略

com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController

public final class FlowRuleUtil {
    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                // 预热/冷启动
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            ColdFactorProperty.coldFactor);
                // 匀速排队
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                // 冷启动+匀速排队
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);

                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        // 直接拒绝
        return new DefaultController(rule.getCount(), rule.getGrade());
    }
}
  • 流控效果策略根据不同的规则返回不同的流控策略。
  • 直接拒绝(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默认的流量控制方式,当QPS超过任意规则的阈值后,新的请求就会被立即拒绝,拒绝方式为抛出FlowException。
  • Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。
  • 匀速排队(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法。

流控节点选择

public class FlowRuleChecker {

    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            // 实际访问的分支
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }
}
  • 根据请求和策略来来返回数据统计节点用以流控限制。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        // 负责创建DefaultNode,职责链以资源维度,slot以职责链维度
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    // 以资源作为key保存的全局的集群节点
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
    private static final Object lock = new Object();
    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        // 将DefaultNode设置进集群节点
        node.setClusterNode(clusterNode);

        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}
  • NodeSelectorSlot生成的DefaultNode在ClusterBuilderSlot被设置ClusterNode。
  • ClusterNode记录访问的统计数据会被用在流控当中。

相关文章

  • Sentinel 熔断降级

    Sentinel是什么? Sentinel是面向分布式服务架构的轻量级流量控制,熔断降级产品,主要以流量控制为切入...

  • sentinel 阿里开源流控框架介绍

    sentinel 是什么 Sentinel 是面向分布式服务架构的轻量级流量控制框架,主要以流量为切入点,从流量控...

  • Sentinel使用入门

    一、Sentinel介绍 Sentinel 是面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整...

  • spring cloud alibaba系列(二)Sentine

    限流组件Sentinel Sentinel是把流量作为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务...

  • 微服务架构--流量控制Sentinel(二)

    一、Sentinel简介   Sentinel 是面向分布式服务架构的轻量级流量控制产品,主要以流量为切入点,从流...

  • Alibaba Sentinel

    Sentinel 是什么 Sentinel 是面向分布式服务架构的轻量级流量控制组件,主要以流量为切入点,从限流、...

  • Sentinel流量控制

    系列 Sentinel流程介绍[https://www.jianshu.com/p/059aaf9492b1] S...

  • 分布式组件-Sentinel-Dashboard常规操作

    Sentinel资源保护规则 Sentinel支持多种保护规则:流量控制规则、熔断降级规则、系统保护规则、来源访问...

  • [SpringCloud Alibaba] 浅析Sentinel

    Sentinel 面向云原生微服务的流量控制、熔断降级组件。 搭建 官网下载Sentinel 仪表盘jar包。编译...

  • sentinel简介

    sentinel是面向分布式服务框架的轻量级流量控制框架,主要以流量为切入点,从流量控制,熔断降级,系统负载保护等...

网友评论

    本文标题:Sentinel流量控制

    本文链接:https://www.haomeiwen.com/subject/xalaeltx.html