您好,登錄后才能下訂單哦!
使用spring boot + rabbitmq的時候,在開發過程中,可能會想要臨時停用/啟用監聽,或修改監聽消費者數量。如果每次修改都重啟比較浪費時間,所以研究了一下不停機就啟用停用監聽或修改一些配置
一. 關于rabbitmq監聽的配置
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { ... }
RabbitAnnotationDrivenConfiguration中主要就是監聽工廠的配置、監聽工廠,但是這里也只是創建bean,并沒有真正的初始化
通過配置里的bean類名,分析一下,rabbitmq的監聽肯定是由監聽工廠創建的,所以找到監聽工廠SimpleRabbitListenerContainerFactory
@Bean @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory") public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory( SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); configurer.configure(factory, connectionFactory); return factory; }
既然自動配置里面沒有初始化監聽,那就應該是在其他地方調用的,進入監聽工廠類中,發現有initializeContainer(SimpleMessageListenerContainer instance)方法,猜測初始化肯定與這個方法有關,所以查看有哪些地方調用,于是找到RabbitListenerEndpointRegistry.createListenerContainer(RabbitListenerEndpoint endpoint,RabbitListenerContainerFactory<?> factory)方法中有創建監聽容器和初始化的代碼
/** * Create and start a new {@link MessageListenerContainer} using the specified factory. * @param endpoint the endpoint to create a {@link MessageListenerContainer}. * @param factory the {@link RabbitListenerContainerFactory} to use. * @return the {@link MessageListenerContainer}. */ protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) { MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); if (listenerContainer instanceof InitializingBean) { try { ((InitializingBean) listenerContainer).afterPropertiesSet(); } catch (Exception ex) { throw new BeanInitializationException("Failed to initialize message listener container", ex); } } int containerPhase = listenerContainer.getPhase(); if (containerPhase < Integer.MAX_VALUE) { // a custom phase value if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + this.phase + " vs " + containerPhase); } this.phase = listenerContainer.getPhase(); } return listenerContainer; }
繼續找調用這個方法的地方,找到RabbitListenerEndpointRegistrar.afterPropertiesSet()方法之后,發現調用的地方很多了
看看afterPropertiesSet方法,是InitializingBean接口中的,猜測應該是spring容器創建bean之后都會調用的bean初始化的方法,所以查找找到RabbitListenerEndpointRegistrar是在哪里創建的實例。原來是在RabbitListenerAnnotationBeanPostProcessor中的私有屬性,而RabbitListenerAnnotationBeanPostProcessor是在RabbitBootstrapConfiguration這個自動配置里面初始化的,所以這就找到rabbitmq初始化監聽的源頭了
二. 動態管理rabbitmq監聽
回到最初的問題,想要動態的啟用停用mq的監聽,所以先看看初始化配置的類,既然有初始化,那可能會有相關的管理,于是在RabbitListenerEndpointRegistry中找到了start()和stop()方法,里面有對監聽容器進行操作,主要源碼如下
/** * @return the managed {@link MessageListenerContainer} instance(s). */ public Collection<MessageListenerContainer> getListenerContainers() { return Collections.unmodifiableCollection(this.listenerContainers.values()); } @Override public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } } /** * Start the specified {@link MessageListenerContainer} if it should be started * on startup or when start is called explicitly after startup. * @see MessageListenerContainer#isAutoStartup() */ private void startIfNecessary(MessageListenerContainer listenerContainer) { if (this.contextRefreshed || listenerContainer.isAutoStartup()) { listenerContainer.start(); } } @Override public void stop() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { listenerContainer.stop(); } }
寫個controller,注入RabbitListenerEndpointRegistry,使用start()和stop()對監聽進行啟用停用的操作,并且RabbitListenerEndpointRegistry實例還可以獲取監聽容器,對監聽的一些參數也能進行修改,比如消費者數量。代碼如下:
import java.util.Set; import javax.annotation.Resource; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.itopener.framework.ResultMap; /** * Created by fuwei.deng on 2017年7月24日. */ @RestController @RequestMapping("rabbitmq/listener") public class RabbitMQController { @Resource private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry; @RequestMapping("stop") public ResultMap stop(){ rabbitListenerEndpointRegistry.stop(); return ResultMap.buildSuccess(); } @RequestMapping("start") public ResultMap start(){ rabbitListenerEndpointRegistry.start(); return ResultMap.buildSuccess(); } @RequestMapping("setup") public ResultMap setup(int consumer, int maxConsumer){ Set<String> containerIds = rabbitListenerEndpointRegistry.getListenerContainerIds(); SimpleMessageListenerContainer container = null; for(String id : containerIds){ container = (SimpleMessageListenerContainer) rabbitListenerEndpointRegistry.getListenerContainer(id); if(container != null){ container.setConcurrentConsumers(consumer); container.setMaxConcurrentConsumers(maxConsumer); } } return ResultMap.buildSuccess(); } }
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。