您好,登錄后才能下訂單哦!
這篇文章主要介紹“RabbitMQ消息隊列怎么實現延遲任務”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“RabbitMQ消息隊列怎么實現延遲任務”文章能幫助大家解決問題。
延遲任務應用廣泛,延遲任務典型應用場景有訂單超時自動取消;支付回調重試。其中訂單超時取消具有冪等性屬性,無需考慮重復消費問題;支付回調重試需要考慮重復消費問題。
延遲任務具有如下特點:在未來的某個時間點執行;一般僅執行一次。
生產者將帶有延遲信息的消息發送到RabbitMQ交換機中,等待延遲時間結束方將消息轉發到綁定的隊列中,消費者通過監聽隊列消費消息。延遲任務的關鍵在消息在交換機中停留。
顯而易見,基于RabbitMQ實現延遲任務對服務器的可靠性要求極高,交換機內部消息無持久化機制,比如單機模式服務重啟,未開始的延遲任務均丟失。
RabbitMQ服務需要安裝x-delayed-message
插件以處理延遲消息。
延遲任務的實現對生產者的要求是將消息可靠的投遞到交換機,因此使用confirm確認機制即可。
訂單生成之后,先入庫,然后以訂單ID為key將訂單詳情存入Redis中(持久化),向RabbitMQ發送異步confirm確定請求。如果收到正常投遞返回,則刪除Redis中訂單ID為key的數據,回收內存,否則以訂單ID為key,從Redis中查詢出訂單數據,重新發送。
延遲任務的實現對消費者的要求是以信息不丟失的方式消費消息,具體表現在:手動確認消息的消費,防止消息丟失;消費端持續穩定,防止消息堆積;消息消費失敗有重試機制。
考慮到訂單延遲取消屬于冪等性操作,因此無需考慮消息的重復消費問題。
實現部分僅貼一部分核心源碼,完整項目請訪問GitHub。
考慮到下單是極為重要的操作,因此首先將訂單落庫、存盤,然后進行后續操作。
for (long i = 1; i <= 10; i++) { /* 1.模擬生成訂單 */ BuOrder order = createOrder(i); /* 2.訂單入庫 */ orderService.removeById(order); orderService.saveOrUpdate(order); /* 3.將訂單存入信息Redis */ RedisUtils.setObject(RabbitTemplateConfig.ORDER_PREFIX + i, order); /* 4.向RabbitMQ異步投遞消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); }
生產者可靠投遞消息
public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData == null) { return; } String key = ORDER_PREFIX + correlationData.getId(); if (ack) { /* 如果消息投遞成功,則刪除Redis中訂單數據,回收內存 */ RedisUtils.deleteObject(key); } else { /* 從Redis中讀取訂單數據,重新投遞 */ BuOrder order = RedisUtils.getObject(key, BuOrder.class); /* 重新投遞消息 */ rabbitTemplate.convertAndSend(RabbitmqConfig.DELAY_EXCHANGE_NAME, RabbitmqConfig.DELAY_KEY, order, RabbitUtils.setDelay(30000), RabbitUtils.correlationData(order.getOrderId())); } }
消費者端手動確認,避免消息丟失;失敗自動重試。
@RabbitListener(queues = RabbitmqConfig.DELAY_QUEUE_NAME) public void consumeNode01(Channel channel, Message message, BuOrder order) throws IOException { if (Objects.equals(0, order.getOrderStatus())) { /* 修改訂單狀態,設置為關閉狀態 */ orderService.updateById(new BuOrder(order.getOrderId(), -1)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info(String.format("消費者節點01消費編號為【%s】的消息", order.getOrderId())); } }
消費者可靠消費應至少開啟兩個及以上應用,確保消息隊列中不積壓消息。
上述代碼涉及一個工具類RabbitUtils
,存在于如下依賴中,主要封裝RabbitMQ極常用的工具方法。
<dependency> <groupId>xin.altitude.cms</groupId> <artifactId>ucode-cms-common</artifactId> <version>1.4.3.1</version> </dependency>
關于“RabbitMQ消息隊列怎么實現延遲任務”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。