1. 简介
github仓库地址:https://github.com/uber-go/ratelimit
文档地址:https://pkg.go.dev/go.uber.org/ratelimit
ratelimit 是一个 Go 语言实现的基于漏桶算法的限流库,它的实现根据不同请求之间的时间差来填充漏桶,而不是用一个固定频率的时钟来填充。
系统为了防止瞬时流量过大,所造成的本服务、数据库、第三方服务接口请求的压力过大,通常需要对请求进行限流。
2. 使用
以下示例引用自官方仓库的 README 文件的使用示例。
ratelimit 的使用非常简单,先创建一个 ratelimit 对象,指定每秒的频率限制(RPS),然后通过 Take 方法来获取频控,程序将会阻塞直至得到当前一次频控。
import (
"fmt"
"time"
"go.uber.org/ratelimit"
)
func main() {
rl := ratelimit.New(100) // 创建一个 ratelimit 对象,指定每秒的频率限制
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take() // 获取频控
fmt.Println(i, now.Sub(prev))
prev = now
}
}
New 函数创建的限流器默认是每秒的频率限制,通过配置参数可以改变时间间隔。
rl := ratelimit.New(100) // 创建一个每秒100次限制的限流器
rl := ratelimit.New(10, ratelimit.Per(time.Minute)) // 创建一个每分钟10次限制的限流器
3. 开发文档
3.1 类型
Limiter 是限流器接口,用来对某种过程进行频率控制,支持跨协程间的控制。程序需要通过调用 Take 方法来获取频控,获取不到时将会阻塞等待。
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}
Option 是限流器的配置参数。
type Option interface {
// contains filtered or unexported methods
}
// WithoutSlack 让限流器无松弛量
var WithoutSlack Option = slackOption(0)
Clock 是创建一个带有时钟的限流器的最简化接口。
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
3.2 函数
// New 创建指定每秒频控的限流器
func New(rate int, opts ...Option) Limiter
// NewUnlimited 创建无限频控的限流器
func NewUnlimited() Limiter
// Per 指定时间间隔的选项,默认时间间隔是秒,可以指定为分钟等其他时间间隔
func Per(per time.Duration) Option
// WithClock 指定时钟的选项,替换默认的定时时钟,通常用来mock做测试
func WithClock(clock Clock) Option
// WithSlack 指定松弛量的选项
func WithSlack(slack int) Option
4. 源码分析
4.1 最大松弛量
传统的漏桶算法对于每个请求的时间间隔是固定的。例如一个 100 次每秒的限流器,每次请求的时间间隔为 10ms,当一个请求到来后,要至少等待 10ms 之后才能处理下一个请求。如果其中两个请求之间间隔大于 10ms,那这一秒限流器支持的请求限制就必然会小于 100 次。
而在实际使用中,流量经常是突发性的,是时间间隔不稳定的。例如第一次请求 15ms 后产生第二次请求,然后 5ms 后产生第三次请求,第三次请求就因为和第二次请求不足 10ms 而需要等待 5ms,三个请求需要消耗 25ms。
ratelimit 对漏桶算法做了一些改良,引入了最大松弛量(maxSlack)的概念。对于以上的例子,因为从请求一到请求二多等了 5ms,可以把这 5ms 挪给请求三使用,因而请求三无需等待,直接处理。三个请求只需要消耗 20ms。
但是如果两次请求之间时间间隔过久,比如高出平均时间间隔一两个数量级,那后面多个请求到来时,这个松弛量会被立刻消耗完,这里也就失去了限流的意义。为了防止这种情况,ratelimit 引入最大松弛量(maxSlack),表示允许抵消的最长时间。
maxSlack 等于平均时间间隔乘以松弛量 slack,slack 的默认值是 10,可以在创建限流器时指定设置,或者设置为 0 表示不用松弛量。
// 平均时间间隔 = 统计时间 / 频次
perRequest = config.per / time.Duration(rate)
// 最大松弛量 = slack * 平均时间间隔
maxSlack = time.Duration(config.slack) * perRequest
4.2 选项
ratelimit 的选项有 3 种,定义在结构体 config 的三个成员变量中。
// config configures a limiter.
type config struct {
clock Clock
slack int
per time.Duration
}
配置选项 Option 是一个接口类型,只要实现了 apply 方法就相当于实现了 Option 接口,也就是将设置写入结构体 config 中。clockOption、slackOption、perOption 三个类型都实现了 apply 方法,分别将三种选项写入结构体 config 中。
// Option configures a Limiter.
type Option interface {
apply(*config)
}
type clockOption struct {
clock Clock
}
func (o clockOption) apply(c *config) {
c.clock = o.clock
}
type slackOption int
func (o slackOption) apply(c *config) {
c.slack = int(o)
}
type perOption time.Duration
func (p perOption) apply(c *config) {
c.per = time.Duration(p)
}
在应用配置选项的函数 buildConfig 中就是针对 Option 切片依次执行 apply 方法。该函数由 New 函数调用,调用处就可以很灵活地传 0 到多个选项。
// buildConfig combines defaults with options.
func buildConfig(opts []Option) config {
c := config{
clock: clock.New(),
slack: 10,
per: time.Second,
}
for _, opt := range opts {
opt.apply(&c)
}
return c
}
4.3 创建
Limiter 是限流器接口,结构体 atomicInt64Limiter 实现了该接口。
type Limiter interface {
Take() time.Time
}
type atomicInt64Limiter struct {
prepadding [64]byte // 避免伪共享
state int64 // 下个允许频控获取的纳秒时间戳,初始化为0
postpadding [56]byte // 避免伪共享,减掉了 state 字段的 8 字节
perRequest time.Duration // 平均时间间隔
maxSlack time.Duration // 最大松弛量
clock Clock
}
New 函数创建一个新的限流器。默认统计时间为秒,创建参数 rate 表示频次,创建一个 rate 次每秒的限流器。也可以通过配置选项设置成分钟或其他,创建一个分钟级或其他时间级的限流器。
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
return newAtomicInt64Based(rate, opts...)
}
// newAtomicBased returns a new atomic based limiter.
func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {
config := buildConfig(opts) // 应用配置选项
perRequest := config.per / time.Duration(rate) // 平均时间间隔 = 统计时间 / 频次
l := &atomicInt64Limiter{ // 初始化 atomicInt64Limiter 对象
perRequest: perRequest,
maxSlack: time.Duration(config.slack) * perRequest, // 最大松弛量
clock: config.clock,
}
atomic.StoreInt64(&l.state, 0)
return l
}
4.4 获取频控
程序使用 Take 方法获取频控。
func (t *atomicInt64Limiter) Take() time.Time {
var (
newTimeOfNextPermissionIssue int64 // 下次频控获取的纳秒时间戳
now int64
)
for {
now = t.clock.Now().UnixNano() // 当前时间的纳秒时间戳
timeOfNextPermissionIssue := atomic.LoadInt64(&t.state) // 限流器记录的允许频控获取的纳秒时间戳
switch { // 设置下次频控获取的纳秒时间戳
case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest)):
// 第一次获取频控 or 最大松弛量为0且现在比允许频控获取的时间超出一次间隔,现在可以获取频控
newTimeOfNextPermissionIssue = now
case t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack):
// 现在比允许频控获取的时间超出最大松弛量,减去最大松弛量
newTimeOfNextPermissionIssue = now - int64(t.maxSlack)
default:
// 上次频控获取加上平均时间间隔
newTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)
}
// 当取出 state 的值与当前 state 的值相等,证明 state 没有被其他协程修改
// 获取当前调用的下次频控获取的纳秒时间戳,并设置 state,退出循环
if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {
break
}
}
sleepDuration := time.Duration(newTimeOfNextPermissionIssue - now)
if sleepDuration > 0 { // 休眠直到下次频控获取时间,返回该时间
t.clock.Sleep(sleepDuration)
return time.Unix(0, newTimeOfNextPermissionIssue)
}
// 无需休眠,直接返回当前时间
return time.Unix(0, now)
}