您好,登錄后才能下訂單哦!
這篇文章主要講解了“Java從零實現Redis分布式鎖的方法教程”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Java從零實現Redis分布式鎖的方法教程”吧!
在 jdk 中為我們提供了加鎖的方式:
(1)synchronized 關鍵字
(2)volatile + CAS 實現的樂觀鎖
(3)ReadWriteLock 讀寫鎖
(4)ReenTrantLock 可重入鎖
等等,這些鎖為我們變成提供極大的便利性,保證在多線程的情況下,保證線程安全。
但是在分布式系統中,上面的鎖就統統沒用了。
我們想要解決分布式系統中的并發問題,就需要引入分布式鎖的概念。
首先是對鎖實現原理的一個實現,理論指導實踐,實踐完善理論。
晚上關于 redis 分布式鎖的文章一大堆,但是也都稂莠不齊。
redis 分布式鎖工具有時候中間件團隊不見得會提供,提供了也不見得經常維護,不如自己實現一個,知道原理,也方便修改。
為了便于和 JDK 復用,我們讓接口繼承自 jdk 的 Lock 接口。
package com.github.houbb.lock.api.core; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; /** * 鎖定義 * @author binbin.hou * @since 0.0.1 */ public interface ILock extends Lock { /** * 嘗試加鎖 * @param time 時間 * @param unit 當為 * @param key key * @return 返回 * @throws InterruptedException 異常 * @since 0.0.1 */ boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException; /** * 嘗試加鎖 * @param key key * @return 返回 * @since 0.0.1 */ boolean tryLock(String key); /** * 解鎖 * @param key key * @since 0.0.1 */ void unlock(String key); }
方法我們只添加了三個比較常用的核心方法,作為第一個版本,簡單點。
后續陸續添加即可。
為了便于后期添加更多的所實現,這里首先實現了一個公用的抽象父類。
package com.github.houbb.lock.redis.core; import com.github.houbb.lock.api.core.ILock; import com.github.houbb.lock.redis.constant.LockRedisConst; import com.github.houbb.wait.api.IWait; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; /** * 抽象實現 * @author binbin.hou * @since 0.0.1 */ public abstract class AbstractLockRedis implements ILock { /** * 鎖等待 * @since 0.0.1 */ private final IWait wait; protected AbstractLockRedis(IWait wait) { this.wait = wait; } @Override public void lock() { throw new UnsupportedOperationException(); } @Override public void lockInterruptibly() throws InterruptedException { throw new UnsupportedOperationException(); } @Override public boolean tryLock() { return tryLock(LockRedisConst.DEFAULT_KEY); } @Override public void unlock() { unlock(LockRedisConst.DEFAULT_KEY); } @Override public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException { long startTimeMills = System.currentTimeMillis(); // 一次獲取,直接成功 boolean result = this.tryLock(key); if(result) { return true; } // 時間判斷 if(time <= 0) { return false; } long durationMills = unit.toMillis(time); long endMills = startTimeMills + durationMills; // 循環等待 while (System.currentTimeMillis() < endMills) { result = tryLock(key); if(result) { return true; } // 等待 10ms wait.wait(TimeUnit.MILLISECONDS, 10); } return false; } @Override public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return tryLock(time, unit, LockRedisConst.DEFAULT_KEY); } @Override public Condition newCondition() { throw new UnsupportedOperationException(); } }
最核心的實際上是 public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException 方法。
這個方法會調用 this.tryLock(key) 獲取鎖,如果成功,直接返回;如果不成功,則循環等待。
這里設置了超時時間,如果超時,則直接返回 true。
我們實現的 redis 分布鎖,繼承自上面的抽象類。
package com.github.houbb.lock.redis.core; import com.github.houbb.heaven.util.lang.StringUtil; import com.github.houbb.id.api.Id; import com.github.houbb.id.core.util.IdThreadLocalHelper; import com.github.houbb.lock.redis.constant.LockRedisConst; import com.github.houbb.lock.redis.exception.LockRedisException; import com.github.houbb.lock.redis.support.operator.IOperator; import com.github.houbb.wait.api.IWait; /** * 這里是基于 redis 實現 * * 實際上也可以基于 zk/數據庫等實現。 * * @author binbin.hou * @since 0.0.1 */ public class LockRedis extends AbstractLockRedis { /** * redis 操作實現 * @since 0.0.1 */ private final IOperator redisOperator; /** * 主鍵標識 * @since 0.0.1 */ private final Id id; public LockRedis(IWait wait, IOperator redisOperator, Id id) { super(wait); this.redisOperator = redisOperator; this.id = id; } @Override public boolean tryLock(String key) { final String requestId = id.id(); IdThreadLocalHelper.put(requestId); return redisOperator.lock(key, requestId, LockRedisConst.DEFAULT_EXPIRE_MILLS); } @Override public void unlock(String key) { final String requestId = IdThreadLocalHelper.get(); if(StringUtil.isEmpty(requestId)) { String threadName = Thread.currentThread().getName(); throw new LockRedisException("Thread " + threadName +" not contains requestId"); } boolean unlock = redisOperator.unlock(key, requestId); if(!unlock) { throw new LockRedisException("Unlock key " + key + " result is failed!"); } } }
這里就是 redis 鎖的核心實現了,如果不太理解,建議回顧一下原理篇:
redis 分布式鎖原理詳解
加鎖部分,這里會生成一個 id 標識,用于區分當前操作者。
為了安全也設置了默認的超時時間。
當然這里是為了簡化調用者的使用成本,開發在使用的時候只需要關心自己要加鎖的 key 即可。
當然,甚至連加鎖的 key 都可以進一步抽象掉,比如封裝 @DistributedLock 放在方法上,即可實現分布式鎖。這個后續有時間可以拓展,原理也不難。
解鎖的時候,就會獲取當前進程的持有標識。
憑借當前線程持有的 id 標識,去解鎖。
我們對 redis 的操作進行了抽象,為什么抽象呢?
因為 redis 服務種類實際很多,可以是 redis 單點,集群,主從,哨兵。
連接的客戶端也可以很多,jedis,spring redisTemplate, codis, redisson 等等。
這里為了后期拓展方便,就對操作進行了抽象。
定義接口如下:
package com.github.houbb.lock.redis.support.operator; /** * Redis 客戶端 * @author binbin.hou * @since 0.0.1 */ public interface IOperator { /** * 嘗試獲取分布式鎖 * * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTimeMills 超期時間 * @return 是否獲取成功 * @since 0.0.1 */ boolean lock(String lockKey, String requestId, int expireTimeMills); /** * 解鎖 * @param lockKey 鎖 key * @param requestId 請求標識 * @return 結果 * @since 0.0.1 */ boolean unlock(String lockKey, String requestId); }
我們實現一個 jedis 單點版本的:
package com.github.houbb.lock.redis.support.operator.impl; import com.github.houbb.lock.redis.constant.LockRedisConst; import com.github.houbb.lock.redis.support.operator.IOperator; import redis.clients.jedis.Jedis; import java.util.Collections; /** * Redis 客戶端 * @author binbin.hou * @since 0.0.1 */ public class JedisOperator implements IOperator { /** * jedis 客戶端 * @since 0.0.1 */ private final Jedis jedis; public JedisOperator(Jedis jedis) { this.jedis = jedis; } /** * 嘗試獲取分布式鎖 * * expireTimeMills 保證當前進程掛掉,也能釋放鎖 * * requestId 保證解鎖的是當前進程(鎖的持有者) * * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTimeMills 超期時間 * @return 是否獲取成功 * @since 0.0.1 */ @Override public boolean lock(String lockKey, String requestId, int expireTimeMills) { String result = jedis.set(lockKey, requestId, LockRedisConst.SET_IF_NOT_EXIST, LockRedisConst.SET_WITH_EXPIRE_TIME, expireTimeMills); return LockRedisConst.LOCK_SUCCESS.equals(result); } /** * 解鎖 * * (1)使用 requestId,保證為當前鎖的持有者 * (2)使用 lua 腳本,保證執行的原子性。 * * @param lockKey 鎖 key * @param requestId 請求標識 * @return 結果 * @since 0.0.1 */ @Override public boolean unlock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); return LockRedisConst.RELEASE_SUCCESS.equals(result); } }
這里時最核心的部分。
別看簡單幾行代碼,需要注意的點還是很多的。
加鎖時附帶 requestId,用來標識自己為鎖的持有者。
SETNX 當 key 不存在時才進行加鎖。
設置加鎖的過期時間,避免因異常等原因未釋放鎖,導致鎖的長時間占用。
使用 lua 腳本,保證操作的原子性。
為了證明為鎖的持有者,傳入 requestId。
<dependency> <groupId>com.github.houbb</groupId> <artifactId>lock-core</artifactId> <version>0.0.1</version> </dependency>
Jedis jedis = new Jedis("127.0.0.1", 6379); IOperator operator = new JedisOperator(jedis); // 獲取鎖 ILock lock = LockRedisBs.newInstance().operator(operator).lock(); try { boolean lockResult = lock.tryLock(); System.out.println(lockResult); // 業務處理 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); }
到這里,一個簡單版本的 redis 分布式鎖就實現完成了。
當然還有很多可以改進的地方:
(1)比如引入遞增的 sequence,避免分布式鎖中的 GC 導致的問題
(2)對于更多 redis 服務端+客戶端的支持
(3)對于注解式 redis 分布式鎖的支持
感謝各位的閱讀,以上就是“Java從零實現Redis分布式鎖的方法教程”的內容了,經過本文的學習后,相信大家對Java從零實現Redis分布式鎖的方法教程這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。