亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用springboot + rabbitmq消息確認機制

發布時間:2021-11-01 16:01:03 來源:億速云 閱讀:193 作者:iii 欄目:編程語言

這篇文章主要介紹“怎么使用springboot + rabbitmq消息確認機制”,在日常操作中,相信很多人在怎么使用springboot + rabbitmq消息確認機制問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”怎么使用springboot + rabbitmq消息確認機制”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

一、準備環境
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置中需要開啟 發送端消費端 的消息確認。


spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 發送者開啟 confirm 確認機制
spring.rabbitmq.publisher-confirms=true
# 發送者開啟 return 確認機制
spring.rabbitmq.publisher-returns=true
####################################################
# 設置消費端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true

定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,并將隊列綁定在交換機上。

@Configuration
public class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}

rabbitmq 的消息確認分為兩部分:發送消息確認 和 消息接收確認。

怎么使用springboot + rabbitmq消息確認機制在這里插入圖片描述

二、消息發送確認

消息只要被 rabbitmq broker 接收到就會觸發 confirmCallback 回調 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息發送異常!");
        } else {
            log.info("發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

實現接口 ConfirmCallback ,重寫其confirm()方法,方法內有三個參數correlationDataackcause

  • correlationData:對象內部只有一個 id 屬性,用來表示當前消息的唯一性。
  • ack:消息投遞到broker 的狀態,true表示成功。
  • cause:表示投遞失敗的原因。

但消息被 broker 接收到只能表示已經到達 MQ服務器,并不能保證消息一定會被投遞到目標 queue 里。所以接下來需要用到 returnCallback

如果消息未能投遞到目標 queue 里將觸發回調 returnCallback ,一旦向 queue 投遞消息未成功,這里一般會記錄下當前消息的詳細投遞數據,方便后續做重發或者補償等操作。

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

實現接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey(隊列)。

下邊是具體的消息發送,在rabbitTemplate中設置 ConfirmReturn 回調,我們通過setDeliveryMode()對消息做持久化處理,為了后續測試創建一個 CorrelationData對象,添加一個id10000000000

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * 確保消息發送失敗后可以重新返回到隊列中
         * 注意:yml需要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消費者確認收到消息后,手動ack回執回調處理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投遞到隊列失敗回調處理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 發送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

三、消息接收確認@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
    
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具體業務
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {
            
            if (message.getMessageProperties().getRedelivered()) {
                
                log.error("消息已重復處理失敗,拒絕再次接收...");
                
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                
                log.error("消息即將再次返回隊列處理...");
                
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

消費消息有三種回執方法,我們來分析一下每種方法的含義。

basicAck:表示成功確認,使用此回執方法后,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag都會增加。手動消息確認模式下,我們可以對指定deliveryTag的消息進行acknackreject等操作。

multiple:是否批量確認,值為 true 則會一次性 ack所有小于當前消息 deliveryTag 的消息。

舉個栗子: 假設我先發送三條消息deliveryTag分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag為8,multiple設置為 true,會將5、6、7、8的消息全部進行確認。

basicNack :表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投遞序號。

multiple:是否批量確認。

requeue:值為 true 消息將重新入隊列。

basicReject:拒絕消息,與basicNack區別在于不能進行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投遞序號。

requeue:值為 true 消息將重新入隊列。

四、測試五、踩坑日志

這是一個非常沒技術含量的坑,但卻是非常容易犯錯的地方。

開啟消息確認機制,消費消息別忘了channel.basicAck,否則消息會一直存在,導致重復消費。怎么使用springboot + rabbitmq消息確認機制

在我最開始接觸消息確認機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯后確認消息, int a = 1 / 0 發生異常后將消息重新投入隊列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消費者 2 號收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

但是有個問題是,業務代碼一旦出現 bug 99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,導致了死循環。

怎么使用springboot + rabbitmq消息確認機制在這里插入圖片描述

本地的CPU被瞬間打滿了,大家可以想象一下當時在生產環境導致服務死機,我是有多慌。

怎么使用springboot + rabbitmq消息確認機制而且rabbitmq management 只有一條未被確認的消息。

怎么使用springboot + rabbitmq消息確認機制在這里插入圖片描述

經過測試分析發現,當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。

消費者會立刻消費這條消息,業務處理再拋出異常,消息再重新入隊,如此反復進行。導致消息隊列處理出現阻塞,導致正常消息也無法運行。

而我們當時的解決方案是,先將消息進行應答,此時消息隊列會刪除該條消息,同時我們再次發送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新發送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));

但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優化設置了消息重試次數,達到了重試上限以后,手動確認,隊列刪除此消息,并將消息持久化入MySQL并推送報警,進行人工處理和定時任務做補償。

如何保證 MQ 的消費是冪等性,這個需要根據具體業務而定,可以借助MySQL、或者redis 將消息持久化,通過再消息中的唯一性屬性校驗。

到此,關于“怎么使用springboot + rabbitmq消息確認機制”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

故城县| 年辖:市辖区| 石阡县| 永和县| 万载县| 牙克石市| 友谊县| 武功县| 乌鲁木齐县| 洪洞县| 富平县| 崇明县| 西吉县| 栖霞市| 通海县| 新巴尔虎右旗| 曲麻莱县| 浦东新区| 正安县| 凌海市| 犍为县| 南投市| 无锡市| 额尔古纳市| 湖口县| 郓城县| 自治县| 会同县| 合肥市| 迁安市| 武山县| 饶平县| 安仁县| 东乌珠穆沁旗| 海盐县| 莎车县| 高尔夫| 泸州市| 镇赉县| 沙湾县| 蓬莱市|