您好,登錄后才能下訂單哦!
這篇文章主要介紹了Go結合Redis怎么實現分布式鎖,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
如果熟悉Redis的命令,可能會馬上想到使用Redis的set if not exists操作來實現,并且現在標準的實現方式是SET resource_name my_random_value NX PX 30000這串命令,其中:
resource_name表示要鎖定的資源
NX表示如果不存在則設置
PX 30000表示過期時間為30000毫秒,也就是30秒
my_random_value這個值在所有的客戶端必須是唯一的,所有同一key的獲取者(競爭者)這個值都不能一樣。
value的值必須是隨機數主要是為了更安全的釋放鎖,釋放鎖的時候使用腳本告訴Redis:只有key存在并且存儲的值和我指定的值一樣才能告訴我刪除成功。可以通過以下Lua腳本實現:
if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
舉個例子:客戶端A取得資源鎖,但是緊接著被一個其他操作阻塞了,當客戶端A運行完畢其他操作后要釋放鎖時,原來的鎖早已超時并且被Redis自動釋放,并且在這期間資源鎖又被客戶端B再次獲取到。
使用Lua腳本是因為判斷和刪除是兩個操作,所以有可能A剛判斷完鎖就過期自動釋放了,然后B就獲取到了鎖,然后A又調用了Del,導致把B的鎖給釋放了。
package main import ( "context" "errors" "fmt" "github.com/brianvoe/gofakeit/v6" "github.com/go-redis/redis/v8" "sync" "time" ) var client *redis.Client const unlockScript = ` if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end` func lottery(ctx context.Context) error { // 加鎖 myRandomValue := gofakeit.UUID() resourceName := "resource_name" ok, err := client.SetNX(ctx, resourceName, myRandomValue, time.Second*30).Result() if err != nil { return err } if !ok { return errors.New("系統繁忙,請重試") } // 解鎖 defer func() { script := redis.NewScript(unlockScript) script.Run(ctx, client, []string{resourceName}, myRandomValue) }() // 業務處理 time.Sleep(time.Second) return nil } func main() { client = redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() ctx, _ := context.WithTimeout(context.Background(), time.Second*3) err := lottery(ctx) if err != nil { fmt.Println(err) } }() go func() { defer wg.Done() ctx, _ := context.WithTimeout(context.Background(), time.Second*3) err := lottery(ctx) if err != nil { fmt.Println(err) } }() wg.Wait() }
我們先看lottery()函數,這里模擬一個抽獎操作,在進入函數時,先使用SET resource_name my_random_value NX PX 30000加鎖,這里使用UUID作為隨機值,如果操作失敗,直接返回,讓用戶重試,如果成功在defer里面執行解鎖邏輯,解鎖邏輯就是執行前面說到得lua腳本,然后再進行業務處理。
我們在main()函數里面執行了兩個goroutine并發調用lottery()函數,其中有一個操作會因為拿不到鎖而直接失敗。
生成隨機值
使用SET resource_name my_random_value NX PX 30000加鎖
如果加鎖失敗,直接返回
defer添加解鎖邏輯,保證在函數退出的時候會執行
執行業務邏輯
在單實例情況下,如果這個實例掛了,那么所有請求都會因為拿不到鎖而失敗,所以我們需要多個分布在不同機器上的Redis實例,并且拿到其中大多數節點的鎖才能加鎖成功,這也就是RedLock算法。它其實也是基于上面的單實例算法的,只是我們需要同時對多個Redis實例獲取鎖。
package main import ( "context" "errors" "fmt" "github.com/brianvoe/gofakeit/v6" "github.com/go-redis/redis/v8" "sync" "time" ) var clients []*redis.Client const unlockScript = ` if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end` func lottery(ctx context.Context) error { // 加鎖 myRandomValue := gofakeit.UUID() resourceName := "resource_name" var wg sync.WaitGroup wg.Add(len(clients)) // 這里主要是確保不要加鎖太久,這樣會導致業務處理的時間變少 lockCtx, _ := context.WithTimeout(ctx, time.Millisecond*5) // 成功獲得鎖的Redis實例的客戶端 successClients := make(chan *redis.Client, len(clients)) for _, client := range clients { go func(client *redis.Client) { defer wg.Done() ok, err := client.SetNX(lockCtx, resourceName, myRandomValue, time.Second*30).Result() if err != nil { return } if !ok { return } successClients <- client }(client) } wg.Wait() // 等待所有獲取鎖操作完成 close(successClients) // 解鎖,不管加鎖是否成功,最后都要把已經獲得的鎖給釋放掉 defer func() { script := redis.NewScript(unlockScript) for client := range successClients { go func(client *redis.Client) { script.Run(ctx, client, []string{resourceName}, myRandomValue) }(client) } }() // 如果成功加鎖得客戶端少于客戶端數量的一半+1,表示加鎖失敗 if len(successClients) < len(clients)/2+1 { return errors.New("系統繁忙,請重試") } // 業務處理 time.Sleep(time.Second) return nil } func main() { clients = append(clients, redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", DB: 0, }), redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", DB: 1, }), redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", DB: 2, }), redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", DB: 3, }), redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", DB: 4, })) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() ctx, _ := context.WithTimeout(context.Background(), time.Second*3) err := lottery(ctx) if err != nil { fmt.Println(err) } }() go func() { defer wg.Done() ctx, _ := context.WithTimeout(context.Background(), time.Second*3) err := lottery(ctx) if err != nil { fmt.Println(err) } }() wg.Wait() time.Sleep(time.Second) }
在上面的代碼中,我們使用Redis的多數據庫模擬多個Redis master實例,一般我們會選擇5個Redis實例,真實環境中這些實例應該是分布在不同機器上的,避免同時失效。
在加鎖邏輯里,我們主要是對每個Redis實例執行SET resource_name my_random_value NX PX 30000獲取鎖,然后把成功獲取鎖的客戶端放到一個channel里(這里使用slice可能有并發問題),同時使用sync.WaitGroup等待所以獲取鎖操作結束。
然后添加defer釋放鎖邏輯,釋放鎖邏輯很簡單,只是把成功拿到的鎖給釋放掉即可。
最后判斷成功獲取到的鎖的數量是否大于一半,如果沒有得到一半以上的鎖,說明加鎖失敗。
如果加鎖成功接下來就是進行業務處理。
生成隨機值
并發給每個Redis實例使用SET resource_name my_random_value NX PX 30000
加鎖
等待所有獲取鎖操作完成
defer添加解鎖邏輯,保證在函數退出的時候會執行,這里先defer再判斷是因為有可能獲取到一部分Redis實例的鎖,但是因為沒有超過一半,還是會判斷為加鎖失敗
判斷是否拿到一半以上Redis實例的鎖,如果沒有說明加鎖失敗,直接返回
執行業務邏輯
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Go結合Redis怎么實現分布式鎖”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。