您好,登錄后才能下訂單哦!
這篇文章主要介紹了go語言中的限流漏桶和令牌桶庫怎么使用的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇go語言中的限流漏桶和令牌桶庫怎么使用文章都會有所收獲,下面我們一起來看看吧。
在大數據量高并發訪問時,經常會出現服務或接口面對大量的請求而導致數據庫崩潰的情況,甚至引發連鎖反映導致整個系統崩潰。或者有人惡意攻擊網站,大量的無用請求出現會導致緩存穿透的情況出現。使用限流中間件可以在短時間內對請求進行限制數量,起到降級的作用,從而保障了網站的安全性。
使用消息中間件進行統一限制(降速)
使用限流方案將多余請求返回(限流)
升級服務器
緩存(但仍然有緩存穿透等危險)
等等
可以看出在代碼已經無法提升的情況下,只能去提升硬件水平。或者改動架構再加一層!也可以使用消息中間件統一處理。而結合看來,限流方案是一種既不需要大幅改動也不需要高額開銷的策略。
令牌桶算法
漏桶算法
滑動窗口算法
等等
go get -u go.uber.org/ratelimit
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
return newAtomicBased(rate, opts...)
}
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}
initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}
該函數使用了函數選項模式對多個結構體對象進行初始化
根據傳入的值來初始化一個桶結構體 rate
為int
傳參 。
初始化過程中包括了
每一滴水需要的時間 perquest = config.per / time.Duration(rate)
maxSlack
寬松度(寬松度為負值)-1 * time.Duration(config.slack) * perRequest
松緊度是用來規范等待時間的
// Clock is the minimum necessary interface to instantiate a rate limiter with
// a clock or mock clock, compatible with clocks created using
// github.com/andres-erbsen/clock.
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
同時還需要結構體Clock
來記錄當前請求的時間now
和此刻的請求所需要花費等待的時間sleep
type state struct {
last time.Time
sleepFor time.Duration
}
state
主要用來記錄上次執行的時間以及當前執行請求需要花費等待的時間(作為中間狀態記錄)
func (t *atomicLimiter) Take() time.Time {
var (
newState state
taken bool
interval time.Duration
)
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// 計算是否需要進行等待取水操作
newState.sleepFor += t.perRequest(每兩滴水之間的間隔時間) - now.Sub(oldState.last)(當前時間與上次取水時間的間隔)
// 如果等待取水時間特別小,就需要松緊度進行維護
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果等待時間大于0,就進行更新
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
// 最后返回需要等待的時間
return newState.last
}
實現一個Take方法
該Take方法會進行原子性操作(可以理解為加鎖和解鎖),在大量并發請求下仍可以保證正常使用。
記錄下當前的時間 now := t.clock.Now()
oldState.last.IsZero()
判斷是不是第一次取水,如果是就直接將state
結構體中的值進行返回。而這個結構體中初始化了上次執行時間,如果是第一次取水就作為當前時間直接傳參。
如果 newState.sleepFor
非常小,就會出現問題,因此需要借助寬松度,一旦這個最小值比寬松度小,就用寬松度對取水時間進行維護。
如果newState.sleepFor > 0
就直接更新結構體中上次執行時間newState.last = newState.last.Add(newState.sleepFor)
并記錄需要等待的時間interval, newState.sleepFor = newState.sleepFor, 0
。
如果允許取水和等待操作,那就說明沒有發生并發競爭的情況,就模擬睡眠時間t.clock.Sleep(interval)
。然后將取水的目標時間進行返回,由服務端代碼來判斷是否打回響應或者等待該時間后繼續響應。
func (c *clock) Sleep(d time.Duration) { time.Sleep(d) }
實際上在一個請求來的時候,限流器就會進行睡眠對應的時間,并在睡眠后將最新取水時間返回。
func ratelimit1() func(ctx *gin.Context) {
r1 := rate1.New(100)
return func(ctx *gin.Context) {
now := time.Now()
// Take 返回的是一個 time.Duration的時間
if r1.Take().Sub(now) > 0 {
// 返回的時間比當前的時間還大,說明需要進行等待
// 如果需要等待, 就 time.Sleep(r1.Take().Sub(now())) 然后放行
// 如果不需要等待請求時間,就直接進行Abort 然后返回
response(ctx, http.StatusRequestTimeout, "rate1 limit...")
fmt.Println("rate1 limit...")
ctx.Abort()
return
}
// 放行
ctx.Next()
}
}
這里你可以進行選擇是否返回。因為Take一定會執行sleep函數,所以當執行take結束后表示當前請求已經接到了水。當前演示使用第一種情況。
如果你的業務要求響應不允許進行等待。那么可以在該請求接完水之后然后,如上例。
如果你的業務允許響應等待,那么該請求等待對應的接水時間后進行下一步。具體代碼就是將if
中的內容直接忽略。(建議使用)
這里定義了一個響應函數和一個handler
函數方便測試
func response(c *gin.Context, code int, info any) {
c.JSON(code, info)
}
func pingHandler(c *gin.Context) {
response(c, 200, "ping ok~")
}
執行go test -run=Run -v
先開啟一個web服務
func TestRun(t *testing.T) {
r := gin.Default()
r.GET("/ping1", ratelimit1(), pingHandler)
r.GET("/ping2", ratelimit2(), helloHandler)
_ = r.Run(":4399")
}
使用接口壓力測試工具go-wrk
進行測試->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通過go install github.com/tsliwowicz/go-wrk@latest
下載
Usage: go-wrk <options> <url>
Options:
-H Header to add to each request (you can define multiple -H flags) (Default )
-M HTTP method (Default GET)
-T Socket/request timeout in ms (Default 1000)
-body request body string or @filename (Default )
-c Number of goroutines to use (concurrent connections) (Default 10)
-ca CA file to verify peer against (SSL/TLS) (Default )
-cert CA certificate file to verify peer against (SSL/TLS) (Default )
-d Duration of test in seconds (Default 10)
-f Playback file name (Default <empty>)
-help Print help (Default false)
-host Host Header (Default )
-http Use HTTP/2 (Default true)
-key Private key file name (SSL/TLS (Default )
-no-c Disable Compression - Prevents sending the "Accept-Encoding: gzip" header (Default false)
-no-ka Disable KeepAlive - prevents re-use of TCP connections between different HTTP requests (Default false)
-no-vr Skip verifying SSL certificate of the server (Default false)
-redir Allow Redirects (Default false)
-v Print version details (Default false)
-t 8個線程 -c 400個連接 -n 模擬100次請求 -d 替換-n 表示連接時間
輸入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流積攢(壓測速度過快)。
可以看出,89
個請求全部返回。也就是說在一段請求高峰期,不會有請求進行響應。因此我認為既然內部已經睡眠,那么就也就應該對請求放行處理。
引入ratelimit
庫
go get -u github.com/juju/ratelimit
// NewBucket returns a new token bucket that fills at the
// rate of one token every fillInterval, up to the given
// maximum capacity. Both arguments must be
// positive. The bucket is initially full.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}
// NewBucketWithClock is identical to NewBucket but injects a testable clock
// interface.
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}
進行Bucket
桶的初始化。
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if clock == nil {
clock = realClock{}
}
// 填充速率
if fillInterval <= 0 {
panic("token bucket fill interval is not > 0")
}
// 最大令牌容量
if capacity <= 0 {
panic("token bucket capacity is not > 0")
}
// 單次令牌生成量
if quantum <= 0 {
panic("token bucket quantum is not > 0")
}
return &Bucket{
clock: clock,
startTime: clock.Now(),
latestTick: 0,
fillInterval: fillInterval,
capacity: capacity,
quantum: quantum,
availableTokens: capacity,
}
}
令牌桶初始化過程,初始化結構體 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三個變量有一個小于或者等于0的話直接進行報錯返回。在最開始就將當前令牌數初始化為最大容量。
// TakeAvailable takes up to count immediately available tokens from the
// bucket. It returns the number of tokens removed, or zero if there are
// no available tokens. It does not block.
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.takeAvailable(tb.clock.Now(), count)
}
調用TakeAvailable
函數,傳入參數為需要取出的令牌數量,返回參數是實際能夠取出的令牌數量。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
// 如果需要取出的令牌數小于等于零,那么就返回0個令牌
if count <= 0 {
return 0
}
// 根據時間對當前桶中令牌數進行計算
tb.adjustavailableTokens(tb.currentTick(now))
// 計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌
if tb.availableTokens <= 0 {
return 0
}
// 如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數
if count > tb.availableTokens {
count = tb.availableTokens
}
// 調整令牌數
tb.availableTokens -= count
return count
}
如果需要取出的令牌數小于等于零,那么就返回0個令牌
根據時間對當前桶中令牌數進行計算
計算之后的令牌總數小于等于0,說明當前令牌不足取出,那么就直接返回0個令牌
如果當前存儲的令牌數量多于請求數量,那么就返回取出令牌數
調整令牌數
func (tb *Bucket) adjustavailableTokens(tick int64) {
lastTick := tb.latestTick
tb.latestTick = tick
// 如果當前令牌數大于最大等于容量,直接返回最大容量
if tb.availableTokens >= tb.capacity {
return
}
// 當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)
tb.availableTokens += (tick - lastTick) * tb.quantum
// 如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數
if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity
}
return
}
如果當前令牌數大于最大等于容量,直接返回最大容量
當前令牌數 += (當前時間 - 上次取出令牌數的時間) * quannum(每次生成令牌量)
如果當前令牌數大于最大等于容量, 將當前令牌數 = 最大容量 然后返回 當前令牌數
加鎖 defer
解鎖
判斷count(想要取出的令牌數) 是否小于等于 0,如果是直接返回 0
調用函數adjustTokens
獲取可用的令牌數量
如果當前可以取出的令牌數小于等于0 直接返回 0
如果當前可以取出的令牌數小于當前想要取出的令牌數(count) count = 當前可以取出的令牌數
當前的令牌數 -= 取出的令牌數 (count)
返回 count(可以取出的令牌數)
take
函數,能夠返回等待時間和布爾值,允許欠賬,沒有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函數,可以根據最大等待時間來進行判斷。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因為他們內部的實現都基于令牌調整,我這里不做過多介紹,如果感興趣可以自行研究一下。
func ratelimit2() func(ctx *gin.Context) {
// 生成速率 最大容量
r2 := rate2.NewBucket(time.Second, 200)
return func(ctx *gin.Context) {
//r2.Take() // 允許欠賬,令牌不夠也可以接收請求
if r2.TakeAvailable(1) == 1 {
// 如果想要取出1個令牌并且能夠取出,就放行
ctx.Next()
return
}
response(ctx, http.StatusRequestTimeout, "rate2 limit...")
ctx.Abort()
return
}
}
壓測速度過于快速,在實際過程中可以根據調整令牌生成速率來進行具體限流!
關于“go語言中的限流漏桶和令牌桶庫怎么使用”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“go語言中的限流漏桶和令牌桶庫怎么使用”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。