亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用Raft組件

發布時間:2021-10-29 09:12:28 來源:億速云 閱讀:153 作者:iii 欄目:web開發

這篇文章主要講解了“如何使用Raft組件”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用Raft組件”吧!

一、編譯

github下載 Ratis 直接 mvn clean package 即可,如果編譯過程中出錯,可以先clean install  ratis-proto

二、示例

Ratis 自帶的示例有三個:

  • arithmetic

  • counter

  • filestore

在 ratis-examples 模塊中,對于 arithmetic 和 filestore比較方便,可以通過main/bin目錄下的 shell  腳本快速啟動 Server 和 Client 來進行測試。

對于Raft,咱們都知道是需要多實例組成集群才能測試,你啟動一個實例沒啥用,連選主都成問題。Bin 目錄下的 start-all 支持 example  的名稱以及對應的命令。比如 filestore server 代表是啟動 filestore 這個應用的server。對應的命令參數會在相應example里的  cli 中解析。同時會一次性啟動三個server,組成一個集群并在周期內完成選舉。

而對于 counter 這個示例,并沒有相應的腳本來快速啟動三個server,這個我們可以通過命令行或者在IDE里以參數的形式啟動。

三、分析

下面我們來示例里看下 Raft Server 是怎樣工作的。

對于 counter 示例來說,我們啟動的時候,需要傳入一個參數,代表當前的server是第幾個,目的在于,要從 peers 列表中得知該用哪個IP +  端口去啟動它。這里我們能發現,這個 peers 列表,是在代碼內提前設置好的。當然你說動態配置啥的,也沒啥問題,另外兩個示例是通過shell 腳本里common  中的配置傳入的。

所以,第一步我們看到, Raft Server 在啟動的時候,會通過「配置」的形式,來知道 peer  之間的存在,這樣才能彼此通信,讓別人給自己投票或者給別人投票,完成 Term 內的選舉。另外,才能接收到 Leader 傳過來的 Log  ,并且應用到本地。

第二步,我們來看下 Client 和 集群之間是如何通信的。整個 Raft 集群可能有多個實例,我們知道必須通過 Leader  來完成寫操作。那怎樣知道誰是Leader?有什么辦法?

一般常見的思路有:

  • 在寫之前,先去集群內查一下,誰是 Leader,然后再寫

  • 隨機拿一個寫,不行再換一個,不停的試,總會有一個成功。

當然方式二這樣試下去效率不太高。所以會在這個隨機試一次之后,集群會將當前的 Leader 信息返回給 Client,然后 Client  直接通過這個建立連接進行通信即可。

在 Ratis 里, Client 調用非 Leader 節點會收到 Server 拋出的一個異常,異常中會包含一個稱為 suggestLeader  的信息,表示當前正確的 Leader,按這個連上去就行。當然,如果如果在此過程中發生的 Leader 的變更,那就會有一個新的suggestLeader  返回來,再次重試。

我們來看 Counter 這個示例中的實現。

Server 和 Client 的共用的Common 代碼中,包含 peers 的聲明

public final class CounterCommon {   public static final List<RaftPeer> PEERS = new ArrayList<>(3);    static {     PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n1"), "127.0.0.1:6000"));     PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n2"), "127.0.0.1:6001"));     PEERS.add(new RaftPeer(RaftPeerId.getRaftPeerId("n3"), "127.0.0.1:6002"));   }

這里聲明了三個節點。

通過命令行啟動時,會直接把index 傳進來, index 取值1-3。

java -cp *.jar org.apache.ratis.examples.counter.server.CounterServer {serverIndex}

然后在Server 啟動的時候,拿到對應的配置信息。

//find current peer object based on application parameter     RaftPeer currentPeer =         CounterCommon.PEERS.get(Integer.parseInt(args[0]) - 1);

再設置存儲目錄

//set the storage directory (different for each peer) in RaftProperty object     File raftStorageDir = new File("./" + currentPeer.getId().toString());     RaftServerConfigKeys.setStorageDir(properties,         Collections.singletonList(raftStorageDir))

重點看這里,每個 Server 都會有一個狀態機「CounterStateMachine」,平時我們的「業務邏輯」都放到這里

//create the counter state machine which hold the counter value     CounterStateMachine counterStateMachine = new CounterStateMachine();

客戶端發送的命令,會在這個狀態機中被執行,同時這些命令又以Log 的形式復制給其它節點,各個節點的Log  又會在它自己的狀態機里執行,從而保證各個節點狀態的一致。

如何使用Raft組件

如何使用Raft組件

最后根據這些配置,生成 Raft Server 實例并啟動。

//create and start the Raft server     RaftServer server = RaftServer.newBuilder()         .setGroup(CounterCommon.RAFT_GROUP)         .setProperties(properties)         .setServerId(currentPeer.getId())         .setStateMachine(counterStateMachine)         .build();     server.start();

CounterStateMachine 里,應用計數的這一小段代碼,我們看先檢查了命令是否合法,然后執行命令

//check if the command is valid     String logData = entry.getStateMachineLogEntry().getLogData()         .toString(Charset.defaultCharset());     if (!logData.equals("INCREMENT")) {       return CompletableFuture.completedFuture(           Message.valueOf("Invalid Command"));     }     //update the last applied term and index     final long index = entry.getIndex();     updateLastAppliedTermIndex(entry.getTerm(), index);      //actual execution of the command: increment the counter     counter.incrementAndGet();      //return the new value of the counter to the client     final CompletableFuture<Message> f =         CompletableFuture.completedFuture(Message.valueOf(counter.toString()));      //if leader, log the incremented value and it's log index     if (trx.getServerRole() == RaftProtos.RaftPeerRole.LEADER) {       LOG.info("{}: Increment to {}", index, counter.toString());     }

我們再來看 Client 的實現。

和 Server 類似,通過配置屬性,創建一個實例

private static RaftClient buildClient() {     RaftProperties raftProperties = new RaftProperties();     RaftClient.Builder builder = RaftClient.newBuilder()         .setProperties(raftProperties)         .setRaftGroup(CounterCommon.RAFT_GROUP)         .setClientRpc(             new GrpcFactory(new Parameters())                 .newRaftClientRpc(ClientId.randomId(), raftProperties));     return builder.build();   }

然后就可以向Server發送命令開工了。

raftClient.send(Message.valueOf("INCREMENT"));

Counter 的狀態機支持INCREMENT 和 GET 兩個命令。所以example 最后執行了一個 GET 的命令來獲取最終的計數結果

RaftClientReply count = raftClient.sendReadOnly(Message.valueOf("GET"));

四、內部部分實現

RaftClientImpl 里,初期會從peers列表中選一個,當成leader 去請求。

RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {     this.clientId = clientId;     this.clientRpc = clientRpc;     this.peers = new ConcurrentLinkedQueue<>(group.getPeers());     this.groupId = group.getGroupId();     this.leaderId = leaderId != null? leaderId         : !peers.isEmpty()? peers.iterator().next().getId(): null;     ...   }

之后,會根據server 返回的不同異常分別處理。

private RaftClientReply sendRequest(RaftClientRequest request) throws IOException {     RaftClientReply reply;     try {       reply = clientRpc.sendRequest(request);     } catch (GroupMismatchException gme) {       throw gme;     } catch (IOException ioe) {       handleIOException(request, ioe);     }     reply = handleLeaderException(request, reply, null);     reply = handleRaftException(reply, Function.identity());     return reply;   }

比如在 handleLeaderException 中,又分幾種情況,因為通過Client 來和 Server  進行通訊的時候,會隨機從peers里選擇一個,做為leader去請求,如果 Server  返回異常,說它不是leader,就用下面的代碼,隨機從另外的peer里選擇一個再去請求。

final RaftPeerId oldLeader = request.getServerId();     final RaftPeerId curLeader = leaderId;     final boolean stillLeader = oldLeader.equals(curLeader);     if (newLeader == null && stillLeader) {       newLeader = CollectionUtils.random(oldLeader,           CollectionUtils.as(peers, RaftPeer::getId));     }   static <T> T random(final T given, Iterable<T> iteration) {     Objects.requireNonNull(given, "given == null");     Objects.requireNonNull(iteration, "iteration == null");      final List<T> list = StreamSupport.stream(iteration.spliterator(), false)         .filter(e -> !given.equals(e))         .collect(Collectors.toList());     final int size = list.size();     return size == 0? null: list.get(ThreadLocalRandom.current().nextInt(size));   }

是不是感覺很低效。如果這個時候,server 返回的信息里,告訴client 誰是 leader,那client 直接連上去就可以了是吧。

/**    * @return null if the reply is null or it has    * {@link NotLeaderException} or {@link LeaderNotReadyException}    * otherwise return the same reply.    */   RaftClientReply handleLeaderException(RaftClientRequest request, RaftClientReply reply,                                         Consumer<RaftClientRequest> handler) {     if (reply == null || reply.getException() instanceof LeaderNotReadyException) {       return null;     }     final NotLeaderException nle = reply.getNotLeaderException();     if (nle == null) {       return reply;     }     return handleNotLeaderException(request, nle, handler);   }
RaftClientReply handleNotLeaderException(RaftClientRequest request, NotLeaderException nle,       Consumer<RaftClientRequest> handler) {     refreshPeers(nle.getPeers());     final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null         : nle.getSuggestedLeader().getId();     handleIOException(request, nle, newLeader, handler);     return null;   }

我們會看到,在異常的信息中,如果能夠提取出一個 suggestedLeader,這時候就會做為新的leaderId來使用,下次直接連接了。

感謝各位的閱讀,以上就是“如何使用Raft組件”的內容了,經過本文的學習后,相信大家對如何使用Raft組件這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

利辛县| 香港| 蕉岭县| 柞水县| 武威市| 新安县| 修水县| 韶山市| 久治县| 和静县| 宽城| 呼伦贝尔市| 琼海市| 菏泽市| 丰县| 清徐县| 汉源县| 尼木县| 信阳市| 南汇区| 武胜县| 舟山市| 噶尔县| 泰和县| 镇远县| 大名县| 巴塘县| 库尔勒市| 青岛市| 临泽县| 丰宁| 祁东县| 宜阳县| 新化县| 新津县| 麻城市| 乐山市| 昭觉县| 肇源县| 新河县| 荣昌县|