亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RabbitMQ 可靠投遞

發布時間:2020-07-15 14:27:53 來源:網絡 閱讀:5652 作者:王清培 欄目:軟件技術
  • 背景
  • confirmCallback 確認模式
  • returnCallback 未投遞到 queue 退回模式
  • shovel-plugin 跨機房可靠投遞

背景

在使用 RabbitMQ 的時候,作為消息發送方希望杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 為我們提供了兩個選項用來控制消息的投遞可靠性模式。

rabbitmq 整個消息投遞的路徑為:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster 則會返回一個 confirmCallback
messageexchange->queue 投遞失敗則會返回一個 returnCallback 。我們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。

confirmCallback 確認模式

在創建 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啟 confirmcallback

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//開啟confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息發送失敗!" + cause + data.toString());
        } else {
            log.info("消息發送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我們來看下 ConfirmCallback 接口。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重點是 CorrelationData 對象,每個發送的消息都需要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息唯一性。

發送的時候創建一個 CorrelationData 對象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

這里將 user ID 設置為當前消息 CorrelationData id 。當然這里是純粹 demo,真實場景是需要做業務無關消息 ID 生成,同時要記錄下這個 id 用來糾錯和對賬。

消息只要被 rabbitmq broker 接收到就會執行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才會調用 confirmCallback

broker 接收到只能表示 message 已經到達服務器,并不能保證消息一定會被投遞到目標 queue 里。所以需要用到接下來的 returnCallback

returnCallback 未投遞到queue退回模式

confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 里。在有些業務場景下,我們需要保證消息一定要投遞到目標 queue 里,此時就需要用到 return 退回模式。

同樣創建 ConnectionFactory 到時候需要設置 PublisherReturns(true) 選項。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//開啟return模式
rabbitTemplate.setMandatory(true);//開啟強制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息發送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

這樣如果未能投遞到目標 queue 里將調用 returnCallback ,可以記錄下詳細到投遞數據,定期的巡檢或者自動糾錯都需要這些數據。

shovel-plugin 跨機房可靠投遞

RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件非常方便,shovel 可以接受機房之間的網絡斷開、機器下線等不穩定因素。

這里有兩個 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我們希望將發送給 rabbit_node1 plen.queue 的消息傳輸到 rabbit_node2 plen.queue 中。我們先開啟 __rabbit_node1shovel-plugin__。

先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,如果有的話直接開啟。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然后就可以在 Admin 面板里看到這個設置選項,怎么設置這里就不介紹了。主要就是配置下 amqp 協議地址,amqp://user:password@server-name/my-vhost

如果配置沒有問題的話,應該是這樣的一個狀態,說明已經順利連接到 __rabbit_node2 broker__ 。

RabbitMQ 可靠投遞
RabbitMQ 可靠投遞

我們來看下 rabbit_node1rabbit_node2Connections 面板。
__rabbit_node1(10.211.55.3):__
RabbitMQ 可靠投遞

__rabbit_node2(10.211.55.4):__
RabbitMQ 可靠投遞

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 創建了兩個 tcp 連接,端口 39544 連接是用來消費 plen.queue 里的消息,端口 55706 連接是用來推送消息給 rabbit_node2

我們來看下 __rabbit_node1 tcp__ 連接狀態:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

__rabbit_node2 tcp__ 連接狀態:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

為了驗證 shovel-plugin 穩定性,我們將 __rabbit_node2__ 下線。
RabbitMQ 可靠投遞

然后再發送消息,發現消息會現在 rabbit_node1 plen.queue 里待著,一旦 shovel-plugin 連接恢復將消費 rabbit_node1 plen.queue 消息,然后投遞給 __rabbit_node2 plen.queue__ 。

作者:王清培 (滬江集團資深JAVA架構師)

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

凭祥市| 桃源县| 宁城县| 华亭县| 金昌市| 邮箱| 乌拉特中旗| 武清区| 新和县| 临颍县| 克什克腾旗| 阿合奇县| 吉木萨尔县| 红河县| 新建县| 石河子市| 霍山县| 阳山县| 田林县| 丽江市| 谢通门县| 通化市| 芮城县| 松江区| 友谊县| 灌南县| 穆棱市| 吴桥县| 鸡泽县| 旺苍县| 黄山市| 平山县| 犍为县| 巴彦淖尔市| 大兴区| 射洪县| 威海市| 陇南市| 田阳县| 藁城市| 宁乡县|