亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

分布式鎖如何在Redis數據庫中使用

發布時間:2020-11-23 15:19:08 來源:億速云 閱讀:231 作者:Leah 欄目:開發技術

今天就跟大家聊聊有關分布式鎖如何在Redis數據庫中使用,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

模擬一個電商里面下單減庫存的場景。

1.首先在redis里加入商品庫存數量。

分布式鎖如何在Redis數據庫中使用

2.新建一個Spring Boot項目,在pom里面引入相關的依賴。

 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>

3.接下來,在application.yml配置redis屬性和指定應用的端口號:

server:
 port: 8090

spring:
 redis:
 host: 192.168.0.60
 port: 6379

4.新建一個Controller類,扣減庫存第一版代碼:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Objects;

@RestController
public class StockController {

 private static final Logger logger = LoggerFactory.getLogger(StockController.class);

 @Resource
 private StringRedisTemplate stringRedisTemplate;

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 // 從redis中獲取庫存數量
 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
 if (stock > 0) {
  // 減庫存
  int restStock = stock - 1;
  // 剩余庫存再重新設置到redis中
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("扣減成功,剩余庫存:{}", restStock);
 } else {
  logger.info("庫存不足,扣減失敗。");
 }

 return "success";
 }
}

上面第一版的代碼存在什么問題:超賣。假如多個線程同時調用獲取庫存數量的代碼,那么每個線程拿到的都是100,判斷庫存都大于0,都可以執行減庫存的操作。假如兩個線程都做減庫存更新緩存,那么緩存的庫存變成99,但實際上,應該是減掉2個庫存。

那么很多人的第一個想法是加synchronized同步代碼塊,因為獲取數量和減庫存不是原子性操作,有多個線程來執行代碼的時候,只允許一個線程執行代碼塊里的代碼。那么改完的第二版的代碼如下:

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 synchronized (this) {
  // 從redis中獲取庫存數量
  int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
  if (stock > 0) {
  // 減庫存
  int restStock = stock - 1;
  // 剩余庫存再重新設置到redis中
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("扣減成功,剩余庫存:{}", restStock);
  } else {
  logger.info("庫存不足,扣減失敗。");
  }
 }

 return "success";
 }

但使用synchronize存在的問題,就是只能保證單機環境運行時沒有問題的。但現在的軟件公司里,基本上都是集群架構,是多實例,前面使用Nginx做負載均衡,大概架構如下:

分布式鎖如何在Redis數據庫中使用

Nginx分發請求,把請求發送到不同的Tomcat容器,而synchronize只能保證一個應用是沒有問題的。

那么代碼改進第三版,就是引入redis分布式鎖,具體代碼如下:

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 String lockKey = "stockKey";
 try {
  boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
  if (!result) {
  return "errorCode";
  }
  // 從redis中獲取庫存數量
  int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
  if (stock > 0) {
  // 減庫存
  int restStock = stock - 1;
  // 剩余庫存再重新設置到redis中
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("扣減成功,剩余庫存:{}", restStock);
  } else {
  logger.info("庫存不足,扣減失敗。");
  }
 } finally {
  stringRedisTemplate.delete(lockKey)
 }
 return "success";
 }

如果有一個線程拿到鎖,那么其他的線程就會等待。一定要記得在finally里面把使用完的鎖要刪除掉。否則一旦拋出異常,只有一個線程會一直持有鎖,其他線程沒有機會獲取。

但如果在執行if (stock > 0) {代碼塊里的代碼,因為宕機或重啟沒有執行完,也會一直持有鎖,所以,這里需要把鎖加一個超時時間:

 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
 stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

但如果上面兩行代碼在中間執行出問題了,設置超時時間的代碼還沒執行,也會出現鎖不能釋放的問題。好在有對應的方法:就是把上面兩行代碼設置成一個原子操作:

 // 這里默認設置超時時間為10秒
 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);

到此為止,如果并發量不是很大的話,基本上是沒有問題的。

但是,如果請求的并發量很大,就會出現新的問題:有種比較特殊的情況,第一個線程執行了15秒,但是執行到10秒鐘的時候,鎖已經失效釋放了,那么在高并發場景下,第二個線程發現鎖已經失效,那么它就可以拿到這把鎖進行加鎖,
假設第二個線程執行需要8秒,它執行到5秒鐘后,此時第一個線程已經執行完了,執行完那一刻,進行了刪除key的操作,但是此時的鎖是第二個線程加的,這樣第一個線程把第二個線程加的鎖刪掉了。

那意味著第三個線程又可以拿到鎖,第三個線程執行了3秒鐘,此時第二個線程執行完畢,那么第二個線程把第三個線程的鎖又刪除了。導致鎖失效。

那么解決的思路就是,我自己加的鎖,不要被別人刪掉。那么可以為每個進來的請求生成一個唯一的id,作為分布式鎖的值,然后在釋放時,判斷一下當前線程的id,是不是和緩存里的id是否相等。

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 String lockKey = "stockKey";
 String id = UUID.randomUUID().toString();
 try {
  // 這里默認設置超時時間為30秒
  boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
  if (!result) {
  return "errorCode";
  }
  // 從redis中獲取庫存數量
  int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
  if (stock > 0) {
  // 減庫存
  int restStock = stock - 1;
  // 剩余庫存再重新設置到redis中
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("扣減成功,剩余庫存:{}", restStock);
  } else {
  logger.info("庫存不足,扣減失敗。");
  }
 } finally {
  if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
  stringRedisTemplate.delete(lockKey);
  }
 }
 return "success";
 }

到此為止,一個比較完善的鎖就實現了,可以應付大部分場景。
當然,上面的代碼還有一個問題,就是一個線程執行時間超過了過期時間,后面的代碼還沒有執行完,鎖就已經刪除了,還是會有些bug存在。解決的方法是給鎖續命的操作。
在當前主線程獲取到鎖以后,可以fork出一個線程,執行Timer定時器操作,假如默認超時時間為30秒,那么定時器每隔10秒去看下這把鎖還是否存在,存在就說明這個鎖里的邏輯還沒有執行完,那么就可以把當前主線程的超時時間重新設置為30秒;如果不存在,就直接結束掉。

但是上面的邏輯,在高并發場景下,實現比較完善還是比較困難的。好在現在已經有比較成熟的框架,那就是Redisson。官方地址https://redisson.org。

下面用Redisson來實現分布式鎖。

首先引入依賴包:

  <dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.6.5</version>
  </dependency>

配置類:

@Configuration
public class RedissonConfig {
 @Bean
 public Redisson redisson() {
  // 單機模式
  Config config = new Config();
  config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
  return (Redisson) Redisson.create(config);
 }
}

接下來用redisson重寫上面的減庫存操作:

 @Resource
 private Redisson redisson;
 
 @RequestMapping("/reduceStock")
 public String reduceStock() {
  String lockKey = "stockKey";
  RLock redissonLock = redisson.getLock(lockKey);
  try {
   // 加鎖,鎖續命
   redissonLock.lock();
   // 從redis中獲取庫存數量
   int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
   if (stock > 0) {
    // 減庫存
    int restStock = stock - 1;
    // 剩余庫存再重新設置到redis中
    stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
    logger.info("扣減成功,剩余庫存:{}", restStock);
   } else {
    logger.info("庫存不足,扣減失敗。");
   }
  } finally {
   redissonLock.unlock();
  }
  return "success";
 }

其實就是三個步驟:獲取鎖,加鎖,釋放鎖。

先簡單看下Redisson的實現原理:

分布式鎖如何在Redis數據庫中使用

這里先說一下Redis很多操作使用Lua腳本來實現原子性操作,關于Lua語法,可以去網上找下相關教程。
使用Lua腳本的好處有:

1.減少網絡開銷,多個命令可以使用一次請求完成;

2.實現了原子性操作,Redis會把Lua腳本作為一個整體去執行;

3.實現事務,Redis自帶的事務功能有限,而Lua腳本實現了事務的常規操作,而且還支持回滾。

但是Lua實際上不會使用很多,如果Lua腳本執行時間過長,因為Redis是單線程,因此會導致堵塞。

最后,說下Redisson分布式鎖的代碼實現,
找到上面的redissonLock.lock();
lock方法點進去,一直點到RedissonLock類里面的lockInterruptibly方法:

 @Override
 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  // 獲取線程id
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return;
  }

  RFuture<RedissonLockEntry> future = subscribe(threadId);
  commandExecutor.syncSubscription(future);

  try {
   while (true) {
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     break;
    }

    // waiting for message
    if (ttl >= 0) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().acquire();
    }
   }
  } finally {
   unsubscribe(future, threadId);
  }
//  get(lockAsync(leaseTime, unit));
 }

重點看下tryAcquire方法,把線程id作為一個參數傳遞進來,在這個方法里面,找到tryLockInnerAsync方法點進去,

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  internalLockLeaseTime = unit.toMillis(leaseTime);

  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
     "if (redis.call('exists', KEYS[1]) == 0) then " +
      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
      "return nil; " +
     "end; " +
     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
      "return nil; " +
     "end; " +
     "return redis.call('pttl', KEYS[1]);",
     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
 }

這里就是一堆Lua腳本,先看第一個if命令,先去判斷 KEYS[1](就是對應的鎖key的名字),如果不存在,在hashmap里,設置一個屬性為線程id,值為1,再把map的過期時間設置為internalLockLeaseTime,這個值默認是30秒,

分布式鎖如何在Redis數據庫中使用

上面的操作對應的命令是:

hset keyname id:thread 1
pexpire keyname 30

然后返回nil,相當于null,那程序return了。

另外,Redisson還支持重入鎖,那第二個if就是執行重入鎖的操作,會判斷鎖是否存在,并且傳入的線程id是否是當前線程的id,若果是,支持重復加鎖進行自增操作;

如果是其他線程調用lock方法,上面兩個if判斷不會走,會返回鎖剩余過期時間。

接著返回到tryAcquireAsync方法里面往下看:

實際上是加了一個監聽器,在監聽器里面有個很重要的方法scheduleExpirationRenewal,一看這個名字就能大概猜出是什么功能,

里面有個定時任務的輪詢,

private void scheduleExpirationRenewal(final long threadId) {
  if (expirationRenewalMap.containsKey(getEntryName())) {
   return;
  }

  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
   @Override
   public void run(Timeout timeout) throws Exception {
    // 判斷傳遞進來的線程id是否是我們之前主線程設置的id,如果是,則增加續命,增加30秒。
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
       "return 1; " +
      "end; " +
      "return 0;",
       Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    
    future.addListener(new FutureListener<Boolean>() {
     @Override
     public void operationComplete(Future<Boolean> future) throws Exception {
      expirationRenewalMap.remove(getEntryName());
      if (!future.isSuccess()) {
       log.error("Can't update lock " + getName() + " expiration", future.cause());
       return;
      }
      
      if (future.getNow()) {
       // reschedule itself
       scheduleExpirationRenewal(threadId);
      }
     }
    });
   }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

  if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
   task.cancel();
  }
 }

接著推遲10秒鐘(internalLockLeaseTime / 3),再執行續命操作邏輯。

到最后,再回到lockInterruptibly方法,如果ttl 為null,說明加鎖成功了,就返回null,那如果其他線程的話,就會返回剩余過期時間,那么就會進入到while死循環里,一直嘗試加鎖,調用tryAcquire方法,在瑣失效以后,再會嘗試獲取加鎖。

看完上述內容,你們對分布式鎖如何在Redis數據庫中使用有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

岢岚县| 保靖县| 庆城县| 双桥区| 香港| 甘孜| 潞西市| 鹤山市| 苗栗市| 都江堰市| 湖口县| 饶阳县| 科尔| 中江县| 徐汇区| 郑州市| 阿拉尔市| 临海市| 岳池县| 朝阳市| 双牌县| 格尔木市| 政和县| 仁化县| 宣恩县| 西安市| 永宁县| 阳春市| 邵阳市| 嫩江县| 湖口县| 饶阳县| 中西区| 水城县| 浮梁县| 棋牌| 蒙自县| 沈丘县| 岢岚县| 万安县| 克拉玛依市|