您好,登錄后才能下訂單哦!
(1)zookeeper如何實現分布式鎖?
(2)zookeeper分布式鎖有哪些優點?
(3)zookeeper分布式鎖有哪些缺點?
zooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,它可以為分布式應用提供一致性的服務,它是Hadoop和Hbase的重要組件,同時也可以作為配置中心、注冊中心運用在微服務體系中。
本章我們將介紹zookeeper如何實現分布式鎖運用在分布式系統中。
zooKeeper操作和維護的為一個個數據節點,稱為 znode,采用類似文件系統的層級樹狀結構進行管理,如果 znode 節點包含數據則存儲為字節數組(byte array)。
而且,同一個節點多個客戶同時創建,本文由公從號“彤哥讀源碼”原創,只有一個客戶端會成功,其它客戶端創建時將失敗。
znode 共有四種類型:
持久(無序)
持久有序
臨時(無序)
其中,持久節點如果不手動刪除會一直存在,臨時節點當客戶端session失效就會自動刪除節點。
watcher(事件監聽器),是zookeeper中的一個很重要的特性。
zookeeper允許用戶在指定節點上注冊一些watcher,并且在一些特定事件觸發的時候,zooKeeper服務端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現分布式協調服務的重要特性。
KeeperState | EventType | 觸發條件 | 說明 | 操作 |
---|---|---|---|---|
SyncConnected(3) | None(-1) | 客戶端與服務端成功建立連接 | 此時客戶端和服務器處于連接狀態 | - |
同上 | NodeCreated(1) | Watcher監聽的對應數據節點被創建 | 同上 | Create |
同上 | NodeDeleted(2) | Watcher監聽的對應數據節點被刪除 | 同上 | Delete/znode |
同上 | NodeDataChanged(3) | Watcher監聽的對應數據節點的數據內容發生變更 | 同上 | setDate/znode |
同上 | NodeChildChanged(4) | Wather監聽的對應數據節點的子節點列表發生變更 | 同上 | Create/child |
Disconnected(0) | None(-1) | 客戶端與ZooKeeper服務器斷開連接 | 此時客戶端和服務器處于斷開連接狀態 | - |
Expired(-112) | None(-1) | 會話超時 | 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 | - |
AuthFailed(4) | None(-1) | 通常有兩種情況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 | 通常同時也會收到AuthFailedException異常 | - |
既然,同一個節點只能創建一次,那么,加鎖時檢測節點是否存在,不存在則創建之,存在或者創建失敗則監聽這個節點的刪除事件,這樣,當釋放鎖的時候監聽的客戶端再次競爭去創建這個節點,成功的則獲取到鎖,不成功的則再次監聽該節點。
比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:
(1)三者同時嘗試創建/locker/user_1節點;
(2)client1創建成功,它獲取到鎖;
(3)client2和client3創建失敗,它們監聽/locker/user_1的刪除事件;
(4)client1執行鎖內業務邏輯;
(5)client1釋放鎖,刪除節點/locker/user_1;
(6)client2和client3都捕獲到節點/locker/user_1被刪除的事件,二者皆被喚醒;
(7)client2和client3同時去創建/locker/user_1節點;
(8)回到第二步,依次類推,本文由公從號“彤哥讀源碼”原創;
不過,這種方案有個很嚴重的弊端——驚群效應。
如果并發量很高,多個客戶端同時監聽同一個節點,釋放鎖時同時喚醒這么多個客戶端,然后再競爭,最后還是只有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統資源,那么有沒有更好的方案呢?答案是當然有,請看方案二。
為了解決方案一中的驚群效應,我們可以使用有序子節點的形式來實現分布式鎖,而且為了規避客戶端獲取鎖后突然斷線的風險,我們有必要使用臨時有序節點。
比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:
(1)三者同時在/locker/user_1/下面創建臨時有序子節點;
(2)三者皆創建成功,分別為/locker/user_1/0000000001、/locker/user_1/0000000003、/locker/user_1/0000000002;
(3)檢查自己創建的節點是不是子節點中最小的;
(4)client1發現自己是最小的節點,它獲取到鎖;
(5)client2和client3發現自己不是最小的節點,它們無法獲取到鎖;
(6)client2創建的節點為/locker/user_1/0000000003,它監聽其上一個節點/locker/user_1/0000000002的刪除事件;
(7)client3創建的節點為/locker/user_1/0000000002,它監聽其上一個節點/locker/user_1/0000000001的刪除事件;
(8)client1執行鎖內業務邏輯;
(9)client1釋放鎖,刪除節點/locker/user_1/0000000001;
(10)client3監聽到節點/locker/user_1/0000000001的刪除事件,被喚醒;
(11)client3再次檢查自己是不是最小的節點,發現是,則獲取到鎖;
(12)client3執行鎖內業務邏輯,本文由公從號“彤哥讀源碼”原創;
(13)client3釋放鎖,刪除節點/locker/user_1/0000000002;
(14)client2監聽到節點/locker/user_1/0000000002的刪除事件,被喚醒;
(15)client2執行鎖內業務邏輯;
(16)client2釋放鎖,刪除節點/locker/user_1/0000000003;
(17)client2檢查/locker/user_1/下是否還有子節點,沒有了則刪除/locker/user_1節點;
(18)流程結束;
這種方案相對于方案一來說,每次釋放鎖時只喚醒一個客戶端,減少了線程喚醒的代價,提高了效率。
pom中引入以下jar包:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
定義一個Locker接口,與上一章mysql分布式鎖使用同一個接口。
public interface Locker {
void lock(String key, Runnable command);
}
這里通過內部類ZkLockerWatcher處理zookeeper的相關操作,需要注意以下幾點:
(1)zk連接建立完畢之前不要進行相關操作,否則會報ConnectionLoss異常,這里通過LockSupport.park();阻塞連接線程并在監聽線程中喚醒處理;
(2)客戶端線程與監聽線程不是同一個線程,所以可以通過LockSupport.park();及LockSupport.unpark(thread);來處理;
(3)中間很多步驟不是原子的(坑),所以需要再次檢測,詳見代碼中注釋;
@Slf4j
@Component
public class ZkLocker implements Locker {
@Override
public void lock(String key, Runnable command) {
ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
try {
if (watcher.getLock()) {
command.run();
}
} finally {
watcher.releaseLock();
}
}
private static class ZkLockerWatcher implements Watcher {
public static final String connAddr = "127.0.0.1:2181";
public static final int timeout = 6000;
public static final String LOCKER_ROOT = "/locker";
ZooKeeper zooKeeper;
String parentLockPath;
String childLockPath;
Thread thread;
public static ZkLockerWatcher conn(String key) {
ZkLockerWatcher watcher = new ZkLockerWatcher();
try {
ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
watcher.thread = Thread.currentThread();
// 阻塞等待連接建立完畢
LockSupport.park();
// 根節點如果不存在,就創建一個(并發問題,如果兩個線程同時檢測不存在,兩個同時去創建必須有一個會失敗)
if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
try {
zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
// 如果節點已存在,則創建失敗,這里捕獲異常,并不阻擋程序正常運行
log.info("創建節點 {} 失敗", LOCKER_ROOT);
}
}
// 當前加鎖的節點是否存在
watcher.parentLockPath = LOCKER_ROOT + "/" + key;
if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
try {
zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
// 如果節點已存在,則創建失敗,這里捕獲異常,并不阻擋程序正常運行
log.info("創建節點 {} 失敗", watcher.parentLockPath);
}
}
} catch (Exception e) {
log.error("conn to zk error", e);
throw new RuntimeException("conn to zk error");
}
return watcher;
}
public boolean getLock() {
try {
// 創建子節點,本文由公從號“彤哥讀源碼”原創
this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 檢查自己是不是最小的節點,是則獲取成功,不是則監聽上一個節點
return getLockOrWatchLast();
} catch (Exception e) {
log.error("get lock error", e);
throw new RuntimeException("get lock error");
} finally {
// System.out.println("getLock: " + childLockPath);
}
}
public void releaseLock() {
try {
if (childLockPath != null) {
// 釋放鎖,刪除節點
zooKeeper.delete(childLockPath, -1);
}
// 最后一個釋放的刪除鎖節點
List<String> children = zooKeeper.getChildren(parentLockPath, false);
if (children.isEmpty()) {
try {
zooKeeper.delete(parentLockPath, -1);
} catch (KeeperException e) {
// 如果刪除之前又新加了一個子節點,會刪除失敗
log.info("刪除節點 {} 失敗", parentLockPath);
}
}
// 關閉zk連接
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (Exception e) {
log.error("release lock error", e);
throw new RuntimeException("release lock error");
} finally {
// System.out.println("releaseLock: " + childLockPath);
}
}
private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
List<String> children = zooKeeper.getChildren(parentLockPath, false);
// 必須要排序一下,這里取出來的順序可能是亂的
Collections.sort(children);
// 如果當前節點是第一個子節點,則獲取鎖成功
if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
return true;
}
// 如果不是第一個子節點,就監聽前一個節點
String last = "";
for (String child : children) {
if ((parentLockPath + "/" + child).equals(childLockPath)) {
break;
}
last = child;
}
if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
this.thread = Thread.currentThread();
// 阻塞當前線程
LockSupport.park();
// 喚醒之后重新檢測自己是不是最小的節點,因為有可能上一個節點斷線了
return getLockOrWatchLast();
} else {
// 如果上一個節點不存在,說明還沒來得及監聽就釋放了,重新檢查一次
return getLockOrWatchLast();
}
}
@Override
public void process(WatchedEvent event) {
if (this.thread != null) {
// 喚醒阻塞的線程(這是在監聽線程,跟獲取鎖的線程不是同一個線程)
LockSupport.unpark(this.thread);
this.thread = null;
}
}
}
}
我們這里起兩批線程,一批獲取user_1這個鎖,一批獲取user_2這個鎖。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {
@Autowired
private Locker locker;
@Test
public void testZkLocker() throws IOException {
for (int i = 0; i < 1000; i++) {
new Thread(()->{
locker.lock("user_1", ()-> {
try {
System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}, "Thread-"+i).start();
}
for (int i = 1000; i < 2000; i++) {
new Thread(()->{
locker.lock("user_2", ()-> {
try {
System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}, "Thread-"+i).start();
}
System.in.read();
}
}
運行結果:
可以看到穩定在500ms左右打印兩個鎖的結果。
user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621
上面的原生API實現更易于理解zookeeper實現分布式鎖的邏輯,但是難免保證沒有什么問題,比如不是重入鎖,不支持讀寫鎖等。
下面我們一起看看現有的輪子curator是怎么實現的。
pom文件中引入以下jar包:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
下面是互斥鎖的一種實現方案:
@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
public static final String connAddr = "127.0.0.1:2181";
public static final int timeout = 6000;
public static final String LOCKER_ROOT = "/locker";
private CuratorFramework cf;
@PostConstruct
public void init() {
this.cf = CuratorFrameworkFactory.builder()
.connectString(connAddr)
.sessionTimeoutMs(timeout)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
cf.start();
}
@Override
public void lock(String key, Runnable command) {
String path = LOCKER_ROOT + "/" + key;
InterProcessLock lock = new InterProcessMutex(cf, path);
try {
// 本文由公從號“彤哥讀源碼”原創
lock.acquire();
command.run();
} catch (Exception e) {
log.error("get lock error", e);
throw new RuntimeException("get lock error", e);
} finally {
try {
lock.release();
} catch (Exception e) {
log.error("release lock error", e);
throw new RuntimeException("release lock error", e);
}
}
}
}
除了互斥鎖,curator還提供了讀寫鎖、多重鎖、信號量等實現方式,而且他們是可重入的鎖。
(1)zookeeper中的節點有四種類型:持久、持久有序、臨時、臨時有序;
(2)zookeeper提供了一種非常重要的特性——監聽機制,它可以用來監聽節點的變化;
(3)zookeeper分布式鎖是基于 臨時有序節點 + 監聽機制 實現的;
(4)zookeeper分布式鎖加鎖時在鎖路徑下創建臨時有序節點;
(5)如果自己是第一個節點,則獲得鎖;
(6)如果自己不是第一個節點,則監聽前一個節點,并阻塞當前線程;
(7)當監聽到前一個節點的刪除事件時,喚醒當前節點的線程,并再次檢查自己是不是第一個節點;
(8)使用臨時有序節點而不是持久有序節點是為了讓客戶端無故斷線時能夠自動釋放鎖;
zookeeper分布式鎖有哪些優點?
答:1)zookeeper本身可以集群部署,相對于mysql的單點更可靠;
2)不會占用mysql的連接數,不會增加mysql的壓力;
3)使用監聽機制,減少線程上下文切換的次數;
4)客戶端斷線能夠自動釋放鎖,非常安全;
5)有現有的輪子curator可以使用;
6)curator實現方式是可重入的,對現有代碼改造成本小;
zookeeper分布式鎖有哪些缺點?
答:1)加鎖會頻繁地“寫”zookeeper,增加zookeeper的壓力;
2)寫zookeeper的時候會在集群進行同步,節點數越多,同步越慢,獲取鎖的過程越慢;
3)需要另外依賴zookeeper,而大部分服務是不會使用zookeeper的,增加了系統的復雜性;
4)相對于redis分布式鎖,性能要稍微略差一些;
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。