您好,登錄后才能下訂單哦!
這篇文章主要介紹了java中RabbitMQ高級應用方法的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇java中RabbitMQ高級應用方法文章都會有所收獲,下面我們一起來看看吧。
 在使用 RabbitMQ
的時候,生產者在進行消息投遞的時候如果想知道消息是否成功的投遞到對應的交換機和隊列中,有兩種方式可以用來控制消息投遞的可靠性模式 。
 由上圖的整個消息的投遞過程來看,生產者的消息進入到中間件中會首先到達交換機,然后再從交換機傳遞到隊列中去,也就是分為兩步走戰略。那么消息的丟失情況也就是會出現在這兩個階段中,RabbitMQ 貼心的為我們提供了針對于這兩個部分的可靠新傳遞模式:
confirm 模式。
return 模式。
 利用這兩個回調模式來確保消息的傳遞可靠。
 消息從生產者到交換機之間傳遞會返回一個 confirmCallback
的回調。可以直接在 rabbitTemplate
實例中進行確認邏輯的設置。如果是使用 XML
配置的話需要在工廠配置開啟 publisher-confirms="true",YAML
的配置就直接 publisher-confirm-type: correlated,他默認是 NONE
,需要手動開啟。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println(); if (!b) { // 消息重發之類的處理 System.out.println(s); } else { System.out.println("交換機成功接收消息"); } } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
 上面的確認是由一個 confirm
的函數執行的,里面攜帶了三個參數,第一個是配置的相關信息,第二個表示交換機是否成功的接收到消息,第三個參數是指沒有成功接收消息的原因。
 從交換機到消息隊列投遞失敗會返回一個 returnCallback
。在工廠配置中開啟回退模式 publisher-returns="true" ,設置交換機處理消息失敗的模式(默認 false 直接將消息進行丟棄),添加退回處理的邏輯。
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-rabbitmq.xml") public class Producer { @Autowired private RabbitTemplate rabbitTemplate; @Test public void producer() throws InterruptedException { rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 重發邏輯處理 System.out.println(message.getBody() + " 投遞消息隊列失敗"); } }); rabbitTemplate.convertAndSend("default_exchange", "default_queue", "hello world & beordie"); TimeUnit.SECONDS.sleep(5); } }
 returnedMessage
中攜帶五個參數、分別指的是消息對象、錯誤碼、錯誤信息、交換機、路由鍵。
 在消費者抓取消息隊列中的數據取消費之后會有一個確認機制進行消息的確認,防止因為抓取消息之后但沒有消費成功而導致的消息丟失。有三種確認方式:
自動確認:acknowledge="none"
手動確認:acknowledge="manual"
根據異常情況確認:acknowledge="auto"
 其中自動確認是指一旦消息被消費者抓取就自動默認成功,并將消息從消息隊列中進行移除,如果這個時候消費端消費出現問題,那么也會是默認消息消費成功,但是實際上是沒有消費成功的,也就是當前的消息丟失了。默認的情況就是自動確認機制。
 如果設置手動確認的方式,就需要在正常消費消息之后進行回調確認 channel.basicAck()
,手動簽收。如果業務處理過程中發生了異常則調用 channel.basicNack()
重新發送消息。
 首先需要在隊列綁定時進行確認機制的配置,設置為手動簽收。
<!-- 綁定隊列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
 生產者一端不用更改,只需要改變消費者的實現進行消息自動簽收就可以了,正常執行業務則簽收消息,業務發生錯誤則選擇消息拒簽,消息重發或者丟棄。
public class ConsumerAck implements ChannelAwareMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { // 消息唯一ID long tag = message.getMessageProperties().getDeliveryTag(); try { String msg = new String(message.getBody(), "utf-8"); channel.basicAck(tag, true); System.out.println("接收消息: " + msg); } catch (Exception e) { System.out.println("接收消息異常"); channel.basicNack(tag, true, true); e.printStackTrace(); } } }
 里面涉及三個簡單的簽收函數,一是正確簽收的 basicAck
,二是單條拒簽的 basicReject
,三是批量拒簽的 basicNack
。
basicAck 第一個參數表示消息在通道中的唯一ID,只針對當前的 Channel;第二個參數表示是否批量同意,如果是 false 的話只會同意簽收當前ID的一條消息,將其從消息隊列中進行刪除,而如果是 true 的話將會把此ID之前的消息一起給同意簽收了。
basicReject 第一個參數依舊表示消息的唯一ID,第二個參數表示是否重新回隊發送,false 表示直接丟棄該條消息或者有死信隊列可以接收, true 則表示重新回隊進行消息發送,所有操作只針對當前的消息。
basicNack 比第二個多了一個參數,也就是處于中間位置的布爾值,表示是否批量進行。
 在用戶請求和DB服務處理之間增加消息中間件的隔離,使得突發流量全部讓消息隊列來抗,降低服務端被沖垮的可能性。讓所有的請求都往隊列中存,消費端只需要勻速的取出消息進行消費,這樣就能保證運行效率,也不會因為后臺的阻塞而導致客戶端得不到正常的響應(當然指的是一些不需要同步回顯的任務)。
 只需要在消費者綁定消息隊列時指定取出消息的速率即可,需要使用手動簽收的方式,每進行一次的簽收才會從隊列中再取出下一條數據。
<!-- 綁定隊列 --> <rabbit:listener-container connection-factory="rabbitFactory" auto-declare="true" acknowledge="manual" prefetch="1"> <rabbit:listener ref="rabbirConsumer" queue-names="default_queue"/> </rabbit:listener-container>
 消息隊列提供了存儲在隊列中消息的過期時間,分為兩個方向的實現,一個是針對于整個隊列中的所有消息,也就是隊列的過期時間,另一個是針對當前消息的過期時間,也就是針對于單條消息單獨設置。
 隊列的過期時間設置很簡單,只需要在創建隊列時進行過期時間的指定即可,也可以通過控制臺直接創建指定過期時間。一旦隊列過期時間到了,隊列中還未被消費的消息都將過期,進行隊列的過期處理。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> </rabbit:queue-arguments> </rabbit:queue>
 單條消息的過期時間需要在發送的時候進行單獨的指定,發送的時候指定配置的額外信息,配置的編寫由配置類完成。
 如果一條消息的過期時間到了,但是他此時處于隊列的中間,那么他將不會被處理,只有當之后處理到時候才會進行判斷是否過期。
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 設置 message 的過期時間 message.getMessageProperties().setExpiration("5000"); // 返回該消息 return message; } }; rabbitTemplate.convertAndSend("exchange", "route", "msg", messagePostProcessor);
 如果說同時設置了消息的過期時間和隊列的過期時間,那么最終的過期時間由最短的時間進行決定,也就是說如果當前消息的過期時間沒到,但是整個隊列的過期時間到了,那么隊列中的所有消息也自然就過期了,執行過期的處理策略。
死信隊列指的是死信交換機,當一條消息成為死信之后可以重新發送到另一個交換機進行處理,而進行處理的這個交換機就叫做死信交換機。
消息成為死信消息有幾種情況
隊列的消息長度達到限制
消費者拒接消息的時候不把消息重新放入隊列中
隊列存在消息過期設置,消息超時未被消費
消息存在過期時間,在投遞給消費者時發現過期
 在創建隊列時可以在配置中指定相關的信息,例如死信交換機、隊列長度等等,之后的一系列工作就不由程序員進行操作了,MQ 會自己完成配置過的事件響應。
<rabbit:queue id="default_queue" name="default_queue" auto-declare="true"> <rabbit:queue-arguments> <!-- 死信交換機 --> <entry key="x-dead-letter-exchange" value-type="dlx_exchane"/> <!-- 路由 --> <entry key="x-dead-letter-routing-key" value-type="dlx_routing"/> <!-- 隊列過期時間 --> <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/> <!-- 隊列長度 --> <entry key="x-max-length" value-type="java.lang.Integer" value="10"/> </rabbit:queue-arguments> </rabbit:queue>
 延遲隊列指的是消息在進入隊列后不會立即被消費,只有到達指定時間之后才會被消費,也就是需要有一個時間的判斷條件。
 消息隊列實際上是沒有提供對延遲隊列的實現的,但是可以通過 TTL
+ 死信隊列
的方式完成,設置一個隊列,不被任何的消費者所消費,所有的消息進入都會被保存在里面,設置隊列的過期時間,一旦隊列過期將所有的消息過渡到綁定的死信隊列中。
 再由具體的消費者來消費死信隊列中的消息,這樣就實現了延遲隊列的功能。
 例如實現一個下單超時支付取消訂單的功能:
關于“java中RabbitMQ高級應用方法”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“java中RabbitMQ高級應用方法”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。