亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

死磕 java同步系列之zookeeper分布式鎖

發布時間:2020-07-18 02:23:28 來源:網絡 閱讀:226 作者:彤哥讀源碼 欄目:編程語言

問題

(1)zookeeper如何實現分布式鎖?

(2)zookeeper分布式鎖有哪些優點?

(3)zookeeper分布式鎖有哪些缺點?

簡介

zooKeeper是一個分布式的,開放源碼的分布式應用程序協調服務,它可以為分布式應用提供一致性的服務,它是Hadoop和Hbase的重要組件,同時也可以作為配置中心、注冊中心運用在微服務體系中。

本章我們將介紹zookeeper如何實現分布式鎖運用在分布式系統中。

基礎知識

什么是znode?

zooKeeper操作和維護的為一個個數據節點,稱為 znode,采用類似文件系統的層級樹狀結構進行管理,如果 znode 節點包含數據則存儲為字節數組(byte array)。

而且,同一個節點多個客戶同時創建,本文由公從號“彤哥讀源碼”原創,只有一個客戶端會成功,其它客戶端創建時將失敗。

死磕 java同步系列之zookeeper分布式鎖

節點類型

znode 共有四種類型:

  • 持久(無序)

  • 持久有序

  • 臨時(無序)

  • 臨時有序

其中,持久節點如果不手動刪除會一直存在,臨時節點當客戶端session失效就會自動刪除節點。

什么是watcher?

watcher(事件監聽器),是zookeeper中的一個很重要的特性。

zookeeper允許用戶在指定節點上注冊一些watcher,并且在一些特定事件觸發的時候,zooKeeper服務端會將事件通知到感興趣的客戶端上去,該機制是Zookeeper實現分布式協調服務的重要特性

KeeperState EventType 觸發條件 說明 操作
SyncConnected(3) None(-1) 客戶端與服務端成功建立連接 此時客戶端和服務器處于連接狀態 -
同上 NodeCreated(1) Watcher監聽的對應數據節點被創建 同上 Create
同上 NodeDeleted(2) Watcher監聽的對應數據節點被刪除 同上 Delete/znode
同上 NodeDataChanged(3) Watcher監聽的對應數據節點的數據內容發生變更 同上 setDate/znode
同上 NodeChildChanged(4) Wather監聽的對應數據節點的子節點列表發生變更 同上 Create/child
Disconnected(0) None(-1) 客戶端與ZooKeeper服務器斷開連接 此時客戶端和服務器處于斷開連接狀態 -
Expired(-112) None(-1) 會話超時 此時客戶端會話失效,通常同時也會受到SessionExpiredException異常 -
AuthFailed(4) None(-1) 通常有兩種情況,1:使用錯誤的schema進行權限檢查 2:SASL權限檢查失敗 通常同時也會收到AuthFailedException異常 -

原理解析

方案一

既然,同一個節點只能創建一次,那么,加鎖時檢測節點是否存在,不存在則創建之,存在或者創建失敗則監聽這個節點的刪除事件,這樣,當釋放鎖的時候監聽的客戶端再次競爭去創建這個節點,成功的則獲取到鎖,不成功的則再次監聽該節點。

死磕 java同步系列之zookeeper分布式鎖

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:

(1)三者同時嘗試創建/locker/user_1節點;

(2)client1創建成功,它獲取到鎖;

(3)client2和client3創建失敗,它們監聽/locker/user_1的刪除事件;

(4)client1執行鎖內業務邏輯;

(5)client1釋放鎖,刪除節點/locker/user_1;

(6)client2和client3都捕獲到節點/locker/user_1被刪除的事件,二者皆被喚醒;

(7)client2和client3同時去創建/locker/user_1節點;

(8)回到第二步,依次類推,本文由公從號“彤哥讀源碼”原創;

不過,這種方案有個很嚴重的弊端——驚群效應。

如果并發量很高,多個客戶端同時監聽同一個節點,釋放鎖時同時喚醒這么多個客戶端,然后再競爭,最后還是只有一個能獲取到鎖,其它客戶端又要沉睡,這些客戶端的喚醒沒有任何意義,極大地浪費系統資源,那么有沒有更好的方案呢?答案是當然有,請看方案二。

方案二

為了解決方案一中的驚群效應,我們可以使用有序子節點的形式來實現分布式鎖,而且為了規避客戶端獲取鎖后突然斷線的風險,我們有必要使用臨時有序節點。

死磕 java同步系列之zookeeper分布式鎖

比如,有三個客戶端client1、client2、client3同時獲取/locker/user_1這把鎖,它們將按照如下步驟運行:

(1)三者同時在/locker/user_1/下面創建臨時有序子節點;

(2)三者皆創建成功,分別為/locker/user_1/0000000001、/locker/user_1/0000000003、/locker/user_1/0000000002;

(3)檢查自己創建的節點是不是子節點中最小的;

(4)client1發現自己是最小的節點,它獲取到鎖;

(5)client2和client3發現自己不是最小的節點,它們無法獲取到鎖;

(6)client2創建的節點為/locker/user_1/0000000003,它監聽其上一個節點/locker/user_1/0000000002的刪除事件;

(7)client3創建的節點為/locker/user_1/0000000002,它監聽其上一個節點/locker/user_1/0000000001的刪除事件;

(8)client1執行鎖內業務邏輯;

(9)client1釋放鎖,刪除節點/locker/user_1/0000000001;

(10)client3監聽到節點/locker/user_1/0000000001的刪除事件,被喚醒;

(11)client3再次檢查自己是不是最小的節點,發現是,則獲取到鎖;

(12)client3執行鎖內業務邏輯,本文由公從號“彤哥讀源碼”原創;

(13)client3釋放鎖,刪除節點/locker/user_1/0000000002;

(14)client2監聽到節點/locker/user_1/0000000002的刪除事件,被喚醒;

(15)client2執行鎖內業務邏輯;

(16)client2釋放鎖,刪除節點/locker/user_1/0000000003;

(17)client2檢查/locker/user_1/下是否還有子節點,沒有了則刪除/locker/user_1節點;

(18)流程結束;

這種方案相對于方案一來說,每次釋放鎖時只喚醒一個客戶端,減少了線程喚醒的代價,提高了效率。

zookeeper原生API實現

pom文件

pom中引入以下jar包:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.5</version>
</dependency>

Locker接口

定義一個Locker接口,與上一章mysql分布式鎖使用同一個接口。

public interface Locker {
    void lock(String key, Runnable command);
}

zookeeper分布式鎖實現

這里通過內部類ZkLockerWatcher處理zookeeper的相關操作,需要注意以下幾點:

(1)zk連接建立完畢之前不要進行相關操作,否則會報ConnectionLoss異常,這里通過LockSupport.park();阻塞連接線程并在監聽線程中喚醒處理;

(2)客戶端線程與監聽線程不是同一個線程,所以可以通過LockSupport.park();及LockSupport.unpark(thread);來處理;

(3)中間很多步驟不是原子的(坑),所以需要再次檢測,詳見代碼中注釋;

@Slf4j
@Component
public class ZkLocker implements Locker {
    @Override
    public void lock(String key, Runnable command) {
        ZkLockerWatcher watcher = ZkLockerWatcher.conn(key);
        try {
            if (watcher.getLock()) {
                command.run();
            }
        } finally {
            watcher.releaseLock();
        }
    }

    private static class ZkLockerWatcher implements Watcher {
        public static final String connAddr = "127.0.0.1:2181";
        public static final int timeout = 6000;
        public static final String LOCKER_ROOT = "/locker";

        ZooKeeper zooKeeper;
        String parentLockPath;
        String childLockPath;
        Thread thread;

        public static ZkLockerWatcher conn(String key) {
            ZkLockerWatcher watcher = new ZkLockerWatcher();
            try {
                ZooKeeper zooKeeper = watcher.zooKeeper = new ZooKeeper(connAddr, timeout, watcher);
                watcher.thread = Thread.currentThread();
                // 阻塞等待連接建立完畢
                LockSupport.park();
                // 根節點如果不存在,就創建一個(并發問題,如果兩個線程同時檢測不存在,兩個同時去創建必須有一個會失敗)
                if (zooKeeper.exists(LOCKER_ROOT, false) == null) {
                    try {
                        zooKeeper.create(LOCKER_ROOT, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節點已存在,則創建失敗,這里捕獲異常,并不阻擋程序正常運行
                        log.info("創建節點 {} 失敗", LOCKER_ROOT);
                    }
                }
                // 當前加鎖的節點是否存在
                watcher.parentLockPath = LOCKER_ROOT + "/" + key;
                if (zooKeeper.exists(watcher.parentLockPath, false) == null) {
                    try {
                        zooKeeper.create(watcher.parentLockPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException e) {
                        // 如果節點已存在,則創建失敗,這里捕獲異常,并不阻擋程序正常運行
                        log.info("創建節點 {} 失敗", watcher.parentLockPath);
                    }
                }

            } catch (Exception e) {
                log.error("conn to zk error", e);
                throw new RuntimeException("conn to zk error");
            }
            return watcher;
        }

        public boolean getLock() {
            try {
                // 創建子節點,本文由公從號“彤哥讀源碼”原創
                this.childLockPath = zooKeeper.create(parentLockPath + "/", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                // 檢查自己是不是最小的節點,是則獲取成功,不是則監聽上一個節點
                return getLockOrWatchLast();
            } catch (Exception e) {
                log.error("get lock error", e);
                throw new RuntimeException("get lock error");
            } finally {
//                System.out.println("getLock: " + childLockPath);
            }
        }

        public void releaseLock() {
            try {
                if (childLockPath != null) {
                    // 釋放鎖,刪除節點
                    zooKeeper.delete(childLockPath, -1);
                }
                // 最后一個釋放的刪除鎖節點
                List<String> children = zooKeeper.getChildren(parentLockPath, false);
                if (children.isEmpty()) {
                    try {
                        zooKeeper.delete(parentLockPath, -1);
                    } catch (KeeperException e) {
                        // 如果刪除之前又新加了一個子節點,會刪除失敗
                        log.info("刪除節點 {} 失敗", parentLockPath);
                    }
                }
                // 關閉zk連接
                if (zooKeeper != null) {
                    zooKeeper.close();
                }
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error");
            } finally {
//                System.out.println("releaseLock: " + childLockPath);
            }
        }

        private boolean getLockOrWatchLast() throws KeeperException, InterruptedException {
            List<String> children = zooKeeper.getChildren(parentLockPath, false);
            // 必須要排序一下,這里取出來的順序可能是亂的
            Collections.sort(children);
            // 如果當前節點是第一個子節點,則獲取鎖成功
            if ((parentLockPath + "/" + children.get(0)).equals(childLockPath)) {
                return true;
            }

            // 如果不是第一個子節點,就監聽前一個節點
            String last = "";
            for (String child : children) {
                if ((parentLockPath + "/" + child).equals(childLockPath)) {
                    break;
                }
                last = child;
            }

            if (zooKeeper.exists(parentLockPath + "/" + last, true) != null) {
                this.thread = Thread.currentThread();
                // 阻塞當前線程
                LockSupport.park();
                // 喚醒之后重新檢測自己是不是最小的節點,因為有可能上一個節點斷線了
                return getLockOrWatchLast();
            } else {
                // 如果上一個節點不存在,說明還沒來得及監聽就釋放了,重新檢查一次
                return getLockOrWatchLast();
            }
        }

        @Override
        public void process(WatchedEvent event) {
            if (this.thread != null) {
                // 喚醒阻塞的線程(這是在監聽線程,跟獲取鎖的線程不是同一個線程)
                LockSupport.unpark(this.thread);
                this.thread = null;
            }
        }
    }
}

測試代碼

我們這里起兩批線程,一批獲取user_1這個鎖,一批獲取user_2這個鎖。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class ZkLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testZkLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("user_1", ()-> {
                    try {
                        System.out.println(String.format("user_1 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }
        for (int i = 1000; i < 2000; i++) {
            new Thread(()->{
                locker.lock("user_2", ()-> {
                    try {
                        System.out.println(String.format("user_2 time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

運行結果:

可以看到穩定在500ms左右打印兩個鎖的結果。

user_1 time: 1568973299578, threadName: Thread-10
user_2 time: 1568973299579, threadName: Thread-1780
user_1 time: 1568973300091, threadName: Thread-887
user_2 time: 1568973300091, threadName: Thread-1542
user_1 time: 1568973300594, threadName: Thread-882
user_2 time: 1568973300594, threadName: Thread-1539
user_2 time: 1568973301098, threadName: Thread-1592
user_1 time: 1568973301098, threadName: Thread-799
user_1 time: 1568973301601, threadName: Thread-444
user_2 time: 1568973301601, threadName: Thread-1096
user_1 time: 1568973302104, threadName: Thread-908
user_2 time: 1568973302104, threadName: Thread-1574
user_2 time: 1568973302607, threadName: Thread-1515
user_1 time: 1568973302607, threadName: Thread-80
user_1 time: 1568973303110, threadName: Thread-274
user_2 time: 1568973303110, threadName: Thread-1774
user_1 time: 1568973303615, threadName: Thread-324
user_2 time: 1568973303615, threadName: Thread-1621

curator實現

上面的原生API實現更易于理解zookeeper實現分布式鎖的邏輯,但是難免保證沒有什么問題,比如不是重入鎖,不支持讀寫鎖等。

下面我們一起看看現有的輪子curator是怎么實現的。

pom文件

pom文件中引入以下jar包:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
</dependency>

代碼實現

下面是互斥鎖的一種實現方案:

@Component
@Slf4j
public class ZkCuratorLocker implements Locker {
    public static final String connAddr = "127.0.0.1:2181";
    public static final int timeout = 6000;
    public static final String LOCKER_ROOT = "/locker";

    private CuratorFramework cf;

    @PostConstruct
    public void init() {
        this.cf = CuratorFrameworkFactory.builder()
                .connectString(connAddr)
                .sessionTimeoutMs(timeout)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();

        cf.start();
    }

    @Override
    public void lock(String key, Runnable command) {
        String path = LOCKER_ROOT + "/" + key;
        InterProcessLock lock = new InterProcessMutex(cf, path);
        try {
            // 本文由公從號“彤哥讀源碼”原創
            lock.acquire();
            command.run();
        } catch (Exception e) {
            log.error("get lock error", e);
            throw new RuntimeException("get lock error", e);
        } finally {
            try {
                lock.release();
            } catch (Exception e) {
                log.error("release lock error", e);
                throw new RuntimeException("release lock error", e);
            }
        }
    }
}

除了互斥鎖,curator還提供了讀寫鎖、多重鎖、信號量等實現方式,而且他們是可重入的鎖。

總結

(1)zookeeper中的節點有四種類型:持久、持久有序、臨時、臨時有序;

(2)zookeeper提供了一種非常重要的特性——監聽機制,它可以用來監聽節點的變化;

(3)zookeeper分布式鎖是基于 臨時有序節點 + 監聽機制 實現的;

(4)zookeeper分布式鎖加鎖時在鎖路徑下創建臨時有序節點;

(5)如果自己是第一個節點,則獲得鎖;

(6)如果自己不是第一個節點,則監聽前一個節點,并阻塞當前線程;

(7)當監聽到前一個節點的刪除事件時,喚醒當前節點的線程,并再次檢查自己是不是第一個節點;

(8)使用臨時有序節點而不是持久有序節點是為了讓客戶端無故斷線時能夠自動釋放鎖;

彩蛋

zookeeper分布式鎖有哪些優點?

答:1)zookeeper本身可以集群部署,相對于mysql的單點更可靠;

2)不會占用mysql的連接數,不會增加mysql的壓力;

3)使用監聽機制,減少線程上下文切換的次數;

4)客戶端斷線能夠自動釋放鎖,非常安全;

5)有現有的輪子curator可以使用;

6)curator實現方式是可重入的,對現有代碼改造成本小;

zookeeper分布式鎖有哪些缺點?

答:1)加鎖會頻繁地“寫”zookeeper,增加zookeeper的壓力;

2)寫zookeeper的時候會在集群進行同步,節點數越多,同步越慢,獲取鎖的過程越慢;

3)需要另外依賴zookeeper,而大部分服務是不會使用zookeeper的,增加了系統的復雜性;

4)相對于redis分布式鎖,性能要稍微略差一些;

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

东乡族自治县| 台中县| 巴楚县| 崇明县| 建昌县| 九江县| 常山县| 酉阳| 临邑县| 肇州县| 临夏县| 阳山县| 锦州市| 五指山市| 克山县| 石嘴山市| 双峰县| 定结县| 乌兰察布市| 江津市| 延长县| 苗栗市| 石家庄市| 金华市| 孟津县| 东山县| 灌南县| 梅河口市| 灵石县| 奇台县| 米易县| 汤原县| 交口县| 兴海县| 阳谷县| 北辰区| 泸州市| 苏尼特左旗| 阿克陶县| 鹤壁市| 普格县|