本篇内容介绍了“Golang怎么实现常见的限流算法”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
固定窗口
每开启一个新的窗口,在窗口时间大小内,可以通过窗口请求上限个请求。
该算法主要是会存在临界问题,如果流量都集中在两个窗口的交界处,那么突发流量会是设置上限的两倍。
package limiter import ( "sync" "time" ) // FixedWindowLimiter 固定窗口限流器 type FixedWindowLimiter struct { limit int // 窗口请求上限 window time.Duration // 窗口时间大小 counter int // 计数器 lastTime time.Time // 上一次请求的时间 mutex sync.Mutex // 避免并发问题 } func NewFixedWindowLimiter(limit int, window time.Duration) *FixedWindowLimiter { return &FixedWindowLimiter{ limit: limit, window: window, lastTime: time.Now(), } } func (l *FixedWindowLimiter) TryAcquire() bool { l.mutex.Lock() defer l.mutex.Unlock() // 获取当前时间 now := time.Now() // 如果当前窗口失效,计数器清0,开启新的窗口 if now.Sub(l.lastTime) > l.window { l.counter = 0 l.lastTime = now } // 若到达窗口请求上限,请求失败 if l.counter >= l.limit { return false } // 若没到窗口请求上限,计数器+1,请求成功 l.counter++ return true }
滑动窗口
滑动窗口类似于固定窗口,它只是把大窗口切分成多个小窗口,每次向右移动一个小窗口,它可以避免两倍的突发流量。
固定窗口可以说是滑动窗口的一种特殊情况,只要滑动窗口里面的小窗口和大窗口大小一样。
窗口算法都有一个问题,当流量达到上限,后面的请求都会被拒绝。
package limiter import ( "errors" "sync" "time" ) // SlidingWindowLimiter 滑动窗口限流器 type SlidingWindowLimiter struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallWindow int64 // 小窗口时间大小 smallWindows int64 // 小窗口数量 counters map[int64]int // 小窗口计数器 mutex sync.Mutex // 避免并发问题 } // NewSlidingWindowLimiter 创建滑动窗口限流器 func NewSlidingWindowLimiter(limit int, window, smallWindow time.Duration) (*SlidingWindowLimiter, error) { // 窗口时间必须能够被小窗口时间整除 if window%smallWindow != 0 { return nil, errors.New("window cannot be split by integers") } return &SlidingWindowLimiter{ limit: limit, window: int64(window), smallWindow: int64(smallWindow), smallWindows: int64(window / smallWindow), counters: make(map[int64]int), }, nil } func (l *SlidingWindowLimiter) TryAcquire() bool { l.mutex.Lock() defer l.mutex.Unlock() // 获取当前小窗口值 currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow // 获取起始小窗口值 startSmallWindow := currentSmallWindow - l.smallWindow*(l.smallWindows-1) // 计算当前窗口的请求总数 var count int for smallWindow, counter := range l.counters { if smallWindow < startSmallWindow { delete(l.counters, smallWindow) } else { count += counter } } // 若到达窗口请求上限,请求失败 if count >= l.limit { return false } // 若没到窗口请求上限,当前小窗口计数器+1,请求成功 l.counters[currentSmallWindow]++ return true }
漏桶算法
漏桶是模拟一个漏水的桶,请求相当于往桶里倒水,处理请求的速度相当于水漏出的速度。
主要用于请求处理速率较为稳定的服务,需要使用生产者消费者模式把请求放到一个队列里,让消费者以一个较为稳定的速率处理。
package limiter import ( "sync" "time" ) // LeakyBucketLimiter 漏桶限流器 type LeakyBucketLimiter struct { peakLevel int // 最高水位 currentLevel int // 当前水位 currentVelocity int // 水流速度/秒 lastTime time.Time // 上次放水时间 mutex sync.Mutex // 避免并发问题 } func NewLeakyBucketLimiter(peakLevel, currentVelocity int) *LeakyBucketLimiter { return &LeakyBucketLimiter{ peakLevel: peakLevel, currentVelocity: currentVelocity, lastTime: time.Now(), } } func (l *LeakyBucketLimiter) TryAcquire() bool { l.mutex.Lock() defer l.mutex.Unlock() // 尝试放水 now := time.Now() // 距离上次放水的时间 interval := now.Sub(l.lastTime) if interval >= time.Second { // 当前水位-距离上次放水的时间(秒)*水流速度 l.currentLevel = maxInt(0, l.currentLevel-int(interval/time.Second)*l.currentVelocity) l.lastTime = now } // 若到达最高水位,请求失败 if l.currentLevel >= l.peakLevel { return false } // 若没有到达最高水位,当前水位+1,请求成功 l.currentLevel++ return true } func maxInt(a, b int) int { if a > b { return a } return b }
令牌桶
与漏桶算法的相反,令牌桶会不断地把令牌添加到桶里,而请求会从桶中获取令牌,只有拥有令牌地请求才能被接受。
因为桶中可以提前保留一些令牌,所以它允许一定地突发流量通过。
package limiter import ( "sync" "time" ) // TokenBucketLimiter 令牌桶限流器 type TokenBucketLimiter struct { capacity int // 容量 currentTokens int // 令牌数量 rate int // 发放令牌速率/秒 lastTime time.Time // 上次发放令牌时间 mutex sync.Mutex // 避免并发问题 } func NewTokenBucketLimiter(capacity, rate int) *TokenBucketLimiter { return &TokenBucketLimiter{ capacity: capacity, rate: rate, lastTime: time.Now(), } } func (l *TokenBucketLimiter) TryAcquire() bool { l.mutex.Lock() defer l.mutex.Unlock() // 尝试发放令牌 now := time.Now() // 距离上次发放令牌的时间 interval := now.Sub(l.lastTime) if interval >= time.Second { // 当前令牌数量+距离上次发放令牌的时间(秒)*发放令牌速率 l.currentTokens = minInt(l.capacity, l.currentTokens+int(interval/time.Second)*l.rate) l.lastTime = now } // 如果没有令牌,请求失败 if l.currentTokens == 0 { return false } // 如果有令牌,当前令牌-1,请求成功 l.currentTokens-- return true } func minInt(a, b int) int { if a < b { return a } return b }
滑动日志
滑动日志与滑动窗口算法类似,但是滑动日志主要用于多级限流的场景,比如短信验证码1分钟1次,1小时10次,1天20次这种业务。
算法流程与滑动窗口相同,只是它可以指定多个策略,同时在请求失败的时候,需要通知调用方是被哪个策略所拦截。
package limiter import ( "errors" "fmt" "sort" "sync" "time" ) // ViolationStrategyError 违背策略错误 type ViolationStrategyError struct { Limit int // 窗口请求上限 Window time.Duration // 窗口时间大小 } func (e *ViolationStrategyError) Error() string { return fmt.Sprintf("violation strategy that limit = %d and window = %d", e.Limit, e.Window) } // SlidingLogLimiterStrategy 滑动日志限流器的策略 type SlidingLogLimiterStrategy struct { limit int // 窗口请求上限 window int64 // 窗口时间大小 smallWindows int64 // 小窗口数量 } func NewSlidingLogLimiterStrategy(limit int, window time.Duration) *SlidingLogLimiterStrategy { return &SlidingLogLimiterStrategy{ limit: limit, window: int64(window), } } // SlidingLogLimiter 滑动日志限流器 type SlidingLogLimiter struct { strategies []*SlidingLogLimiterStrategy // 滑动日志限流器策略列表 smallWindow int64 // 小窗口时间大小 counters map[int64]int // 小窗口计数器 mutex sync.Mutex // 避免并发问题 } func NewSlidingLogLimiter(smallWindow time.Duration, strategies ...*SlidingLogLimiterStrategy) (*SlidingLogLimiter, error) { // 复制策略避免被修改 strategies = append(make([]*SlidingLogLimiterStrategy, 0, len(strategies)), strategies...) // 不能不设置策略 if len(strategies) == 0 { return nil, errors.New("must be set strategies") } // 排序策略,窗口时间大的排前面,相同窗口上限大的排前面 sort.Slice(strategies, func(i, j int) bool { a, b := strategies[i], strategies[j] if a.window == b.window { return a.limit > b.limit } return a.window > b.window }) fmt.Println(strategies[0], strategies[1]) for i, strategy := range strategies { // 随着窗口时间变小,窗口上限也应该变小 if i > 0 { if strategy.limit >= strategies[i-1].limit { return nil, errors.New("the smaller window should be the smaller limit") } } // 窗口时间必须能够被小窗口时间整除 if strategy.window%int64(smallWindow) != 0 { return nil, errors.New("window cannot be split by integers") } strategy.smallWindows = strategy.window / int64(smallWindow) } return &SlidingLogLimiter{ strategies: strategies, smallWindow: int64(smallWindow), counters: make(map[int64]int), }, nil } func (l *SlidingLogLimiter) TryAcquire() error { l.mutex.Lock() defer l.mutex.Unlock() // 获取当前小窗口值 currentSmallWindow := time.Now().UnixNano() / l.smallWindow * l.smallWindow // 获取每个策略的起始小窗口值 startSmallWindows := make([]int64, len(l.strategies)) for i, strategy := range l.strategies { startSmallWindows[i] = currentSmallWindow - l.smallWindow*(strategy.smallWindows-1) } // 计算每个策略当前窗口的请求总数 counts := make([]int, len(l.strategies)) for smallWindow, counter := range l.counters { if smallWindow < startSmallWindows[0] { delete(l.counters, smallWindow) continue } for i := range l.strategies { if smallWindow >= startSmallWindows[i] { counts[i] += counter } } } // 若到达对应策略窗口请求上限,请求失败,返回违背的策略 for i, strategy := range l.strategies { if counts[i] >= strategy.limit { return &ViolationStrategyError{ Limit: strategy.limit, Window: time.Duration(strategy.window), } } } // 若没到窗口请求上限,当前小窗口计数器+1,请求成功 l.counters[currentSmallWindow]++ return nil }