您好,登錄后才能下訂單哦!
fanout模式,生產者發送的消息到Exchange,Exchange同時往多個queue發送,多個消費者同時收到各自監聽的queue消息
1、安裝rabbitmq,pom.xml添加依賴,見之前博文有操作流程
2、添加配置文件,聲明兩個queue,一個fanoutExchange,然后將queue于Exchange進行綁定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author 馮戰魁 * @Date 2018/1/12 下午2:50 */ @Configuration public class AmqpConfig { @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必須要設置 return connectionFactory; } @Bean(name="Amessage") public Queue AMessage() { return new Queue("fanout.A"); } @Bean(name="Bmessage") public Queue BMessage() { return new Queue("fanout.B"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange");//配置廣播路由器 } @Bean Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } }
3、編寫生產者方法,發送四條消息
org.springframework.amqp.core.AmqpTemplate; org.springframework.beans.factory.annotation.; org.springframework.web.bind.annotation.; org.springframework.web.bind.annotation.; RabbitSenderController { AmqpTemplate ; () fanout(){ String[] tasks = {,,,}; (i=;i<tasks.;i++){ String content = tasks[i]; System..println(+ content); ..convertAndSend(,,content); } } }
4、編寫消費者,分別監聽兩個queue
org.springframework.amqp.rabbit.annotation.; org.springframework.stereotype.; FanoutRabbit { (queues=) processA(String str1) { System..println(+str1); } (queues=) processB(String str) { System..println(+str); } }
5.執行生產者接口http://localhost:8080/fanout
消費者結果如圖所示
可以看到,兩個消費者接收到相同的生產者發送的消息
至此fanout模式結束
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。