您好,登錄后才能下訂單哦!
這篇文章主要為大家展示了“分布式系統消息中間件RabbitMQ怎么用”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究并學習一下“分布式系統消息中間件RabbitMQ怎么用”這篇文章吧。
前言:
這篇文章主要總結一下RabbitMQ在日常項目開發中比較常用的幾個特性。
一。 mandatory 參數
上一篇文章中我們知道,生產者將消息發送到RabbitMQ的交換器中通過RoutingKey與BindingKey的匹配將之路由到具體的隊列中以供消費者消費。那么當我們通過匹配規則找不到隊列的時候,消息將何去何從呢?Rabbit給我們提供了兩種方式。mandatory與備份交換器。
mandatory參數是channel.BasicPublish方法中的參數。其主要功能是消息傳遞過程中不可達目的地時將消息返回給生產者。當mandatory 參數設為true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,那么RabbitMQ 會調用BasicReturn 命令將消息返回給生產者。當mandatory 參數設置為false 時。則消息直接被丟棄。其運轉流程與實現代碼如下(以C# RabbitMQ.Client 3.6.9為例):
//連接與創建信道--后續的示例代碼我們會省略掉這部分代碼和釋放連接 ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection();//連接Rabbit IModel channel = conn.CreateModel();//創建信道 channel.ExchangeDeclare("exchangeName", "direct", true);//定義交換器 String queueName = channel.QueueDeclare("TestQueue", true, false, false, null).QueueName;//定義 隊列 隊列名TestQueue,持久化的,非排它的,非自動刪除的。 channel.QueueBind(queueName, "exchangeName", "routingKey");//隊列綁定交換器 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchangeName", "routingKey", true, null, message);//發布一個可以路由到隊列的消息,mandatory參數設置為true var message1 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("exchangeName", "routingKey1", true, null, message);//發布一個不可以路由到隊列的消息,mandatory參數設置為true //生產者回調函數 channel.BasicReturn += (model, ea) => { //do something... 消息若不能路由到隊列則會調用此回調函數。 }; //關閉信道與連接 channel.close(); conn.close() ;
二。 備份交換器
當消息不能路由到隊列時,通過mandatory設置參數,我們可以將消息返回給生產者處理。但這樣會有一個問題,就是生產者需要開一個回調的函數來處理不能路由到的消息,這無疑會增加生產者的處理邏輯。備份交換器(Altemate Exchange)則提供了另一種方式來處理不能路由的消息。備份交換器可以將未被路由的消息存儲在RabbitMQ中,在需要的時候去處理這些消息。其主要實現代碼如下:
IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("alternate-exchange", "altExchange"); channel.ExchangeDeclare("normalExchange", "direct", true, false, args);//定義普通交換器并添加備份交換器參數 channel.ExchangeDeclare("altExchange", "fanout", true, false, null); //定義備份交換器,并聲明為扇形交換器 channel.QueueDeclare("normalQueue", true, false, false, null);//定義普通隊列 channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey1");//普通隊列隊列綁定普通交換器 channel.QueueDeclare("altQueue", true, false, false, null);//定義備份隊列 channel.QueueBind("altQueue", "altExchange", "");//綁定備份隊列與交換器 var msg1 = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey1", false, null, msg1);//發布一個可以路由到隊列的消息,消息最終會路由到normalQueue var msg2 = Encoding.UTF8.GetBytes("TestMsg1"); channel.BasicPublish("normalExchange", "NormalRoutingKey2", false, null, msg2);//發布一個不可以被路由的消息,消息最終會進入altQueue
備份交換器其實和普通的交換器沒有太大的區別,為了方便使用,建議設置為fanout類型,若設置為direct 或者topic的類型。需要注意的是,消息被重新發送到備份交換器時的路由鍵和從生產者發出的路由鍵是一樣的。考慮這樣一種情況,如果備份交換器的類型是direct,并且有一個與其綁定的隊列,假設綁定的路由鍵是key1,當某條攜帶路由鍵為key2 的消息被轉發到這個備份交換器的時候,備份交換器沒有匹配到合適的隊列,則消息丟失。如果消息攜帶的路由鍵為keyl,則可以存儲到隊列中。
對于備份交換器,有以下幾種特殊情況:
如果設置的備份交換器不存在,客戶端和RabbitMQ 服務端都不會有異常出現,此時消息會丟失。
如果備份交換器沒有綁定任何隊列,客戶端和RabbitMQ 服務端都不會有異常出現,此時消息會丟失。
如果備份交換器沒有任何匹配的隊列,客戶端和RabbitMQ 服務端都不會有異常出現,此時消息會丟失。
如果備份交換器和mandatory參數一起使用,那么mandatory參數無效。
三。 過期時間(TTL)
3.1 設置消息的TTL
目前有兩種方法可以設置消息的TTL。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息本身進行單獨設置,每條消息的TTL可以不同。如果兩種方法一起使用,則消息的TTL 以兩者之間較小的那個數值為準。消息在隊列中的生存時間一旦超過設置的TTL值時,就會變成"死信" (Dead Message) ,消費者將無法再收到該消息。(有關死信隊列請往下看)
通過隊列屬性設置消息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl參數實現的,這個參數的單位是毫秒。示例代碼下:
IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-message-ttl", 6000); channel.QueueDeclare("ttlQueue", true, false, false, args);
如果不設置TTL.則表示此消息不會過期;如果將TTL設置為0 ,則表示除非此時可以直接將消息投遞到消費者,否則該消息會被立即丟棄(或由死信隊列來處理)。
針對每條消息設置TTL的方法是在channel.BasicPublish方法中加入Expiration的屬性參數,單位為毫秒。關鍵代碼如下:
BasicProperties properties = new BasicProperties() { Expiration = "20000",//設置TTL為20000毫秒 }; var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, properties, message);
注意:對于第一種設置隊列TTL屬性的方法,一旦消息過期,就會從隊列中抹去,而在第二種方法中,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判定的。Why?在第一種方法里,隊列中己過期的消息肯定在隊列頭部, RabbitMQ 只要定期從隊頭開始掃描是否有過期的消息即可。而第二種方法里,每條消息的過期時間不同,如果要刪除所有過期消息勢必要掃描整個隊列,所以不如等到此消息即將被消費時再判定是否過期,如果過期再進行刪除即可。
3.2 設置隊列的TTL
注意,這里和上述通過隊列設置消息的TTL不同。上面刪除的是消息,而這里刪除的是隊列。通過channel.QueueDeclare 方法中的x-expires參數可以控制隊列被自動刪除前處于未使用狀態的時間。這個未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,并且在過期時間段內也未調用過channel.BasicGet命令。
設置隊列里的TTL可以應用于類似RPC方式的回復隊列,在RPC中,許多隊列會被創建出來,但是卻是未被使用的(有關RabbitMQ實現RPC請往下看)。RabbitMQ會確保在過期時間到達后將隊列刪除,但是不保障刪除的動作有多及時。在RabbitMQ 重啟后, 持久化的隊列的過期時間會被重新計算。用于表示過期時間的x-expires參數以毫秒為單位, 井且服從和x-message-ttl一樣的約束條件,不同的是它不能設置為0(會報錯)。
示例代碼如下:
IDictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-expires", 6000); channel.QueueDeclare("ttlQueue", false, false, false, args);
四。 死信隊列
DLX(Dead-Letter-Exchange)死信交換器,當消息在一個隊列中變成死信之后,它能被重新被發送到另一個交換器中,這個交換器就是DLX ,綁定DLX的隊列就稱之為死信隊列。
消息變成死信主要有以下幾種情況:
消息被拒絕(BasicReject/BasicNack) ,井且設置requeue 參數為false;(消費者確認機制將會在下一篇文章中涉及)
消息過期;
隊列達到最大長度。
DLX也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列中存在死信時,RabbitMQ 就會自動地將這個消息重新發布到設置的DLX上去,進而被路由到另一個隊列,即死信隊列。可以監聽這個隊列中的消息、以進行相應的處理。
通過在channel.QueueDeclare 方法中設置x-dead-letter-exchange參數來為這個隊列添加DLX。其示例代碼如下:
channel.ExchangeDeclare("exchange.dlx", "direct", true);//定義死信交換器 channel.ExchangeDeclare("exchange.normal", "direct", true);//定義普通交換器 IDictionary<String, Object> args = new Dictionary<String, Object>(); args.Add("x-message-ttl",10000);//定義消息過期時間為10000毫秒 args.Add("x-dead-letter-exchange", "exchange.dlx");//定義exchange.dlx為死信交換器 args.Add("x-dead-letter-routing-key", "routingkey");//定義死信交換器的綁定key,這里也可以不指定,則默認使用原隊列的路由key channel.QueueDeclare("queue.normal", true, false, false, args);//定義普通隊列 channel.QueueBind("queue.normal", "exchange.normal", "normalKey");//普通隊列交換器綁定 channel.QueueDeclare("queue.dlx", true, false, false, null);//定義死信隊列 channel.QueueBind("queue.dlx", "exchange.dlx", "routingkey");//死信隊列交換器綁定,若上方為制定死信隊列路由key則這里需要使用原隊列的路由key //發布消息 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("exchange.normal", "normalKey", null, message) ;
以下為死信隊列的運轉流程:
五。 延遲隊列
RabbitMQ本身并未提供延遲隊列的功能。延遲隊列是一個邏輯上的概念,可以通過過期時間+死信隊列來模擬它的實現。延遲隊列的邏輯架構大致如下:
生產者將消息發送到過期時間為n的隊列中,這個隊列并未有消費者來消費消息,當過期時間到達時,消息會通過死信交換器被轉發到死信隊列中。而消費者從死信隊列中消費消息。這個時候就達到了生產者發布了消息在講過了n時間后消費者消費了消息,起到了延遲消費的作用。
延遲隊列在我們的項目中可以應用于很多場景,如:下單后兩個消息取消訂單,七天自動收貨,七天自動好評,密碼凍結后24小時解凍,以及在分布式系統中消息補償機制(1s后補償,10s后補償,5m后補償......)。
六。 優先級隊列
就像我們生活中的“特殊”人士一樣,我們的業務上也存在一些“特殊”消息,可能需要優先進行處理,在生活上我們可能會對這部分特殊人士開辟一套VIP通道,而Rabbit同樣也有這樣的VIP通道(前提是在3.5的版本以后),即優先級隊列,隊列中的消息會有優先級優先級高的消息具備優先被消費的特權。針對這些VIP消息,我們只需做兩件事:
我們只需做兩件事情:
將隊列聲明為優先級隊列,即在創建隊列的時候添加參數 x-max-priority 以指定最大的優先級,值為0-255(整數)。
為優先級消息添加優先級。
其示例代碼如下:
channel.ExchangeDeclare("exchange.priority", "direct", true);//定義交換器 IDictionary<String, Object> args = new Dictionary<String, Object>(); args.Add("x-max-priority", 10);//定義優先級隊列的最大優先級為10 channel.QueueDeclare("queue.priority", true, false, false, args);//定義優先級隊列 channel.QueueBind("queue.priority", "exchange.priority", "priorityKey");//隊列交換器綁定 BasicProperties properties = new BasicProperties() { Priority =8,//設置消息優先級為8 }; var message = Encoding.UTF8.GetBytes("TestMsg8"); //發布消息 channel.BasicPublish("exchange.priority", "priorityKey", properties, message);
注意:沒有指定優先級的消息會將優先級以0對待。 對于超過優先級隊列所定最大優先級的消息,優先級以最大優先級對待。對于相同優先級的消息,后進的排在前面。如果在消費者的消費速度大于生產者的速度且Broker 中沒有消息堆積的情況下, 對發送的消息設置優先級也就沒有什么實際意義。因為生產者剛發送完一條消息就被消費者消費了,那么就相當于Broker 中至多只有一條消息,對于單條消息來說優先級是沒有什么意義的。
關于優先級隊列,好像違背了隊列這種數據結構先進先出的原則,其具體是怎么實現的在這里就不過多討論。有興趣的可以自己研究研究。后續可能也會有相關的文章來分析其原理。
七。 RPC 實現
RPC,是Remote Procedure Call 的簡稱,即遠程過程調用。它是一種通過網絡從遠程計算機上請求服務,而不需要了解底層網絡的技術。RPC 的主要功用是讓構建分布式計算更容易,在提供強大的遠程調用能力時不損失本地調用的語義簡潔性。
有關RPC不多介紹,這里我們主要介紹RabbitMQ如何實現RPC。RabbitMQ 可以實現很簡單的RPC。客戶端發送請求消息,服務端回復響應的消息,為了接收響應的消息,我們需要在請求消息中發送一個回調隊列(可以使用默認的隊列)。其服務器端實現代碼如下:
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); channel.QueueDeclare("RpcQueue", true, false, false, null); SimpleRpcServer rpc = new MySimpRpcServer(new Subscription(channel, "RpcQueue")); rpc.MainLoop(); } public class MySimpRpcServer: SimpleRpcServer { public MySimpRpcServer(Subscription subscription) : base(subscription) { } /// <summary> /// 執行完成后進行回調 /// </summary> public override byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties) { replyProperties = null; return Encoding.UTF8.GetBytes("我收到了!"); } /// <summary> /// 進行處理 /// </summary> /// <param name="evt"></param> public override void ProcessRequest(BasicDeliverEventArgs evt) { // todo..... base.ProcessRequest(evt); } }
客戶端實現代碼如下:
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin"; factory.Password = "admin"; factory.HostName = "192.168.121.205"; IConnection conn = factory.CreateConnection(); IModel channel = conn.CreateModel(); SimpleRpcClient client = new SimpleRpcClient(channel, "RpcQueue"); var message = Encoding.UTF8.GetBytes("TestMsg8"); var result = client.Call(message); //do somethings...
以上是Rabbit客戶端自己幫我們封裝好的Rpc客戶端與服務端的邏輯。當然我們也可以自己實現,主要是借助于BasicProperties的兩個參數。
ReplyTo: 通常用來設置一個回調隊列。
CorrelationId : 用來關聯請求(request) 和其調用RPC 之后的回復(response) 。
其處理流程如下:
當客戶端啟動時,創建一個匿名的回調隊列。
客戶端為RPC 請求設置2個屬性: ReplyTo用來告知RPC 服務端回復請求時的目的隊列,即回調隊列; Correlationld 用來標記一個請求。
請求被發送到RpcQueue隊列中。
RPC 服務端監聽RpcQueue隊列中的請求,當請求到來時,服務端會處理并且把帶有結果的消息發送給客戶端。接收的隊列就是ReplyTo設定的回調隊列。
客戶端監昕回調隊列,當有消息時,檢查Correlationld 屬性,如果與請求匹配,那就是結果了。
以上是“分布式系統消息中間件RabbitMQ怎么用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。