您好,登錄后才能下訂單哦!
1.1項目背景:做一個災情預警的消息平臺,災情檢查系統需要向消息平臺里面推送消息,這里是典型的異構系統的消息傳遞,我們需要選擇一個中間件作為消息隊列,調研分析了rabbitmq,zeromq,activemq,kafka等消息中間件,綜合性能,安全,可持久化等角度果斷選擇了rabbitmq作為我們的消息中間件 (其實這里是因為rabbitmq 是spring官方支持的,開發起來方便)。需求上我們有多種類型的消息,這里有緊急推送的和一般的等區分,高并發時,就會有對消息進行優先推送的情況出現,于是rabbitmq消息隊優先級的推送功能是我們需要解決的首個技術點.
1.2技術調研:這里一個概念需要說明,為什么說是消息隊列的優先級而不是消息的優先級,來看下消息隊列的工作原理
生產者生成消息打到交換機里面(如果沒有聲明交換機,會打到default exchange里面),交換機綁定一個或多個隊列,消息進入隊列里面,消費者一直在監聽隊列,發現隊列里面有消息就開始消費,這里就是一個消息傳遞的過程,queue是一個棧隊列,棧是先進先出的,就是說消息來了依次排隊,一個隊列并不能實現消息的插隊和優先推送的功能。但是如果說我們的多個隊列有不同的優先級,不同優先級的消息通過roatingkey進入不同的隊列,優先級高的隊列消息被優先消費,這樣也能形成一個相對意義上的優先級,所以說這里不是消息的優先級而是隊列的優先級.
1.2.1 為什么說是相對意義上的優先級
有并發才有優先級,如果每個消息都能被瞬間處理也不會有消息優先推送的需求,那我們看看消息會在哪里阻塞
1,queue,很明顯高并發的時候隊列里面是會存在很多消息的,2,eschange ,高并發的時候producer發送給exchange的時候也會產生阻塞。
第一種情況由于我們隊列已經定義優先級了,所以進入隊列的消息都是同種優先級別的,并不需要插隊。而對于第二種情況,消息在exchange時阻塞時并不能實現消息優先進入隊列,依然是一個依次處理的情景,但是由于exchang到queue的處理速度極快,所有我們忽略了這塊的優先級。
1.2.3 代碼實現
在rabbitmq3.5版本之前,官方并沒有實現隊列優先級的功能,但論壇里面有一些插件可以實現(末尾附鏈接),這里我們主要說3.5版本之后的實現
1.2.3.1 Java代碼
Connectionconn =RabbitMQConnectionUtil.getRabbitmqConnection();//創建連接 Channelchannel = conn.createChannel();//創建channel Map<String,Object> arg = newHashMap<String, Object>(); arg.put("x-max-priority",10); //隊列的屬性參數 有10個優先級別 // 聲明(創建)隊列 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueDeclare(QUEUE_NAME,true,false, false, arg); // 消息內容 String message ="Hello World!"; channel.basicPublish("",QUEUE_NAME, null, message.getBytes()); BasicPropertiesprop =new BasicProperties(null, null, null, null, 1, null, null, null, null, null, null, null, null,null);//消息的參數,聲明該消息的優先級是1 channel.basicPublish("",QUEUE_NAME, prop, message.getBytes()); //消息發布 System.out.println("[x] Sent '" + message + "'"); //關閉通道和連接 channel.close(); conn.close();
客戶端看下結果:
1.2.3.2結合spring實現:
1.2.3.2.1 xml配置:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > <description>rabbitmq 連接服務配置</description> <!-- 連接配置 --> <rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" username="${rabbit.username}" password="${rabbit.password}" port="${rabbit.port}" virtual-host="${rabbit.vhost}"/> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> <!-- spring template聲明--> <!-- 聲明一個隊列 --> <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false"> <rabbit:queue-arguments> <entry key="x-max-priority"> <value type="java.lang.Integer">10</value>//這個地方一定是integer的,別的不好使!! </entry> </rabbit:queue-arguments> </rabbit:queue> <!-- 監聽配置queues:監聽的隊列,多個的話用逗號(,)分隔 ref:監聽器--> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"> <rabbit:listener queue-names="test_queue_key" ref="queueListenter" method="onMessage"/> </rabbit:listener-container> <bean id="queueListenter" class="com.DF.spring.springAMQP.QueueListener" />
1.2.3.2.2代碼部分:
producter:
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:/spring/rabbitmq-contextDemo2.xml"); RabbitTemplate amqpTemplate = ctx.getBean(RabbitTemplate.class); Random random = new Random(); for (int i=0; i< 1000; i++){ final int priority = random.nextInt(10 - 1 + 1) + 1;//隨機的優先級 amqpTemplate.convertAndSend("test_queue_key", (Object)("hello world"), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(priority); return message; } }); }
customer:
public class QueueListener implements MessageListener{ @Override public void onMessage(Message message) { try{ System.out.print("[x] 接收到的消息:"+new String(message.getBody(),"utf-8")+"&&&"+"優先級"+message.getMessageProperties().getPrority()); Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } } }
從客戶端看下隊列里面的消息:
我們發送隨機優先級的消息進入隊列,看看消費端打印出來的消息:
到這里,rabbitmq結合spring的demo功能實現......
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。