您好,登錄后才能下訂單哦!
本篇內容介紹了“Zookeeper Queue隊列怎么實現 ”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1: Barries: 柵欄,見面知意。
2:Queue:Queue也就是我們所說的隊列
1:Barries:
1.1: 是指所有的現場都達到 barrier后才能進行后續的計算
1.2:所有的線程都完成自己的計算以后才能離開barrier
進入柵欄: 1,新建一個根節點 "/root" 2, 想進入barrier的線程在 “/root”下建立一個字節點"/root/c-i" 3,循環監聽"/root"孩子節點數的變化,每當其達到Size的時候就說明有Size個線程都已經達到了Barrier的 要求。
2:Queue:就是指一個生產者或消費者的模型
離開Barrier 1: 想離開Barrier的現場刪除掉在"/root" 下建立的子節點 2: 循環監聽"/root" 孩子節點數目的變化,當Size減少到0的時候它就可以離開了。
3 :Queue 隊列的實現
1 : 建立一個根節點"/root" 2 : 生產線程在"/root" 下建立一個SEQUENTAIL的節點 3 : 消費線程檢查"/root" 如果沒有就循環的監聽"/root" 節點的變化,直到它有自己的子節點,刪除序號 最小子字節點。
package sync; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.List; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher { static ZooKeeper zk = null; static Integer mutex; String root; //同步原語 SyncPrimitive(String address) { if (zk == null) { try { System.out.println("Starting ZK:"); //建立Zookeeper連接,并且指定watcher zk = new ZooKeeper(address, 3000, this); //初始化鎖對象 mutex = new Integer(-1); System.out.println("Finished starting ZK:" + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } } @Override synchronized public void process(WatchedEvent event) { synchronized (mutex) { //有事件發生時,調用notify,使其他wait()點得以繼續 mutex.notify(); } } static public class Barrier extends SyncPrimitive { int size; String name; Barrier(String address, String root, int size) { super(address); this.root = root; this.size = size; if (zk != null) { try { //一個barrier建立一個根目錄 Stat s = zk.exists(root, false); //不注冊watcher if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } try { //獲取自己的主機名 name = new String(InetAddress.getLocalHost() .getCanonicalHostName().toString()); } catch (UnknownHostException e) { System.out.println(e.toString()); } } boolean enter() throws KeeperException, InterruptedException { //在根目錄下創建一個子節點.create和delete都會觸發children wathes,這樣getChildren就會收到通知,process()就會被調用 zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //一直等,直到根目錄下的子節點數目達到size時,函數退出 while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); //釋放mutex上的鎖 } else { return true; } } } } boolean leave() throws KeeperException, InterruptedException { //刪除自己創建的節點 zk.delete(root + "/" + name, 0); //一直等,直到根目錄下有子節點時,函數退出 while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } } static public class Queue extends SyncPrimitive { Queue(String address, String name) { super(address); this.root = name; if (zk != null) { try { //一個queue建立一個根目錄 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } } //參數i是要創建節點的data boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); //根目錄下創建一個子節點,因為是SEQUENTIAL的,所以先創建的節點具有較小的序號 zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); //并不能保證list[0]就是序號最小的 //如果根目錄下沒有子節點就一直等 if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } //找到序號最小的節點將其刪除 else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tmp = new Integer(s.substring(7)); if (tmp < min) min = tmp; } System.out.println("Temporary value:" + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } public static void main(String[] args) { if (args[0].equals("qTest")) queueTest(args); else barrierTest(args); } private static void barrierTest(String[] args) { Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); try { boolean flag = b.enter(); System.out.println("Enter barrier:" + args[2]); if (!flag) System.out.println("Error when entering the barrier"); } catch (KeeperException e) { } catch (InterruptedException e) { } Random rand = new Random(); int r = rand.nextInt(100); for (int i = 0; i < r; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { } } try { b.leave(); } catch (KeeperException e) { } catch (InterruptedException e) { } System.out.println("Left barrier"); } private static void queueTest(String[] args) { Queue q = new Queue(args[1], "/app1"); System.out.println("Input:" + args[1]); int i; Integer max = new Integer(args[2]); if (args[3].equals("p")) { System.out.println("Producer"); for (i = 0; i < max; i++) try { q.produce(10 + 1); } catch (KeeperException e) { } catch (InterruptedException e) { } } else { System.out.println("Consumer"); for (i = 0; i < max; i++) try { int r = q.consume(); System.out.println("Item:" + r); } catch (KeeperException e) { i--; } catch (InterruptedException e) { } } } }
“Zookeeper Queue隊列怎么實現 ”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。