您好,登錄后才能下訂單哦!
本篇內容介紹了“SpringBoot怎么整合RocketMQ事務、廣播以及順序消息”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
環境:springboot2.3.9RELEASE + RocketMQ4.8.0
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
server: port: 8080 --- rocketmq: nameServer: localhost:9876 producer: group: demo-mq
發送
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String message) { rocketMQTemplate.convertAndSend("test-topic:tag2", MessageBuilder.withPayload(message).build()); }
接受
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "consumer01-group", selectorExpression = "tag1 || tag2") @Component public class ConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("接收到消息:" + message) ; } }
發送
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendOrder(String topic, String message, String tags, int id) { rocketMQTemplate.asyncSendOrderly(topic + ":" + tags, MessageBuilder.withPayload(message).build(), "order-" + id, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.err.println("msg-id: " + sendResult.getMsgId() + ": " + message +"\tqueueId: " + sendResult.getMessageQueue().getQueueId()) ; } @Override public void onException(Throwable e) { e.printStackTrace() ; } }); }
這里是根據hashkey將消息發送到不同的隊列中
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "consumer02-group", selectorExpression = "tag3 || tag4", consumeMode = ConsumeMode.ORDERLY) @Component public class ConsumerOrderListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println(Thread.currentThread().getName() + " 接收到Order消息:" + message) ; } }
consumeMode = ConsumeMode.ORDERLY,指明了消息模式為順序模式,一個隊列,一個線程。
結果
當consumeMode = ConsumeMode.CONCURRENTLY執行結果如下:
發送端
@Resource private RocketMQTemplate rocketMQTemplate ; public void send(String topic, String message, String tags) { rocketMQTemplate.send(topic + ":" + tags, MessageBuilder.withPayload(message).build()) ; }
消費端
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.CLUSTERING) @Component public class ConsumerBroadListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }
messageModel = MessageModel.CLUSTERING
測試
啟動兩個服務分別端口是8080,8081
8080服務
8081服務
集群消息模式下,每個服務分別接收一部分消息,實現了負載均衡
消費端
@RocketMQMessageListener(topic = "broad-topic", consumerGroup = "consumer03-group", selectorExpression = "tag6 || tag7", messageModel = MessageModel.BROADCASTING) @Component public class ConsumerBroadListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("ConsumerBroadListener1接收到消息:" + message) ; } }
messageModel = MessageModel.BROADCASTING
測試
啟動兩個服務分別端口是8080,8081
8080服務
8081服務
集群消息模式下,每個服務分別都接受了同樣的消息。
RocketMQ事務的3個狀態
TransactionStatus.CommitTransaction:提交事務消息,消費者可以消費此消息
TransactionStatus.RollbackTransaction:回滾事務,它代表該消息將被刪除,不允許被消費。
TransactionStatus.Unknown :中間狀態,它代表需要檢查消息隊列來確定狀態。
RocketMQ實現事務消息主要分為兩個階段:正常事務的發送及提交、事務信息的補償流程 整體流程為:
正常事務發送與提交階段
1、生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
2、服務端響應消息寫入結果,半消息發送成功
3、開始執行本地事務
4、根據本地事務的執行狀態執行Commit或者Rollback操作
事務信息的補償流程
1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求
2、生產者收到確認回查請求后,檢查本地事務的執行狀態
3、根據檢查后的結果執行Commit或者Rollback操作
補償階段主要是用于解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。
發送端
@Resource private RocketMQTemplate rocketMQTemplate ; public void sendTx(String topic, Long id, String tags) { rocketMQTemplate.sendMessageInTransaction(topic + ":" + tags, MessageBuilder.withPayload( new Users(id, UUID.randomUUID().toString().replaceAll("-", ""))). setHeader("BID", UUID.randomUUID().toString().replaceAll("-", "")).build(), UUID.randomUUID().toString().replaceAll("-", "")) ; }
生產者對應的監聽器
@RocketMQTransactionListener public class ProducerTxListener implements RocketMQLocalTransactionListener { @Resource private BusinessService bs ; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 這里執行本地的事務操作,比如保存數據。 try { // 創建一個日志記錄表,將這唯一的ID存入數據庫中,在下面的check方法中可以根據這個id查詢是否有數據 String id = (String) msg.getHeaders().get("BID") ; Users users = new JsonMapper().readValue((byte[])msg.getPayload(), Users.class) ; System.out.println("消息內容:" + users + "\t參與數據:" + arg + "\t本次事務的唯一編號:" + id) ; bs.save(users, new UsersLog(users.getId(), id)) ; } catch (Exception e) { e.printStackTrace() ; return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 這里檢查本地事務是否執行成功 String id = (String) msg.getHeaders().get("BID") ; System.out.println("執行查詢ID為:" + id + " 的數據是否存在") ; UsersLog usersLog = bs.queryUsersLog(id) ; if (usersLog == null) { return RocketMQLocalTransactionState.ROLLBACK ; } return RocketMQLocalTransactionState.COMMIT ; } }
消費端
@RocketMQMessageListener(topic = "tx-topic", consumerGroup = "consumer05-group", selectorExpression = "tag10") @Component public class ConsumerTxListener implements RocketMQListener<Users> { @Override public void onMessage(Users users) { System.out.println("TX接收到消息:" + users) ; } }
Service
@Transactional public boolean save(Users users, UsersLog usersLog) { usersRepository.save(users) ; usersLogRepository.save(usersLog) ; if (users.getId() == 1) { throw new RuntimeException("數據錯誤") ; } return true ; } public UsersLog queryUsersLog(String bid) { return usersLogRepository.findByBid(bid) ; }
Controller
@GetMapping("/tx/{id}") public Object sendTx(@PathVariable("id")Long id) { ps.sendTx("tx-topic", id, "tag10") ; return "send transaction success" ; }
測試
調用接口后,控制臺輸出:
從打印日志看出來都保存完畢了后 消費端才接受到消息。
刪除數據,再測試ID為1會報錯的。
數據庫中沒有數據。。。
是不是也不是很復雜,2個階段來處理。
完畢!!!
“SpringBoot怎么整合RocketMQ事務、廣播以及順序消息”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。