您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“Redis實現分布式鎖的五種方法是什么”,內容詳細,步驟清晰,細節處理妥當,希望這篇“Redis實現分布式鎖的五種方法是什么”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
單機數據一致性架構如下圖所示:多個可客戶訪問同一個服務器,連接同一個數據庫。
場景描述:客戶端模擬購買商品過程,在Redis
中設定庫存總數剩100個
,多個客戶端同時并發購買。
@RestController public class IndexController1 { @Autowired StringRedisTemplate template; @RequestMapping("/buy1") public String index(){ // Redis中存有goods:001號商品,數量為100 String result = template.opsForValue().get("goods:001"); // 獲取到剩余商品數 int total = result == null ? 0 : Integer.parseInt(result); if( total > 0 ){ // 剩余商品數大于0 ,則進行扣減 int realTotal = total -1; // 將商品數回寫數據庫 template.opsForValue().set("goods:001",String.valueOf(realTotal)); System.out.println("購買商品成功,庫存還剩:"+realTotal +"件, 服務端口為8001"); return "購買商品成功,庫存還剩:"+realTotal +"件, 服務端口為8001"; }else{ System.out.println("購買商品失敗,服務端口為8001"); } return "購買商品失敗,服務端口為8001"; } }
使用Jmeter
模擬高并發場景,測試結果如下:
測試結果出現多個用戶購買同一商品,發生了數據不一致問題!
解決辦法:單體應用的情況下,對并發的操作進行加鎖操作,保證對數據的操作具有原子性
synchronized
ReentrantLock
@RestController public class IndexController2 { // 使用ReentrantLock鎖解決單體應用的并發問題 Lock lock = new ReentrantLock(); @Autowired StringRedisTemplate template; @RequestMapping("/buy2") public String index() { lock.lock(); try { String result = template.opsForValue().get("goods:001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set("goods:001", String.valueOf(realTotal)); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"); return "購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"; } else { System.out.println("購買商品失敗,服務端口為8001"); } } catch (Exception e) { lock.unlock(); } finally { lock.unlock(); } return "購買商品失敗,服務端口為8001"; } }
上面解決了單體應用的數據一致性問題,但如果是分布式架構部署呢,架構如下:
提供兩個服務,端口分別為8001
、8002
,連接同一個Redis
服務,在服務前面有一臺Nginx
作為負載均衡
兩臺服務代碼相同,只是端口不同
將8001
、8002
兩個服務啟動,每個服務依然用ReentrantLock
加鎖,用Jmeter
做并發測試,發現會出現數據一致性問題!
取消單機鎖,下面使用redis
的set
命令來實現分布式加鎖
SET KEY VALUE [EX seconds] [PX milliseconds] [NX|XX]
EX seconds 設置指定的到期時間(以秒為單位)
PX milliseconds 設置指定的到期時間(以毫秒為單位)
NX 僅在鍵不存在時設置鍵
XX 只有在鍵已存在時才設置
@RestController public class IndexController4 { // Redis分布式鎖的key public static final String REDIS_LOCK = "good_lock"; @Autowired StringRedisTemplate template; @RequestMapping("/buy4") public String index(){ // 每個人進來先要進行加鎖,key值為"good_lock",value隨機生成 String value = UUID.randomUUID().toString().replace("-",""); try{ // 加鎖 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); String result = template.opsForValue().get("goods:001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { int realTotal = total - 1; template.opsForValue().set("goods:001", String.valueOf(realTotal)); // 如果在搶到所之后,刪除鎖之前,發生了異常,鎖就無法被釋放, // 釋放鎖操作不能在此操作,要在finally處理 // template.delete(REDIS_LOCK); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"); return "購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"; } else { System.out.println("購買商品失敗,服務端口為8001"); } return "購買商品失敗,服務端口為8001"; }finally { // 釋放鎖 template.delete(REDIS_LOCK); } } }
上面的代碼,可以解決分布式架構中數據一致性問題。但再仔細想想,還是會有問題,下面進行改進。
在上面的代碼中,如果程序在運行期間,部署了微服務jar
包的機器突然掛了,代碼層面根本就沒有走到finally
代碼塊,也就是說在宕機前,鎖并沒有被刪除掉,這樣的話,就沒辦法保證解鎖
所以,這里需要對這個key
加一個過期時間,Redis
中設置過期時間有兩種方法:
template.expire(REDIS_LOCK,10, TimeUnit.SECONDS)
template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS)
第一種方法需要單獨的一行代碼,且并沒有與加鎖放在同一步操作,所以不具備原子性,也會出問題
第二種方法在加鎖的同時就進行了設置過期時間,所有沒有問題,這里采用這種方式
調整下代碼,在加鎖的同時,設置過期時間:
// 為key加一個過期時間,其余代碼不變 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK,value,10L,TimeUnit.SECONDS);
這種方式解決了因服務突然宕機而無法釋放鎖的問題。但再仔細想想,還是會有問題,下面進行改進。
方式二設置了key
的過期時間,解決了key
無法刪除的問題,但問題又來了
上面設置了key
的過期時間為10
秒,如果業務邏輯比較復雜,需要調用其他微服務,處理時間需要15
秒(模擬場
景,別較真),而當10
秒鐘過去之后,這個key
就過期了,其他請求就又可以設置這個key
,此時如果耗時15
秒
的請求處理完了,回來繼續執行程序,就會把別人設置的key
給刪除了,這是個很嚴重的問題!
所以,誰上的鎖,誰才能刪除
@RestController public class IndexController6 { public static final String REDIS_LOCK = "good_lock"; @Autowired StringRedisTemplate template; @RequestMapping("/buy6") public String index(){ // 每個人進來先要進行加鎖,key值為"good_lock" String value = UUID.randomUUID().toString().replace("-",""); try{ // 為key加一個過期時間 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); String result = template.opsForValue().get("goods:001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此處需要調用其他微服務,處理時間較長。。。 int realTotal = total - 1; template.opsForValue().set("goods:001", String.valueOf(realTotal)); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"); return "購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"; } else { System.out.println("購買商品失敗,服務端口為8001"); } return "購買商品失敗,服務端口為8001"; }finally { // 誰加的鎖,誰才能刪除!!!! if(template.opsForValue().get(REDIS_LOCK).equals(value)){ template.delete(REDIS_LOCK); } } } }
這種方式解決了因服務處理時間太長而釋放了別人鎖的問題。這樣就沒問題了嗎?
在上面方式三下,規定了誰上的鎖,誰才能刪除,但finally
快的判斷和del
刪除操作不是原子操作,并發的時候也會出問題,并發嘛,就是要保證數據的一致性,保證數據的一致性,最好要保證對數據的操作具有原子性。
在Redis
的set
命令介紹中,最后推薦Lua
腳本進行鎖的刪除,地址
@RestController public class IndexController7 { public static final String REDIS_LOCK = "good_lock"; @Autowired StringRedisTemplate template; @RequestMapping("/buy7") public String index(){ // 每個人進來先要進行加鎖,key值為"good_lock" String value = UUID.randomUUID().toString().replace("-",""); try{ // 為key加一個過期時間 Boolean flag = template.opsForValue().setIfAbsent(REDIS_LOCK, value,10L,TimeUnit.SECONDS); // 加鎖失敗 if(!flag){ return "搶鎖失敗!"; } System.out.println( value+ " 搶鎖成功"); String result = template.opsForValue().get("goods:001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此處需要調用其他微服務,處理時間較長。。。 int realTotal = total - 1; template.opsForValue().set("goods:001", String.valueOf(realTotal)); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"); return "購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"; } else { System.out.println("購買商品失敗,服務端口為8001"); } return "購買商品失敗,服務端口為8001"; }finally { // 誰加的鎖,誰才能刪除,使用Lua腳本,進行鎖的刪除 Jedis jedis = null; try{ jedis = RedisUtils.getJedis(); String script = "if redis.call('get',KEYS[1]) == ARGV[1] " + "then " + "return redis.call('del',KEYS[1]) " + "else " + " return 0 " + "end"; Object eval = jedis.eval(script, Collections.singletonList(REDIS_LOCK), Collections.singletonList(value)); if("1".equals(eval.toString())){ System.out.println("-----del redis lock ok...."); }else{ System.out.println("-----del redis lock error ...."); } }catch (Exception e){ }finally { if(null != jedis){ jedis.close(); } } } } }
在方式四下,規定了誰上的鎖,誰才能刪除,并且解決了刪除操作沒有原子性問題。但還沒有考慮緩存續命,以及Redis
集群部署下,異步復制造成的鎖丟失:主節點沒來得及把剛剛set
進來這條數據給從節點,就掛了。所以直接上RedLock
的Redisson
落地實現。
@RestController public class IndexController8 { public static final String REDIS_LOCK = "good_lock"; @Autowired StringRedisTemplate template; @Autowired Redisson redisson; @RequestMapping("/buy8") public String index(){ RLock lock = redisson.getLock(REDIS_LOCK); lock.lock(); // 每個人進來先要進行加鎖,key值為"good_lock" String value = UUID.randomUUID().toString().replace("-",""); try{ String result = template.opsForValue().get("goods:001"); int total = result == null ? 0 : Integer.parseInt(result); if (total > 0) { // 如果在此處需要調用其他微服務,處理時間較長。。。 int realTotal = total - 1; template.opsForValue().set("goods:001", String.valueOf(realTotal)); System.out.println("購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"); return "購買商品成功,庫存還剩:" + realTotal + "件, 服務端口為8001"; } else { System.out.println("購買商品失敗,服務端口為8001"); } return "購買商品失敗,服務端口為8001"; }finally { if(lock.isLocked() && lock.isHeldByCurrentThread()){ lock.unlock(); } } } }
讀到這里,這篇“Redis實現分布式鎖的五種方法是什么”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。