亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行RocketMQ事務消息實現

發布時間:2021-11-23 21:26:23 來源:億速云 閱讀:181 作者:柒染 欄目:云計算

這篇文章將為大家詳細講解有關如何進行RocketMQ事務消息實現,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

摘要:            事務消息提交或回滾的實現原理就是根據commitlogOffset找到消息,如果是提交動作,就恢復原消息的主題與隊列,再次存入commitlog文件進而轉到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務消息不同的是,提交事務消息會將消息恢復原主題與隊列,再次存儲在commitlog文件中。

若您對RocketMQ技術感興趣,請加入 RocketMQ技術交流群

小編將重點分析RocketMQ Broker如何處理事務消息提交、回滾命令,根據前面的介紹,其入口EndTransactionProcessor#processRequest:

OperationResult result = new OperationResult();if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {        // @1result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);    // @2
      if (result.getResponseCode() == ResponseCode.SUCCESS) {  // @3
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);    // @4
          if (res.getCode() == ResponseCode.SUCCESS) {
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());     // @5
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());    // @6
                RemotingCommand sendResult = sendFinalMessage(msgInner);                              // @7
                if (sendResult.getCode() == ResponseCode.SUCCESS) {             
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());    // @8
                }                return sendResult;
           }          return res;
     }
}

代碼@1:如果請求為提交事務,進入事務消息提交處理流程。
代碼@2:提交消息,別被這名字誤導了,該方法主要是根據commitLogOffset從commitlog文件中查找消息返回OperationResult實例:
如何進行RocketMQ事務消息實現cdn.com/14618120dcde45cd59aba1c5cb3ccd5cee5c9e61.png">

  • private MessageExt prepareMessage :消息對象。

  • private int responseCode:查找結果。

  • private String responseRemark :錯誤提示。

代碼@3:如果成功查找到消息,則繼續處理,否則返回給客戶端,消息未找到錯誤信息。

代碼@4:驗證消息必要字段。
驗證消息的生產組與請求信息中的生產者組是否一致。
驗證消息的隊列偏移量(queueOffset)與請求信息中的偏移量是否一致。
驗證消息的commitLogOffset與請求信息中的CommitLogOffset是否一致。

代碼@5:調用endMessageTransaction方法,該方法主要的目的就是恢復事務消息的真實的主題、隊列,并設置事務ID。

代碼@6:設置消息的相關屬性,這一步應該直接在endMessageTransaction中實現就好,統一恢復原消息的數量,特別關注的是取消了事務相關的系統標記。

代碼@7:發送最終消息,其實現原理非常簡單,調用MessageStore將消息存儲在commitlog文件中,此時的消息,會被轉發到原消息主題對應的消費隊列,被消費者消費。

代碼@8:刪除預處理消息(prepare),其實是將消息存儲在主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC的主題中,代表這些消息已經被處理(提交或回滾)。

上述就是事務消息提交的流程,事務回滾類似,接下來大概分析一下事務消息回滾的流程。

EndTransactionProcessor#processRequest else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
       result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);    // @1
       if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);            if (res.getCode() == ResponseCode.SUCCESS) {                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());   // @2
            }           return res;
       }
}

代碼@1:回滾消息,其實內部就是根據commitlogOffset查找消息。
代碼@2:將消息存儲在RMQ_SYS_TRANS_OP_HALF_TOPIC中,代表該消息已被處理,與提交事務消息不同的是,提交事務消息會將消息恢復原主題與隊列,再次存儲在commitlog文件中。

事務消息在Broker服務端的提交回滾流程就介紹到這了。其核心實現就是根據commitlogOffset找到消息,如果是提交動作,就恢復原消息的主題與隊列,再次存入commitlog文件進而轉到消息消費隊列,供消費者消費,然后將原預處理消息存入一個新的主題RMQ_SYS_TRANS_OP_HALF_TOPIC,代表該消息已被處理;回滾消息與提交事務消息不同的是,提交事務消息會將消息恢復原主題與隊列,再次存儲在commitlog文件中。

關于如何進行RocketMQ事務消息實現就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

五华县| 鱼台县| 沁阳市| 齐齐哈尔市| 北碚区| 泰安市| 镇康县| 和顺县| 博爱县| 达孜县| 彭阳县| 获嘉县| 临湘市| 贡嘎县| 阳谷县| 德庆县| 中卫市| 桂林市| 陇川县| 大埔县| 抚松县| 武陟县| 南丰县| 德安县| 垣曲县| 渝中区| 开封县| 资溪县| 满城县| 广宁县| 敦煌市| 天台县| 留坝县| 嘉义县| 桐庐县| 甘洛县| 漠河县| 饶平县| 宣武区| 尤溪县| 商水县|