您好,登錄后才能下訂單哦!
Message Queue Selector如何實現順序消費,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
順序消息是指消息的消費順序和生產順序相同,在某些場景下,必須保證順序消息。比如訂單的生成、付款、發貨.順序消息又分為全局順序消息和部分順序消息,全局順序消息指某一個topic下的所有消息都要保證順序;部分順序消息只要保證某一組消息被順序消費。對于訂單消息來說,只要保證同一個訂單ID的生成、付款、發貨消息按照順序消費即可。
1. 發送端:保證相同訂單ID的各種消息發往同一個MessageQueue(同一個Topic下的某一個queue)
2.消費端:保證同一個MessageQueue里面的消息不被并發處理 (同一個Topic的不同MessageQueue是可以同時消費的)
DefaultMQProducer producer = new DefaultMQProducer("local-test-producer"); producer.setNamesrvAddr("10.76.0.38:9876"); producer.start(); for (int i = 0; i < 1000; i++) { Order order = new Order(); order.orderId = i; order.status = "生成"; Message msg1 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); SendResult sendResult1 = producer.send(msg1, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } }, order.orderId); log.info("sendResult1={}",sendResult1); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); order.status="付款"; Message msg2 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); SendResult sendResult2 = producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } }, order.orderId); log.info("sendResult2={}",sendResult2); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); order.status="發貨"; Message msg3 = new Message("local-test-producer", "TagA", JsonUtils.toJson(order).getBytes() ); producer.send(msg2, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } }, order.orderId); Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); SendResult sendResult3 = producer.send(msg3, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } //MessageQueueSelector保證同一個orderId的消息都存儲在同一個MessageQueue。 }, order.orderId); log.info("sendResult3={}",sendResult1); }
消費端主要邏輯如下,主要MessageListenerOrderly回調實現同一個MessageQueue里面的消息不會被并發消費:
//同一個MessageQueue里面的消息要順序消費,不能并發消費。 //但是同一個Topic的不同MessageQueue是可以同時消費的 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("local-test-consumer2"); consumer.setNamesrvAddr("10.76.0.38:9876"); consumer.subscribe("test", ""); consumer.setPullBatchSize(1); consumer.setConsumeThreadMin(1); consumer.setConsumeThreadMax(1); // consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { List<String> messages = new ArrayList<>(); for (MessageExt msg : msgs) { messages.add(new String(msg.getBody()) +"\tbroker:"+msg.getStoreHost()); } System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), messages); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); Thread.currentThread().join();
源碼分析:
我們知道在RocketMQ中是可以給一個消費者實例設置多個線程并發消費的. consumer.setConsumeThreadMin 和 setConsumeThreadMax,
那MessageListenerOrderly是如何保證某一個時刻,只有一個消費者的某一個線程在消費某一個MessageQueue的呢?
就在Client模塊的 ConsumeMessageOrderlyService里面,消費者端并不是簡單的禁止并發處理,而是給每一個Consumer Queue加鎖,
private final MessageQueueLock messageQueueLock = new MessageQueueLock();
在消費每個消息之前,需要先獲取這個消息對應的Consumer Queue所對應的鎖,保證同一個Consumer Queue的消息不會被并發消費,但是不同的Consumer Queue的消息是可以并發處理的。
看完上述內容,你們掌握Message Queue Selector如何實現順序消費的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。