前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >滑动时间窗口设计

滑动时间窗口设计

原创
作者头像
用户8000484
修改2022-02-18 18:06:29
1.8K0
修改2022-02-18 18:06:29
举报
文章被收录于专栏:分布式服务分布式服务

滑动时间窗口设计方法

导语:系统做出一系列调度要基于系统运行的统计指标,例如熔断(基于请求数、并发数、请求延迟、异常比例等),本文解析基于滑动时间窗口的统计结构设计办法。

什么是滑动时间窗口

固定窗口:一个固定长度的格子,这个格子里的所有事件元素就是统计目标

滑动窗口:滑动窗口将固定窗口等分为多个小的窗口,统计时可以圈定若干个连续窗口,统计落入其内的事件元素。显然滑动窗口可以做更细粒度上的统计。

滑动时间窗口:应用指标统计很重要一点是要与时间对齐,比如流控可能希望的是拿到前一秒的失败请求比例,所以在我们统计的指标都是需要与时间对齐。滑动时间窗口就是把一段时间片分为多个窗口,然后计算对应的时间落在那个窗口上,来对数据统计。

滑动时间窗口怎么运行

通过上面对滑动事件窗口的描述,我们可以知道滑动时间窗口有如下特点:

  1. 每个小窗口的大小均等
  2. 滑动窗口的个数及大小可以根据实际应用进行控制

那么对应的滑动时间窗口有两个重要设置:

  1. 滑动窗口的统计周期:表示滑动窗口的统计周期,一个滑动窗口包含有一个或多个小窗口
  2. 滑动窗口中每个小窗口长度:每个小窗口的统计周期

如上图,滑动窗口的统计周期是1000ms,每个小窗口的统计周期是200ms,一个滑动窗口有5个小滑动窗口

那么怎么做具体的统计呢?

如上,

  1. 每个小窗口都是一个具体的数据结构,里面做一些统计相关的结构设计,用户可以自定义这些结构
  2. 每个小窗口都有1个开始时间和1个结束时间,事件发生的时间落在哪个小窗口格子的起始区间内,那么对事件的统计就要落在这个小窗口内

接下来就是怎么计算事件落在哪个小窗口内呢?

假设事件发生的时间是now, 小窗口的统计周期也就是长度是bucketLength, 滑动窗口的统计周期也就是长度是windowLength, 那么小窗口的index计算如下:

举例如下:

如上图所示的滑动事件窗口参数,bucketLength=200ms, windowLength=1000ms,

  1. 当前时间是1ms, 按上述公式计算得index=0, 也就是会落入第1个小窗口格子内;
  2. 当前时间是300ms, 按上述公式计算得index=1,也就是会落入第2个小窗口格子内;
  3. 当前时间1001ms, 按上述公式计算得index=0,也就是会落入第1个小窗口格子内;

换句话说,我们知道事件发生的时间,就能知道事件落入哪个格子内,那么就能对事件做出相应统计计算。

从上述的计算不难看出,随着时间的推移,这个格子的计算就是在长度固定的数组内循环移动,或者在一个环形队列上循环移动。

那么如果已经过了第一遍循环,新的时间格子循环移动到前面,也就是进入到新的统计周期后,要做哪些操作呢?

  1. 把整个滑动窗口的起始时间设置为新的起始时间
  2. 把小窗口内数据结构重置后再进行新的统计

滑动时间窗口两个参数的实际意义

通过上述描述,我们已经知道滑动时间窗口的运行原理和使用方法,那么滑动时间窗口的两个参数对实际运行结果会产生怎样的影响呢?这对我们如何设置两个参数会产生决定作用。

滑动时间窗口的两个参数:

  1. 小窗口格子统计周期长度:bucketLength
  2. 滑动窗口统计周期长度: windowLength

小窗口格子越小,也就是实际记录事件的时间划分越细致,那我们得到的统计结果就会越精确,但与此同时,我们就会面对更大的存储计算和并发压力;

滑动窗口越大,也就是我们的统计周期变得更长,那么得到的统计结果就更平均,也就是图线就越平滑,相反的,窗口越小则观察统计周期就越小,那么统计结果的差异就会越大,也就是脉冲和毛刺就会越强烈,但是与实际情况会更贴近。

所以实际实现和运行中,我们要综合考虑系统的抗脉冲能力和并发能力,做出合理的设置。

代码示例

参考sentinel滑动窗口代码,简化最基础的实现部分并注释如下。

组件由两部分组成:

  1. 定长数组来表示滑动窗口,数组每个元素就是一个小窗口格子,格子是一个抽象对象,可以存储任意统计结构。
  2. 模拟时间滑动的算法,保证数据按照时间推移而做出正确滑动
代码语言:javascript
复制
package sliding_window

import (
   "errors"
   "sync"
   "sync/atomic"
)

// 小窗口格式
type Bucket struct {
   // 小窗口起始统计位置
   Start uint64
   // 存储实际自定义统计结构
   Value atomic.Value
}

// 用于模拟滑动窗口的定长数组
type BucketArray struct {
   // 数组长度
   length int
   // 实际存储的小窗口数组
   data []*Bucket
   // 保证并发安全的锁
   mutex sync.Mutex
}

// 初始化滑动窗口数组
func NewBucketArray(sampleCount int, bucketLength uint, now uint64, gen BucketItemGenerator) *BucketArray {
   ret := &BucketArray{
      length: sampleCount,
      data:   make([]*Bucket, sampleCount),
   }
   // 这里首先会根据当前时间now计算出其所对在的格子在数组中的下标idx以及该格子的统计起始时间startTime
   // startTime的意义是对齐时间
   idx := int((now / uint64(bucketLength)) % uint64(sampleCount))
   startTime := now - (now % uint64(bucketLength))

   // 从[idx, sampleCount-1] 会预先分配每个格子结构*Bucket,并且基于计算出的startTime依次往后填入对应格子的开始时间
   for i := idx; i <= sampleCount-1; i++ {
      b := &Bucket{
         Start: startTime,
         Value: atomic.Value{},
      }
      b.Value.Store(gen.NewEmptyBucket())
      ret.data[i] = b
      startTime += uint64(bucketLength)
   }

   // 从[0,idx-1] 这个区间也会预先分配每个格子结构*Bucket,不过需要注意的是,[0,idx-1] 里面的格子预先分配的时间也是未来的时间
   for i := 0; i < idx; i++ {
      b := &Bucket{
         Start: startTime,
         Value: atomic.Value{},
      }
      b.Value.Store(gen.NewEmptyBucket())
      ret.data[i] = b
      startTime += uint64(bucketLength)
   }

   return ret
}

// 根据索引获取bucket
func (ba *BucketArray) get(idx int) *Bucket {
   ba.mutex.Lock()
   defer ba.mutex.Unlock()

   return ba.data[idx]
}

// 基于时间做位置划分的滑动窗口
type SlidingWindowArray struct {
   // bucket长度,这里是基于时间划分,那么bucket长度就是时间间隔,例如200ms
   bucketLength uint
   // 一个滑动窗口包含的小窗口个数,bucket长度和包含的小窗口个数决定了滑动窗口的统计周期也就是长度
   sampleCount uint
   // 一个滑动时间窗口长度,由上述两个参数确定,如sampleCount=5,bucketLength=200ms, 则interval=1000ms
   interval uint
   // 实际存储bucket的数组结构
   array *BucketArray
   // 用于并发安全操作的锁
   mutex sync.Mutex
}

// 初始化滑动时间窗口
func NewSlidingWindowArray(sampleCount uint, bucketLength uint,
   nowInMs uint64, gen BucketItemGenerator) *SlidingWindowArray {
   return &SlidingWindowArray{
      bucketLength: bucketLength,
      sampleCount:  sampleCount,
      interval:     bucketLength * sampleCount,
      array:        NewBucketArray(int(sampleCount), bucketLength, nowInMs, gen),
   }
}

// 给定一个时间点,返回所属的bucket,从而可以对bucket做事件统计记录或者查询, 这个实际上就是模拟了全部的滑动过程
func (swa *SlidingWindowArray) CurrentBucketOfTime(now uint64, bg BucketItemGenerator) (*Bucket, error) {
   timeId := now / uint64(swa.bucketLength)
   idx := int(timeId) % swa.array.length

   bucketStart := now - (now % uint64(swa.bucketLength))

   old := swa.array.get(idx)
   if bucketStart == atomic.LoadUint64(&old.Start) {
      return old, nil
   } else if bucketStart > atomic.LoadUint64(&old.Start) {
      // 实际上已经进入新的时间格子循环了
      swa.mutex.Lock()
      // 进入新的循环是要清空原先统计数据的
      old = bg.ResetBucket(old, bucketStart)
      swa.mutex.Unlock()
      return old, nil
   } else {
      // 获取到未来时间格子,暂时不可能发生,记做失败
      return nil, errors.New("future bucket")
   }
}

func (swa *SlidingWindowArray) ValuesConditional(now uint64, predicate func(uint64) bool) []*Bucket {
   if now <= 0 {
      return make([]*Bucket, 0)
   }
   ret := make([]*Bucket, 0, swa.array.length)
   for i := 0; i < swa.array.length; i++ {
      sb := swa.array.get(i)
      if sb == nil || (now-atomic.LoadUint64(&sb.Start)) > uint64(swa.interval) || !predicate(atomic.
         LoadUint64(&sb.Start)) {
         continue
      }
      ret = append(ret, sb)
   }
   return ret
}

// Bucket实际结构的生成接口
type BucketItemGenerator interface {
   // 生成格子里的实际统计结构
   NewEmptyBucket() interface{}
   // 用于格子统计数据的重置清空
   ResetBucket(b *Bucket, startTime uint64) *Bucket
}

使用示例:

代码语言:javascript
复制
// 统计qps

// 用于统计QPS的实际结构
type MetricBucketItem struct {
   Counter int64
}

type SlidingWindowMetric struct {
   data *SlidingWindowArray
}

func (swm *SlidingWindowMetric) NewEmptyBucket() interface{} {
   return &MetricBucketItem{Counter: 0}
}

func (swm *SlidingWindowMetric) ResetBucket(bw *Bucket, startTime uint64) *Bucket {
   atomic.StoreUint64(&bw.Start, startTime)
   bw.Value.Store(&MetricBucketItem{Counter: 0})
   return bw
}

func NewSlidingWindowMetric(sampleCount uint32, bucketLength uint32) *SlidingWindowMetric {
   nowInMs := time.Now().UnixNano() / 1e6

   swm := &SlidingWindowMetric{}
   swm.data = NewSlidingWindowArray(uint(sampleCount), uint(bucketLength), uint64(nowInMs), swm)
   return swm
}

// 有访问就添加计数
func (swm *SlidingWindowMetric) AddCount(count int64) {
   nowInMs := time.Now().UnixNano() / 1e6
   // 获取当前时间应当落入的格子
   curBucket, err := swm.data.CurrentBucketOfTime(uint64(nowInMs), swm)

   if err != nil {
      fmt.Printf("get bucket from SlidingWindowArray err, %v", err)
      return
   }
   if curBucket == nil {
      fmt.Printf("current bucket is nil")
      return
   }
   mb := curBucket.Value.Load()
   if mb == nil {
      fmt.Printf("nil bucket")
      return
   }
   b, ok := mb.(*MetricBucketItem)
   if !ok {
      fmt.Printf("fail to type assert")
      return
   }

   // 将相应统计结构计数做累加
   atomic.AddInt64(&b.Counter, count)
}

func (swm *SlidingWindowMetric) GetQPS() float64 {
   now := uint64(time.Now().UnixNano() / 1e6)
   startTime := now - (now % uint64(swm.data.bucketLength))
   end := startTime
   start := end - uint64(swm.data.interval) + uint64(swm.data.bucketLength)
   satisfiedBuckets := swm.data.ValuesConditional(now, func(ws uint64) bool {
      return ws >= start && ws <= end
   })

   // 获取符合条件的bucket内所有计数
   ret := int64(0)
   for _, sb := range satisfiedBuckets {
      mb := sb.Value.Load()
      if mb == nil {
         fmt.Printf("nil BucketWrap")
         continue
      }
      counter, ok := mb.(*MetricBucketItem)
      if !ok {
         fmt.Printf("type assert failed")
         continue
      }
      ret += counter.Counter
   }
   // 假设interval 长度设为ms
   return float64(ret) / (float64(swm.data.interval) / 1000.00)
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 滑动时间窗口设计方法
    • 什么是滑动时间窗口
      • 滑动时间窗口怎么运行
        • 滑动时间窗口两个参数的实际意义
          • 代码示例
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
          http://www.vxiaotou.com