您好,登錄后才能下訂單哦!
本篇內容介紹了“RabbitMQ是怎么確定消息是否投遞到隊列中的”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1. 前言
在使用RabbitMQ消息中間件時,因為消息的投遞是異步的,默認情況下,RabbitMQ會刪除那些無法路由的消息。為了能夠檢出消息是否順利投遞到隊列,我們需要相應的處理機制。今天就來驗證一下相關的驗證機制。
2. 消息投遞失敗
那么哪些情況消息會投遞失敗呢?RabbitMQ消息會先到達指定的交換機,然后由交換機路由到對應的隊列。所以以下幾種情況會導致消息投遞失敗。
投遞的交換機不可用。
投遞的交換機可用,但是沒有匹配到隊列。
3. 投遞失敗的處理機制
對應上面的兩種情況,RabbitMQ提供了對應的解決方案。
ConfirmCallback
RabbitMQ提供了ConfirmCallback接口用于實現消息發送到RabbitMQ交換器后進行確認回調。
在Spring Boot中需要開啟:
spring: rabbitmq: # 通常選擇 correlated publisher-confirm-type:
通常有三種選擇:
NONE ,禁用發布確認模式,是默認值。
CORRELATED,發布消息時會攜帶一個CorrelationData,被ack/nack時CorrelationData會被返回進行對照處理,CorrelationData可以包含比較豐富的元信息進行回調邏輯的處理。
SIMPLE,當被ack/nack后會等待所有消息被發布,如果超時會觸發異常,甚至關閉連接通道。
這里我使用CORRELATED模式,聲明一個ConfirmCallback并設置到RabbitTemplate中
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { // correlationData 可能為空 if (ack) { log.debug("消息發送到exchange成功,id: {}", correlationData.getId()); } else { log.debug("消息發送到exchange失敗,原因: {}", cause); } });
當消息投遞到一個不存在的交換機Exchange且ack=false時會輸出日志:
- Publishing message [(Body:'"hello"' MessageProperties [headers={spring_listener_return_correlation=a088eb3f-a234-4e15-bb7a-3aa9a6f043e6, spring_returned_message_correlation=29975bc1-f363-4e3a-85ca-010d13888720, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=7, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [DIRECT_EXCHANGE1], routingKey = [DIRECT_ROUTING_KEY2] - 消息發送到exchange失敗,原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE1' in vhost 'my_vhost', class-id=60, method-id=40)
這里實現的比較簡單你可以增加一些消息投遞到交換機失敗后的操作處理邏輯。
ReturnCallback
ReturnCallback接口用于實現消息已經成功發送到RabbitMQ交換機,但沒有匹配到隊列時的回調。
在Spring Boot中需要同時開啟:
spring: rabbitmq: publisher-returns: true template: mandatory: true
RabbitTemplate中的mandatory設置值優先級要高一些。
我們聲明一個ReturnCallback并設置到RabbitTemplate中
rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties() .getHeader(PublisherCallbackChannel.RETURNED_MESSAGE_CORRELATION_KEY); log.debug("消息:{} 發送失敗, 應答碼:{} 原因:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey); });
當消息成功投遞到交換機但是無法匹配到隊列時:
- Publishing message [(Body:'"hello"' MessageProperties [headers={spring_listener_return_correlation=155648bd-fc3e-4c8b-a650-7b1ce720c7a6, spring_returned_message_correlation=7029ee49-357a-42fc-8532-dc41b4bb8e87, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=7, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [DIRECT_EXCHANGE], routingKey = [DIRECT_ROUTING_KEY2] - 消息:7029ee49-357a-42fc-8532-dc41b4bb8e87 發送失敗, 應答碼:312 原因:NO_ROUTE 交換機: DIRECT_EXCHANGE 路由鍵: DIRECT_ROUTING_KEY2 - 消息發送到exchange成功,id: 7029ee49-357a-42fc-8532-dc41b4bb8e87
從上面我們也可以看出ReturnCallback只處理投遞到隊列失敗的情況,并不像ConfirmCallback既能處理失敗的情況也能處理成功的情況。
“RabbitMQ是怎么確定消息是否投遞到隊列中的”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。