限流-阿里开源Sentinel

Sentinel作用

Sentinelm,分布式系统的流量防卫兵,可以从多个维度保护应用,广泛用于阿里各个系统,是大促稳定性保障的关键。现已开源,github地址Sentinel

最常用的场景就是限流,比方说提供某个http接口的服务,经过压测,我单机qps只能达到500,流量再大就可能导致系统崩溃,从而无法提供服务。参考压测进过,可以对此接口限流500,一旦超过,其他请求就快速失败,从而保证系统正常工作不被压垮。sentinel总结起来主要有以下作用:
1. 保护服务提供方不被过于频繁的调用压垮。
2. 保护服务调用方不被不可用的服务拖死。
3. 保护系统不超过负载。

Sentinel 的开源生态:

> 图1: 来源:sentinel github wiki

qps限流示例

下文通过一个示例,见github下sentinel-demo/sentinel-demo-basic。其他如spring的整合见spring-cloud-starter-alibaba-sentinel文档

static class RunTask implements Runnable {
    @Override
    public void run() {
        while (!stop) {
            Entry entry = null;

            try {
                // 1.获取token,如果不能,将会抛出BlockException
                entry = SphU.entry(KEY);
                pass.addAndGet(1);
            } catch (BlockException e1) {
                block.incrementAndGet();
            } catch (Exception e2) {
                // biz exception
            } finally {
                total.incrementAndGet();
                if (entry != null) {
                    // 2. 退出
                    entry.exit();
                }
            }
            Random random2 = new Random();
            try {
                TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
            } catch (InterruptedException e) {
            }
        }
    }
}

上面只是截取了FlowQpsDemo的一段代码,这个示例主要做了几个工作:
1. 初始化限流规则,限流20
2. 开启32个线程循环执行RunTask,并把总qps/通过qps/限流qps分表计入3个AtomicInteger
3. 开启一个定时任务,每秒统计一次3个AtomicInteger的值,作为统计数据输出

如:

1541907654105, total:1185, pass:20, block:1165
46 send qps is: 1218
1541907655110, total:1218, pass:21, block:1197
45 send qps is: 1130
1541907656115, total:1130, pass:20, block:1110
44 send qps is: 1177
1541907657119, total:1177, pass:22, block:1155

出现大于20的情况,因为统计线程有误差。那么,sentinel之所以能限流,肯定也统计了qps,有误差么?这个问题以后讨论

概念名词解释


> 图2

可以理解,上图为一个应用,其中ABCDEF为可以对其授权、限流、降级的入口,在sentinel中也叫切入点。比如现在访问C的qps为100,但是有可能A->C的访问频繁但不是很重要,而B-C次数不多但比较关键。此时我只想对A->C限流,于是就需要一个链路的概念,途中1、2、3组成3条链路,链路上的切入点也叫树节点。这些切入点可能处于不同链路如C,于是把用来汇总切入点信息的叫簇结点

示最简单的示例中,sentinel入口代码,定义一个切入口。

// 为需要保护的资源(概念或者逻辑上)定义一个入口,所有访问资源的请求都需要从此入口申请令牌
Entry entry = SphU.entry(“自定义资源名”);

如果要把节点纳入链路中,需要根据链路限流,需要通过上下文(Context)声明链路入口如:

ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
nodeA.exit();
}
 ContextUtil.exit();

原理

还是从示例FlowQpsDemo,进入entry = SphU.entry(KEY)代码开始分析;

// 为了获取令牌,从此入口进入,通过一些列ProcessorSlot校验限流规则,能通过返回Entry,否则抛BlockException
public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
         return new CtEntry(resourceWrapper, null, context);
    }
    if (context == null) {
        // Using default context.
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }
    // 2
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        chain.entry(context, resourceWrapper, null, count, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
  1. 从threadLocal获取Context,没有就使用默认Context
  2. 获取处理Slot(ProcessorSlot)
  3. 执行Slot的entry方法,校验通过则返回Entry

整个设计模式和netty类似:

|责任链模式|netty |sentinel|
|—|—|—|—|—|
|链|ChannelPipeline | chain |
|处理器|ChannelHandler| ProcessorSlot |
|上下文,携带元数据|ChannelHandlerContext|Context |
|不同|双向链表|单向链表|

所以下文重点看采用了哪些ProcessorSlot。

ProcessorSlot

先看lookProcessChain是如何创建ProcessorSlotChain。

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    // 更加资源wrapper从缓存查询
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        // 缓存不存在,加锁双重检查创建chain
        synchronized (LOCK) { 
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                    return null;
                }
                chain = SlotChainProvider.newSlotChain();
                Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                    chainMap.size() + 1);
                newMap.putAll(chainMap);
                newMap.put(resourceWrapper, chain);
                chainMap = newMap;
            }
        }
    }
    return chain;
}

我们接着看chain = SlotChainProvider.newSlotChain()如何创建chain,跟踪代码发现DefaultSlotChainBuilder这个类,build方法如下。

 public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());
    return chain;
}
  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

总体框架图:

> 图3


>图4: 3/4来源:sentinel github wiki

下午详细看看几个slot。

NodeSelectorSlot

从上文可知,NodeSelectorSlot对象是根据resourceWrapper缓存,也就是说,图2中BC节点(资源/入口)分别拥有不同NodeSelectorSlot对象。NodeSelectorSlot缓存了对应节点DefaultNode。

但是,缓存DefaultNode的key不是资源名,因为同一个资源可能处于不同链路,所以,sentinel采用Context的名字来缓存DefaultNode,从而构建不同链路的节点树;

图3中treenode还有一种结构没有画出来:

 *                  machine-root
 *                  /         \
 *                 /           \
 *         EntranceNode1   EntranceNode2
 *               /               \
 *              /                 \
 *      DefaultNode(nodeA)   DefaultNode(nodeA)
 *             |                    |
 *             +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);

图5

ClusterBuilderSlot

ClusterBuilderSlot用来创建构建资源的ClusterNode,主要维护资源的运行时统计数据,比如响应时间、qps、线程数、异常等。一个资源有一个clusterNode,但可以有多个defaultNode,如上图5。

StatisticSlot

StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据,维护到NodeSelectorSlot和ClusterBuilderSlot创建的数据结构中:
* clusterNode:资源唯一标识的 ClusterNode 的 runtime 统计
* origin:根据来自不同调用者的统计信息
* defaultnode: 根据上下文条目名称和资源 ID 的 runtime 统计
* 入口的统计

核心代码:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
    throws Throwable {
    try {
        fireEntry(context, resourceWrapper, node, count, args);
        // 线程+1
        node.increaseThreadNum();
        // 通过请求数+1
        node.addPassRequest();

        if (context.getCurEntry().getOriginNode() != null) {
            // 如果origin不为空,也更新数据
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest();
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            // ru入站请求,全局统计
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest();
        }
        
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        context.getCurEntry().setError(e);

        // Add block count.
        node.increaseBlockQps();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps();
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseBlockQps();
        }

        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        context.getCurEntry().setError(e);

        // Should not happen
        node.increaseExceptionQps();
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseExceptionQps();
        }

        if (resourceWrapper.getType() == EntryType.IN) {
            Constants.ENTRY_NODE.increaseExceptionQps();
        }
        throw e;
    }
}

我们关注下这两行代码:

node.increaseThreadNum();
node.addPassRequest();

线程数统计比较容器,AtomicInteger保存即可。但是qps怎么统计,毕竟qps是一段时间内的请求数量。
我们找到如下代码:

@Override
public void addPassRequest() {
    rollingCounterInSecond.addPass();
    rollingCounterInMinute.addPass();
}

在深入看代码之前,我们先看看,假设统计qps,我们会怎么做。比如用一个AtomicInteger数组保存每一秒的请求,index就是当前秒/60。 统计qps只需要用当前时间计算数组Index,就可以获取qps。比如下图,1时秒10qps,2秒时400qps。

>图6

问题在于:读取qps的时间不是整秒,比如上图,统计时刻2秒500毫秒,此时,获取的qps是100,但其实还有后500毫秒的请求没有统计。如果此时我统计t-1呢,也就是1秒的qps,此刻qps是准确的,但是不及时,第2秒的qps突然暴增,那统计就延迟了一秒。对只用来展示倒是问题不太大,但是对于限流来说,晚1秒可能系统已经挂了。

优化的办法,我们会想,把时间间隔缩小,比如10ms。这样是更准确了,但是就得保存很多数据,并且求qps需要把100个刻度加起来。对资源消耗太大。下面,我们看看sentinel怎么做的。

sentinel使用滑动窗口,数据结构如下:

> 图7

public class WindowWrap<T> {
    /**
     * 一个bucket的长度,单位毫秒
     */
    private final long windowLengthInMs;
    /**
     * 窗口开始时间,也是毫秒
     */
    private long windowStart;
    /**
     * 统计值
     */
    private T value;
 }

sentinel 默认采用windowLengthInMs=500计算qps。也就是说,1秒只有两个窗口。初始化如下方式如下

// 1. 创建rollingCounterInSecond
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT,1)
ArrayMetric{
    // 2. ArrayMetric包装了 MetricsLeapArray
     private final MetricsLeapArray data = new MetricsLeapArray(500, 1);
}
public abstract class LeapArray<T> {
    // 3 array 所以计算qps  ,sampleCount = 2。
    protected final AtomicReferenceArray<WindowWrap<T>> array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}

创建几个bucket,取决于SampleCountProperty.SAMPLE_COUNT设置。还有个rollingCounterInMinute用于存储分钟请求数,buckets长度是1000ms。

累积请求

主要是以下步骤:
1. 获取当前窗口(bucket)
2. bucket的值+1

获取当前窗口(bucket)
1. 获取当前时间
2. 计算当前时间对应的bucket下标(index)和bucket起始时间。
3. 获取窗口 array.get(index),

public WindowWrap<T> currentWindow(long time) {
    long timeId = time / windowLengthInMs;
    // Calculate current index.
    int idx = (int)(timeId % array.length());

    // Cut the time to current window start.
    time = time - time % windowLengthInMs;

    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket());
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        } else if (time == old.windowStart()) {
            return old;
        } else if (time > old.windowStart()) {
            if (updateLock.tryLock()) {
                try {
                    // if (old is deprecated) then [LOCK] resetTo currentTime.
                    return resetWindowTo(old, time);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }

        } else if (time < old.windowStart()) {
            // Cannot go through here.
            return new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket());
        }
    }
}
1. 如果为空就新创建一个,new WindowWrap<T>
2. 如果不为空,且窗口时间对得上,则使用这个窗口
3. 如果时间大于返回窗口的起始时间,说明这个窗口过期,应该加锁重置为新窗口
4. 如果窗口时间小于返回窗口的起始时间,也不能用,也需要新返回一个bucket
// add pass(LongAdder)
wrap.value().addPass();

MetricBucket{
      // 使用LongAdder保存数据,高并发下比cas原子操作类性能更好
      private final LongAdder pass = new LongAdder();
}

获取qps

qps就是通过的请求,统计pass只需要把所有bucket的请求相加,StatisticNode代码如下:

@Override
public long pass() {
    data.currentWindow();
    long pass = 0;
    List<MetricBucket> list = data.values();

    for (MetricBucket window : list) {
        pass += window.pass();
    }
    return pass;
}

qps统计只用了两个buckets,其实也跟我们最初设计的有一样的问题,如果流量波动加大,还是反应慢,可以适当调整bucket数量,比如10个。

SystemSlot

按系统负载级别限流。只按照一个切入口qps,可能并不能反映系统负责情况。还可能需要考虑所有线程、qps、rt,load等。

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args)
    throws Throwable {
    SystemRuleManager.checkSystem(resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, args);
}
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
    // Ensure the checking switch is on.
    if (!checkSystemStatus.get()) {
        return;
    }
    // for inbound traffic only
    if (resourceWrapper.getType() != EntryType.IN) {
        return;
    }
    // total qps
    double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
    if (currentQps > qps) {
        throw new SystemBlockException(resourceWrapper.getName(), "qps");
    }
    // total thread
    int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
    if (currentThread > maxThread) {
        throw new SystemBlockException(resourceWrapper.getName(), "thread");
    }
    double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
    if (rt > maxRt) {
        throw new SystemBlockException(resourceWrapper.getName(), "rt");
    }
    // BBR algorithm.
    if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
        if (currentThread > 1 &&
            currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {
            throw new SystemBlockException(resourceWrapper.getName(), "load");
        }
    }
}

比较好理解,只是多了一个load的实时统计。这里使用一个算法BBR 来实现。

总结

本文简单介绍sentinel的责任链模式完成限流,并介绍了几个slot。其实还有很多细节可以挖掘。


参考:
1. https://github.com/alibaba/Sentinel/wiki/
2. https://github.com/sentinel-group/sentinel-awesome

CONTENTS