介绍
一个流量治理组件通常拥有“限流”这个最基础的能力,实现限流其核心思想是通过统计一段时间内的请求数,然后根据预先设定的阈值判断是否应该进行限流
那么如何存储并统计这一段时间内的请求数则是核心关键,本文将重点介绍Sentienl-Go是如何实现的毫秒级指标存储以及数据统计
固定窗口
在正式介绍之前,先简单介绍一下固定窗口的算法(也叫计数器算法)是实现流量控制比较简单的一种方式。其他常见的还有很多例如滑动时间窗口算法,漏桶算法,令牌桶算法等等。
固定窗口算法一般是通过原子操作将请求在统计周期内进行累加,然后当请求数大于阈值时进行限流。
实现代码:
1var (
2 counter int64 //计数
3 intervalMs int64 = 1000 //窗口长度(1S)
4 threshold int64 = 2 //限流阈值
5 startTime = time.Now().UnixMilli() //窗口开始时间
6)
7
8func main() {
9 for i := 0; i < 10; i++ {
10 if tryAcquire() {
11 fmt.Println("成功请求", time.Now().Unix())
12 }
13 }
14}
15
16func tryAcquire() bool {
17 if time.Now().UnixMilli()-atomic.LoadInt64(&startTime) > intervalMs {
18 atomic.StoreInt64(&startTime, time.Now().UnixMilli())
19 atomic.StoreInt64(&counter, 0)
20 }
21 return atomic.AddInt64(&counter, 1) <= threshold
22}
固定窗口的限流在实现上看起来比较简单容易,但是也有一些问题,最典型的就是“边界”问题。
如下图:
统计周期为1S,限流阈值是2的情况下,假设4次请求恰好“跨越”了固定的时间窗口,如红色的1S时间窗口所示会有四次请求,明显不符合限流的预期
滑动时间窗口
在滑动时间窗口算法中可以解决固定窗口算法的边界问题,在滑动窗口算法中通常有两个比较重要的概念
-
统计周期:例如想限制5S的请求数不能超过100次,那么5S就是统计周期
-
窗口(格子)的大小:一个周期内会有多个窗口(格子)进行指标(例如请求数)的统计,长度相等的统计周期,格子的数量越多,统计的越精确
如下所示:统计周期为1s,每个周期内分为两个格子,每个格子的长度是500ms。
在滑动窗口中统计周期以及窗口的大小,需要根据业务情况进行设定。
统计周期一致,窗口大小不一致:窗口越大统计精准度越低,但并发性能好,越小:统计精准度越高,并发性能随之降低
统计周期不一致,窗口大小一致:周期越长抗流量脉冲情况越好
统计结构
下面将详细介绍 Sentinel-Go 是如何使用滑动时间窗口高效的存储和统计指标数据的
窗口结构
在滑动时间窗口中时间很重要。每个窗口(BocketWrap)的组成是由一个开始时间和一个抽象的统计结构
1type BucketWrap struct {
2 // BucketStart represents start timestamp of this statistic bucket wrapper.
3 BucketStart uint64
4 // Value represents the actual data structure of the metrics (e.g. MetricBucket).
5 Value atomic.Value
6}
**开始时间:**当前格子的的起始时间
**统计结构:**存储指标数据,原子操作并发安全
如下:统计周期1s,每个窗口的长度是200ms
指标数据:
-
pass: 表示到来的数量,即此刻通过 Sentinel-Go 规则的流量数量
-
block: 表示被拦截的流量数量
-
complete: 表示完成的流量数量,包含正常结束和异常结束的情况
-
error: 表示错误的流量数量(熔断场景使用)
-
rt: 单次请求的request time
-
total:暂时无用
原子时间轮
如上:整个统计周期内有多个时间窗口,在 Sentinel-Go 中统计周期是由slice实现的,每个元素对应一个窗口
在上面介绍了为了解决边界问题,滑动时间窗口统计的过程需要向右滑动。随时时间的推移,无限的向右滑动,势必会让slice持续的扩张,导致slice的容量就会越大
为了解决这个问题,在 Sentinel-Go 中实现了一个时间轮的概念,通过固定slice长度将过期的时间窗口重置,节省空间。
如下:原子时间轮数据结构
1type AtomicBucketWrapArray struct {
2 // The base address for real data array
3 base unsafe.Pointer // 窗口数组首元素地址
4 // The length of slice(array), it can not be modified.
5 length int // 窗口数组的长度
6 data []*BucketWrap //窗口数组
7}
初始化
1: 根据当前时间计算出当前时间对应的窗口的startime,并得到当前窗口对应的位置
1// 计算开始时间
2func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {
3 return now - (now % uint64(bucketLengthInMs))
4}
5// 窗口下标位置
6idx := int((now / uint64(bucketLengthInMs)) % uint64(len))
2:初始化窗口数据结构(BucketWrap)
1for i := idx; i <= len-1; i++ {
2 ww := &BucketWrap{
3 BucketStart: startTime,
4 Value: atomic.Value{},
5 }
6 ww.Value.Store(generator.NewEmptyBucket())
7 ret.data[i] = ww
8 startTime += uint64(bucketLengthInMs)
9}
10for i := 0; i < idx; i++ {
11 ww := &BucketWrap{
12 BucketStart: startTime,
13 Value: atomic.Value{},
14 }
15 ww.Value.Store(generator.NewEmptyBucket())
16 ret.data[i] = ww
17 startTime += uint64(bucketLengthInMs)
18}
3:将窗口数组首元素地址设置到原子时间轮
13:将窗口数组首元素地址设置到原子时间轮
2// calculate base address for real data array
3sliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data))
4ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data)))
如果对unsafe.Pointer和slice熟悉的对于这段代码不难理解,这里通过unsafe.Pointer将底层slice首元素(第一个窗口)地址设置到原子时间轮中。这么做的原因主要是实现对时间轮中的元素(窗口)进行原子无锁的读取和更新,极大的提升性能。
窗口获取&窗口替换
如何在并发安全的情况下读取窗口和对窗口进行替换(时间轮涉及到对窗口更新操作)
代码如下:
1// 获取对应窗口
2func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {
3 // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
4 // then convert to (*unsafe.Pointer)
5 if offset, ok := aa.elementOffset(idx); ok {
6 return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
7 }
8 return nil
9}
10
11// 替换对应窗口
12func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {
13 // aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
14 // then convert to (*unsafe.Pointer)
15 // update secondary pointer
16 if offset, ok := aa.elementOffset(idx); ok {
17 return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
18 }
19 return false
20}
21
22// 获取对应窗口的地址
23func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
24 if idx >= aa.length || idx < 0 {
25 logging.Error(errors.New("array index out of bounds"),
26 "array index out of bounds in AtomicBucketWrapArray.elementOffset()",
27 "idx", idx, "arrayLength", aa.length)
28 return nil, false
29 }
30 basePtr := aa.base
31 return unsafe.Pointer(uintptr(basePtr) + uintptr(idx)*unsafe.Sizeof(basePtr)), true
32}
获取窗口:
-
在get func中接收根据当前时间计算出的窗口对应下标位置
-
根据下标位置在elementOffset func中,首先将底层的slice首元素地址转换成uintptr,然后将窗口对应下标*对应的指针字节大小即可以得到对应窗口元素的地址
-
将对应窗口地址转换成时间窗口(*BucketWarp)即可
窗口更新:
和获取窗口一样,获取到对应下标位置的窗口地址,然后利用atomic.CompareAndSwapPointer进行cas更新,将新的窗口指针地址更新到底层数组中。
滑动窗口
在原子时间轮中提供了对窗口读取以及更新的操作。那么在什么时机触发更新以及如何滑动?
滑动
所谓滑动就是根据当前时间找到整个统计周期的所有窗口中的数据。例如在限流场景下,我们需要获取统计周期内的所有pass的流量,从而来判断当前流量是否应该被限流。
核心代码:
1// 根据当前时间获取周期内的所有窗口
2func (m *SlidingWindowMetric) getSatisfiedBuckets(now uint64) []*BucketWrap {
3 start, end := m.getBucketStartRange(now)
4 satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {
5 return ws >= start && ws <= end
6 })
7 return satisfiedBuckets
8}
9
10
11// 根据当前时间获取整个周期对应的窗口的开始时间和结束时间
12func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
13 curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
14 end = curBucketStartTime
15 start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
16 return
17}
18
19// 匹配符合条件的窗口
20func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {
21 if now <= 0 {
22 return make([]*BucketWrap, 0)
23 }
24 ret := make([]*BucketWrap, 0, la.array.length)
25 for i := 0; i < la.array.length; i++ {
26 ww := la.array.get(i)
27 if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) {
28 continue
29 }
30 ret = append(ret, ww)
31 }
32 return ret
33}
如下图所示:统计周期=1000ms(跨两个格子),now=1300时 计算出start=500,end=1000
那么在计算周期内的pass数量时,会根据如下条件遍历格子,也就会找到开始时间是500和1000的两个格子,那么统计的时候1000的这个格子中的数据自然也会被统计到。(当前时间1300,在1000的这个格子中)
1satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool {
2 return ws >= start && ws <= end
3 })
更新
每次流量经过时都会进行相应的指标存储,在存储时会先获取对应的窗口,然后会根据窗口的开始时间进行对比,如果过期则进行窗口重置。
如下图:根据窗口开始时间匹配发现0号窗口已过期
如下图:重置窗口的开始时间和统计指标
核心代码:
1func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
2 // 计算当前时间对应的窗口下标
3 idx := la.calculateTimeIdx(now)
4 // 计算当前时间对应的窗口的开始时间
5 bucketStart := calculateStartTime(now, la.bucketLengthInMs)
6
7 for {
8 // 获取旧窗口
9 old := la.array.get(idx)
10 // 如果旧窗口==nil则初始化(正常不会执行这部分代码)
11 if old == nil {
12 newWrap := &BucketWrap{
13 BucketStart: bucketStart,
14 Value: atomic.Value{},
15 }
16 newWrap.Value.Store(bg.NewEmptyBucket())
17 if la.array.compareAndSet(idx, nil, newWrap) {
18 return newWrap, nil
19 } else {
20 runtime.Gosched()
21 }
22 // 如果本次计算的开始时间等于旧窗口的开始时间,则认为窗口没有过期,直接返回
23 } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
24 return old, nil
25 // 如果本次计算的开始时间大于旧窗口的开始时间,则认为窗口过期尝试重置
26 } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
27 if la.updateLock.TryLock() {
28 old = bg.ResetBucketTo(old, bucketStart)
29 la.updateLock.Unlock()
30 return old, nil
31 } else {
32 runtime.Gosched()
33 }
34 ......
35 }
36}
总结
通过上面的介绍可以了解到在Sentienl-Go中实现底层指标的统计代码量并不多,本质是通过“时间轮”进行指标的数据统计和存储,在时间轮中借鉴slice的底层实现利用unsafe.Pointer和atomic配合对时间轮进行无锁的原子操作,极大的提升了性能
Sentinel-GO整体的数据结构图:
作者介绍
Github 账号:binbin0325,公众号:柠檬汁Code,Sentinel-Golang Committer 、ChaosBlade Committer 、 Nacos PMC 、Apache Dubbo-Go Committer。目前主要关注于混沌工程、中间件以及云原生方向。