您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關使用ZooKeeper怎么實現一個分布式鎖,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
ZooKeeper 是一個典型的分布式數據一致性解決方案,分布式應用程序可以基于 ZooKeeper 實現諸如數據發布/訂閱、負載均衡、分布式協調/通知、集群管理、Master 選舉、分布式鎖等功能。
節點
在介紹 ZooKeeper 分布式鎖前需要先了解一下 ZooKeeper 中節點(Znode),ZooKeeper 的數據存儲數據模型是一棵樹(Znode Tree),由斜杠(/)的進行分割的路徑,就是一個 Znode(如 /locks/my_lock)。每個 Znode 上都會保存自己的數據內容,同時還會保存一系列屬性信息。
Znode 又分為以下四種類型:
類型 | 描述 |
---|---|
持久節點 | 節點創建后,會一直存在,不會因客戶端會話失效而刪除 |
持久順序節點 | 基本特性與持久節點一致,創建節點的過程中,ZooKeeper 會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名 |
臨時節點 | 客戶端會話失效或連接關閉后,該節點會被自動刪除 |
臨時順序節點 | 基本特性與臨時節點一致,創建節點的過程中,ZooKeeper 會在其名字后自動追加一個單調增長的數字后綴,作為新的節點名 |
鎖原理
ZooKeeper 分布式鎖是基于 臨時順序節點 來實現的,鎖可理解為 ZooKeeper 上的一個節點,當需要獲取鎖時,就在這個鎖節點下創建一個臨時順序節點。當存在多個客戶端同時來獲取鎖,就按順序依次創建多個臨時順序節點,但只有排列序號是第一的那個節點能獲取鎖成功,其他節點則按順序分別監聽前一個節點的變化,當被監聽者釋放鎖時,監聽者就可以馬上獲得鎖。
而且用臨時順序節點的另外一個用意是如果某個客戶端創建臨時順序節點后,自己意外宕機了也沒關系,ZooKeeper 感知到某個客戶端宕機后會自動刪除對應的臨時順序節點,相當于自動釋放鎖。
如上圖:ClientA 和 ClientB 同時想獲取鎖,所以都在 locks 節點下創建了一個臨時節點 1 和 2,而 1 是當前 locks 節點下排列序號第一的節點,所以 ClientA 獲取鎖成功,而 ClientB 處于等待狀態,這時 ZooKeeper 中的 2 節點會監聽 1 節點,當 1節點鎖釋放(節點被刪除)時,2 就變成了 locks 節點下排列序號第一的節點,這樣 ClientB 就獲取鎖成功了。
代碼測試
請確保 ZooKeeper 服務已啟動,ZooKeeper 的搭建可參考Kafka 集群 中的 ZooKeeper 集群部分
以下是基于 C# 的測試,Java 可使用 Curator 框架,實現原理和上面描述是一致的,有興趣可以看看源碼,應該也不難理解。
創建 .NET Core 控制臺程序 Nuget
安裝 ZooKeeperNetEx.Recipes
創建 ZooKeeper Client
private const int CONNECTION_TIMEOUT = 50000; private const string CONNECTION_STRING = "127.0.0.1:2181"; private ZooKeeper CreateClient() { var zooKeeper = new ZooKeeper(CONNECTION_STRING, CONNECTION_TIMEOUT, NullWatcher.Instance); Stopwatch sw = new Stopwatch(); sw.Start(); while (sw.ElapsedMilliseconds < CONNECTION_TIMEOUT) { var state = zooKeeper.getState(); if (state == ZooKeeper.States.CONNECTED || state == ZooKeeper.States.CONNECTING) { break; } } sw.Stop(); return zooKeeper; } class NullWatcher : Watcher { public static readonly NullWatcher Instance = new NullWatcher(); private NullWatcher() { } public override Task process(WatchedEvent @event) { return Task.CompletedTask; } }
添加 Lock 方法
/// <summary> /// 加鎖 /// </summary> /// <param name="key">加鎖的節點名</param> /// <param name="lockAcquiredAction">加鎖成功后需要執行的邏輯</param> /// <param name="lockReleasedAction">鎖釋放后需要執行的邏輯,可為空</param> /// <returns></returns> public async Task Lock(string key, Action lockAcquiredAction, Action lockReleasedAction = null) { // 獲取 ZooKeeper Client ZooKeeper keeper = CreateClient(); // 指定鎖節點 WriteLock writeLock = new WriteLock(keeper, $"/{key}", null); var lockCallback = new LockCallback(() => { lockAcquiredAction.Invoke(); writeLock.unlock(); }, lockReleasedAction); // 綁定鎖獲取和釋放的監聽對象 writeLock.setLockListener(lockCallback); // 獲取鎖(獲取失敗時會監聽上一個臨時節點) await writeLock.Lock(); } class LockCallback : LockListener { private readonly Action _lockAcquiredAction; private readonly Action _lockReleasedAction; public LockCallback(Action lockAcquiredAction, Action lockReleasedAction) { _lockAcquiredAction = lockAcquiredAction; _lockReleasedAction = lockReleasedAction; } /// <summary> /// 獲取鎖成功回調 /// </summary> /// <returns></returns> public Task lockAcquired() { _lockAcquiredAction?.Invoke(); return Task.FromResult(0); } /// <summary> /// 釋放鎖成功回調 /// </summary> /// <returns></returns> public Task lockReleased() { _lockReleasedAction?.Invoke(); return Task.FromResult(0); } }
多線程模擬測試
static async Task RunAsync() { Parallel.For(1, 10, async (i) => { await new ZooKeeprDistributedLock().Lock("locks", () => { Console.WriteLine($"第{i}個請求,獲取鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}"); Thread.Sleep(1000); // 業務邏輯... }, () => { Console.WriteLine($"第{i}個請求,釋放鎖成功:{DateTime.Now},線程Id:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine("-------------------------------"); }); }); await Task.CompletedTask; }
看完上述內容,你們對使用ZooKeeper怎么實現一個分布式鎖有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。