开发者

golang 实现时间滑动窗口的示例代码

开发者 https://www.devze.com 2022-12-03 14:25 出处:网络 作者: wangxiaoangg
目录一 概念二 go-zero中的滑动窗口实现1.Bucket 样本窗口2. window 滑动窗口3. RollingWindow窗口三 使用一 概念
目录
  • 一 概念
  • 二 go-zero中的滑动窗口实现
    • 1.Bucket 样本窗口
    • 2. window 滑动窗口
    • 3. RollingWindow窗口
  • 三 使用

    一 概念

    固定窗口就像是滑动窗口的一个特例,固定窗口是大小固定且不能随着时间而变化的。

    滑动时间窗口就是把一段时间片分为多个样本窗口,可以通过javascript更细粒度对数据进行统计。然后计算对应的时间落在那个窗口上,来对数据统计;滑动时间窗口,随着时间流失,最开始的样本窗口将会失效,同时会生成新的样本窗口。

    例如 我们将1s划分为4个样本窗口,每个样本窗口对应250ms。

    golang 实现时间滑动窗口的示例代码

    二 go-zero中的滑动窗口实现

    1.Bucket 样本窗口

    Bucket用于记录每个样本窗口的值

    // Bucket defines the bucket that holds sum and num of additions.
    type Bucket struct {
    	Sum   float64 //样本窗口的值
    	Count int64   //样本窗口被add的次数
    }
     
    func (b *Bucket) add(v float64) {
    	b.Sum += v
    	b.Count++
    }
     
    //重置样本窗口,样本窗口过期时
    func (b *Bucket) reset() {
    	b.Sum = 0
    	b.Count = 0
    }

    2. window 滑动窗口

     type window struct {
    	buckets []*Bucket //样本窗口
    	size    int //样本窗口个数
    }
     
    func newWindow(size int) *window {
    	buckets := make([]*Bucket, size)
    	for i := 0; i < size; i++ {
    		buckets[i] = new(Bucket)
    	}
    	return &window{
    		buckets: buckets,
    		size:    size,
    	}
    }
    func (w *window) add(offset int, v float64) {
    	w.buckets[offset%w.size].add(v)
    }
     
    func (w *window) reduce(start, count in开发者_C培训t, fn func(b *Bucket)) {
    	for i := 0; i < count; i++ {
    		fn(w.buckets[(start+i)%w.size])
    	}
    }
     
    func (w *window) resetBucket(offset int) {
    	w.buckets[offset%w.size].reset()
    }

    3. RollingWindow窗口

    bucket和window的实现都很简单,逻辑很好理解。

    RollingWindow相对复杂一些。

    当add值时需要如下操作:

    • 计算已经过期的bucket(样本窗口),将已经过期的bucket重置。
    • 计算offset,当前add操作应当记录到哪个bucket中。
     
    type (
    	// RollingWindowOption let callers customize the RollingWindow.
    	RollingWindowOption func(rollingWindow *RollingWindow)
     
    	// RollingWindow defines a rolling window to calculate the events in buckets with time interval.
    	RollingWindow struct {
    		lock          sync.RWMutex
    		size          int
    		win           *window
    		interval      time.Duration
    		offset        int
    		ignoreCurrent bool
    		lastpythonTime      time.Duration // start time of the last bucket
    	}
    )
     
    // NewRollingWindow returns a RollingWindow that with size buckets and time interval,
    // use opts to customize the RollingWindow.
    func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow {
    	if size < 1 {
    		panic("size must be greater than 0")
    	}
     
    	w := &RollingWindow{
    		size:     size,
    		win:      newWindow(size),
    		interval: interval,
    		lastTime: timex.Now(),
    	}
    	for _, opt := range opts {
    		opt(w)
    	}
    	return w
    }
     
    // Add adds value to current bucket.
    func (rw *RollingWindow) Add(v float64) {
    	rw.lock.Lock()
    	defer rw.lock.Unlock()
    	rw.updateOffset()
    	rw.win.add(rw.offset, v)
    }
     
    // Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
    func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
    	rw.lock.RLock()
    	defer rw.lock.RUnlock()
     
    	var diff int
    	//获取跨度,并计算还有几个bucket还在窗口期内
    	span := rw.span()
    	// ignore current bucket, because of partial data
    	if span == 0 && rw.ignoreCurrent {
    		diff = rw.size - 1
    	} else {
    		diff = rw.size - span
    	}
    	if diff > 0 {
    		offset := (rw.offset + span + 1) % rw.size
    		rw.win.reduce(offset, diff, fn)
    	}
    }
     
    //距离上次add操作跨度,
    //例如 lastTime = 1s, 当前时间1777ms。样本窗口时间250ms,那么跨度为3个编程客栈样本窗口
    func (rw *RollingWindow) span() int {
    	offset := int(timex.Since(rw.lastTime) / rw.interval)
    	if 0 <= offset && offset < rw.size {
    		return offset
    	}
     
    	return rw.size
    }
     
    //g
    func (rw *RollingWindow) updateOffset() {
    	span := rw.span()
    	if span <= 0 {
    		return
    	}
     
    	offset := rw.offset
    	// reset expired buckets ,重置已经超时的bucket
    	for i := 0; i < span; i++ {
    		rw.win.resetBucket((offset + i + 1) % rw.size)
    	}
     
    	rw.offset = (offset + span) % rw.size
    	now := timewww.devze.comx.Now()
    	//和样本窗口时间对齐
    	rw.lastTime = now - (now-rw.lastTime)%rw.interval
    }

    三 使用

    //1.新建一个4样本窗口,每个样本窗口250ms
    rollingWindow:= NewRollingWindow(4, time.Millisecond*250,IgnoreCurrentBucket())
     
    //2.add 
    rollingWindow.Add(1)
    rollingWindow.Add(2)
    time.Sleep(time.Millisecond*250)
     
    rollingWindow.Add(3)
    rollingWindow.Add(4)
     
     
    //3.获取滑动窗口的值
     
    var Sum float64
    var total int64
    rollingWindow.Reduce(func(b *collection.Bucket) {
    		Sum += int64(b.Sum)
    		total += b.Count
    	})

    到此这篇关于golang 实现时间滑动窗口的文章就介绍到这了,更多相关golang 时间滑动窗口内容请搜索我们以前的文章或继编程客栈续浏览下面的相关文章希望大家以后多多支持我们!

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号