您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關如何解決springboot集成rocketmq關于tag的坑的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
新項目使用springboot的若依框架集成rocketmq,選擇集成RocketMQTemplate這種方式實現消息的發送和接收。
此處回調方法里有些業務不用關注,只關心發送方法
@Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private RocketMQTemplate rocketMQTemplate; public void send(ReqMsg msg){ rocketMQTemplate.asyncSend(msg.getMsg().getTopic()+":"+msg.getMsg().getTags(), msg.getMsg(), new SendCallback(){ @Override public void onSuccess(SendResult sendResult) { logger.debug("msgid:{} 發送成功" , sendResult.getMsgId()); logger.debug("發送mq成功后要執行的service: {}",msg.getMsg().getSendAfterMethod()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),sendResult,"0")); } @Override public void onException(Throwable throwable) { logger.error("mq發送異常!{}",throwable.toString()); logger.debug("發送mq失敗后執行的service: {}",msg.getMsg().getSendAfterMethod()); //異常描述截取500 length入庫 msg.getMsg().putUserProperty("exceptionDesc",throwable.toString()); IsaveSendAfterMqLog saveSendAfterMqLog = SpringUtils.getBean(msg.getMsg().getSendAfterMethod()); saveSendAfterMqLog.saveSendAfterMqLog(new SendAfterLog(msg.getMsg(),"1")); } }); } }
@Service @RocketMQMessageListener(topic = "${rocketmq.topic}", consumerGroup = "${rocketmq.consumer.group}", selectorExpression="${rocketmq.tags}") public class CbiRocketmqConsumer implements RocketMQListener<CbiMsg> { Logger logger = LoggerFactory.getLogger(CbiRocketmqConsumer.class); @Override public void onMessage(CbiMsg message) { String msgBody = new String(message.getBody()); String serviceName = message.getTags(); logger.info("本次消費服務名稱:{}",serviceName); AbSaveReceiveAfter saveReceiveAfter = SpringUtils.getBean(serviceName); saveReceiveAfter.saveReceiveAfter(new RecevieAfterLog(message, Constants.CONSUME_SUCCESS));//默認消費成功 } }
@RocketMQMessageListener這個注解里selectorExpression默認是*,接收topic下全部消息。想動態對tags進行配置。于是利用springboot獲取yml配置。寫死的時候沒有問題,但是改成$表達式配置后怎么都收不到消息,經排查居然是selectorExpression這個不支持配置,會原封的按表達式進入MQ容器初始化。然而注解里面的topic,comsumerGroup都可以正常拿到配置值。
翻源碼發現問題所在,項目啟動時,在ListenerContainerConfiguration在這個類里初始化mq容器時,對配置進行賦值
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setNameServer(rocketMQProperties.getNameServer()); container.setTopic(environment.resolvePlaceholders(annotation.topic())); container.setConsumerGroup(environment.resolvePlaceholders (annotation.consumerGroup())); container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener) bean); container.setObjectMapper(objectMapper); return container; }
topic和comsumerGroup都在springboot環境里獲取配置值了,唯獨selectorExpression這個沒有,直接默認注解里的。下面的問題就是需要自己在項目啟動,springboot容器起來,但是rocketmq容器未起的時候,動態去改注解里配置的值。然后讓Rocketmq啟動。
** * 因為RocketMQMessageListener不提供動態配置功能 * springboot初始化后rocket容器初始化前利用反射動態改變 * RocketMQMessageListener注解selectorExpression的值 * * */ @Component public class ChangeSelectorExpressionBeforeMqStart implements InitializingBean { @Value("${rocketmq.consumer.tags}") private String tags; @Override public void afterPropertiesSet() throws Exception { RocketMQMessageListener annoTable = CbiRocketmqConsumer.class.getAnnotation(RocketMQMessageListener.class); // 獲取代理處理器 InvocationHandler invocationHandler = Proxy.getInvocationHandler(annoTable); // 獲取私有 memberValues 屬性 Field f = invocationHandler.getClass().getDeclaredField("memberValues"); f.setAccessible(true); // 獲取實例的屬性map Map<String, Object> memberValues = (Map<String, Object>) f.get(invocationHandler); // 修改屬性值 memberValues.put("selectorExpression", tags); } }
問題解決。。
說明:springBoot集成RocketMQ開發
環境:阿里云+Centos8+RocketMQ+SpringBoot+Docker
啟動:docker start rmqserver rmqbroker[因為RocketMQ安裝在Docket容器中,所以這樣啟動]
brokerIP1=外網ip namesrvAddr=外網ip:9876 brokerName=broker_tanhua autoCreateTopicEnable=true
說明:
1.brokerIP1 當前broker監聽的IP
2.Broker是RocketMq的核心,負責消息的傳遞(提供者=》消費者)以及消息的持久化存儲,消息的HA機制以及服務器過濾功能。
3.autoCreateTopicEnable:自動創建Topic路由
我第一次配置時,broker.conf配置文件中沒有配置autoCreateTopicEnable,因此在程序運行時會提示沒有路由信息:No route info of this topic: tanhua-sso-login
我發送消息路由名字是tanhua-sso-login
No route info of this topic: tanhua-sso-login
錯誤信息截圖:我沒有截圖網上找了一個,差不多
我當時也在網上找了很多,有在啟動時添加自動創建的也有說防火墻開啟的原因,但是我感覺會這個的話應該都知道關防火墻。
在啟動時添加自動創建可能也好使,但是我沒試過,因為我在搜索時發現問題統一指向說沒有自動創建,因此我想的是直接在配置文件中進行修改,然后重啟
解決方式:
在broker.conf配置文件中添加如下配置:
autoCreateTopicEnable=true
application.properties:
# RocketMQ相關配置 rocketmq.nameServer=外網IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
【注】:這里配置的開通沒有spring,我之前加spring怎么也連接不上
pom.xml:
<!--RocketMQ相關--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.5.1</version> </dependency>
我在修改上面的錯誤后,緊接著又報
錯誤信息:
RemotingTooMuchRequestException: sendDefaultImpl call timeout
錯誤信息截圖:也是沒有截圖網上找了一個,差不多
思路:錯誤信息中提示call timeout,timeout一般想到到時連接或響應超時,因此在網上找到的是在發送MQ時出錯,網上解決方案是:修改Mq配置文件中的sendMsgTimeout,因此想到修改可以修改SpringBoot連接MQ時的配置設置
解決方案:添加rocketmq.producer.send-message-timeout= 6000
說明:給大一點發送信息超時時間。
說明:同時在SpringBoot集成RoctetMQ配置中沒有sendMsgTimeout因此用rocketmq=>輸入'.'=>輸入sendtimeout=>查看有哪些關于這個的配置。
完整配置:
# RocketMQ相關配置 rocketmq.nameServer=外網IP:9876 rocketmq.producer.group=tanhua rocketmq.producer.send-message-timeout= 6000
感謝各位的閱讀!關于“如何解決springboot集成rocketmq關于tag的坑”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。