您好,登錄后才能下訂單哦!
RocketMQ如何快速入門,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
本章簡單講講RocketMQ的入門操作,消息發送和消息接收。
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency>
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { //構建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
查看結果
public static void main(String[] args){ try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_test_push"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); System.out.println(" MessageBody: "+ msgbody);//輸出消息內容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費成功 } }); consumer.start(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
查看結果
看到消費的結果大家可能有疑問,我們生產消息的時候是按照順序生產的消息,消費時候為什么不是順序消費下來的。
MQ消息的無序性,每個主題對應多個隊列,生產消息時是根據算法放置不同的隊列中,消費則就是無序了(有序消息后面討論)
也有可能出現一條消息被消費了多次,RocketMQ的目標就是不丟數據,<u>每條消息至少發送一次</u>,內部通過ACK的確認機制實現的后面會重點討論
為了方便的查看消息的詳情我們可以通過消息的管控臺更好的管理和查看消息詳情,當然我們也可以通過后臺的提供的命令來為運維提供更多的管理。
RocketMQ-Console地址: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
可以直接下載到本地之后通過mavne進行編譯獲取jar,該項目是SpringBoot項目
mvn clean package -Dmaven.test.skip=true java -jar target/rocketmq-console-ng-1.0.0.jar
丟到linux服務器上啟動
(1)啟動時設置具體的RocketMQ的參數
java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.10.12.203:9876;10.10.12.204:9876
(2)直接修改rocketmq-console-ng-1.0.0.jar中的配置文件,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根據自己的NamesrvAddr進行修改rocketmq.config.namesrvAddr的值,默認端口12581
瀏覽器登錄查看控制臺信息
查看RocketMQ集群的節點信息
根據主題時間段查詢消息
查看某條消息的具體信息
管控臺提供了很多運維功能能極大的提高我們的運維效率,里面的功能包括創建主題、修改主題、發送消息、對消費者的信息進行查看等功能我們不一一介紹,可以簡單的了解使用。
看完上述內容,你們掌握RocketMQ如何快速入門的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。