您好,登錄后才能下訂單哦!
RabbitMQ是一個由erlang開發的基于AMQP(Advanced Message Queue)協議的開源實現。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面都非常的優秀。是當前最主流的消息中間件之一。
RabbitMQ的官方
操作起來很簡單,只需要在DOS下面,進入安裝目錄(安裝路徑RabbitMQ Serverabbitmq_server-3.2.2sbin
)執行如下命令就可以成功安裝。
rabbitmq-plugins enable rabbitmq_management
可以通過訪問:http://localhost:15672
進行測試,默認的登陸賬號為:guest,密碼為:guest。
1. 安裝完以后erlang需要手動設置ERLANG_HOME 的系統變量。
set ERLANG_HOME=F:Program Fileserl9.0
#環境變量`path`里加入:%ERLANG_HOME%in
#環境變量`path`里加入: 安裝路徑RabbitMQ Serverabbitmq_server-3.6.10sbin
2.激活Rabbit MQ’s Management Plugin
使用Rabbit MQ 管理插件,可以更好的可視化方式查看Rabbit MQ 服務器實例的狀態,你可以在命令行中使用下面的命令激活。
rabbitmq-plugins.bat enable rabbitmq_management
3.創建管理用戶
rabbitmqctl.bat add_user sa 123456
4. 設置管理員
rabbitmqctl.bat set_user_tags sa administrator
5.設置權限
rabbitmqctl.bat set_permissions -p / sa ".*" ".*" ".*"
6. 其他命令
#查詢用戶:
rabbitmqctl.bat list_users
#查詢vhosts:
rabbitmqctl.bat list_vhosts
#啟動RabbitMQ服務:
net stop RabbitMQ && net start RabbitMQ
以上這些,賬號、vhost、權限、作用域等基本就設置完了。
RabbitMQ.Client 是RabbiMQ 官方提供的的客戶端
EasyNetQ 是基于RabbitMQ.Client 基礎上封裝的開源客戶端,使用非常方便
以下操作RabbitMQ的代碼例子,都是基于EasyNetQ的使用和再封裝,在文章底部有demo例子的×××地址
創建 IBus
/// <summary>
/// 消息服務器連接器
/// </summary>
public class BusBuilder {
public static IBus CreateMessageBus() {
// 消息服務器連接字符串
// var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];
string connString = "host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456";
if (connString == null || connString == string.Empty) throw new Exception("messageserver connection string is missing or empty");
return RabbitHutch.CreateBus(connString);
}
}
Fanout Exchange
所有發送到Fanout Exchange的消息都會被轉發到與該Exchange 綁定(Binding)的所有Queue上。
Fanout Exchange 不需要處理RouteKey 。只需要簡單的將隊列綁定到exchange 上。這樣發送到exchange的消息都會被轉發到與該交換機綁定的所有隊列上。類似子網廣播,每臺子網內的主機都獲得了一份復制的消息。
所以,Fanout Exchange 轉發消息是最快的。
/// <summary>
/// 消息消耗(fanout)
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="handler">回調</param>
/// <param name="exChangeName">交換器名</param>
/// <param name="queueName">隊列名</param>
/// <param name="routingKey">路由名</param>
public static void FanoutConsume<T>(Action<T> handler, string exChangeName = "fanout_mq", string queueName = "fanout_queue_default", string routingKey = "") where T : class {
var bus = BusBuilder.CreateMessageBus();
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout);
var queue = CreateQueue(adbus, queueName);
adbus.Bind(exchange, queue, routingKey);
adbus.Consume(queue, registration => {
registration.Add<T>((message, info) => {
handler(message.Body);
});
});
}
/// <summary>
/// 消息上報(fanout)
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="topic">主題名</param>
/// <param name="t">消息命名</param>
/// <param name="msg">錯誤信息</param>
/// <returns></returns>
public static bool FanoutPush<T>(T t, out string msg, string exChangeName = "fanout_mq", string routingKey = "") where T : class {
msg = string.Empty;
try {
using (var bus = BusBuilder.CreateMessageBus()) {
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Fanout);
adbus.Publish(exchange, routingKey, false, new Message<T>(t));
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
所有發送到Direct Exchange的消息被轉發到RouteKey中指定的Queue。
Direct模式,可以使用RabbitMQ自帶的Exchange:default Exchange 。所以不需要將Exchange進行任何綁定(binding)操作 。消息傳遞時,RouteKey必須完全匹配,才會被隊列接收,否則該消息會被拋棄。
/// <summary>
/// 消息發送(direct)
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="queue">發送到的隊列</param>
/// <param name="message">發送內容</param>
public static void DirectSend<T>(string queue, T message) where T : class {
using (var bus = BusBuilder.CreateMessageBus()) {
bus.Send(queue, message);
}
}
/// <summary>
/// 消息接收(direct)
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="queue">接收的隊列</param>
/// <param name="callback">回調操作</param>
/// <param name="msg">錯誤信息</param>
/// <returns></returns>
public static bool DirectReceive<T>(string queue, Action<T> callback, out string msg) where T : class {
msg = string.Empty;
try {
var bus = BusBuilder.CreateMessageBus();
bus.Receive<T>(queue, callback);
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
return true;
}
/// <summary>
/// 消息發送
/// <![CDATA[(direct EasyNetQ高級API)]]>
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="t"></param>
/// <param name="msg"></param>
/// <param name="exChangeName"></param>
/// <param name="routingKey"></param>
/// <returns></returns>
public static bool DirectPush<T>(T t, out string msg, string exChangeName = "direct_mq", string routingKey = "direct_rout_default") where T : class {
msg = string.Empty;
try {
using (var bus = BusBuilder.CreateMessageBus()) {
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct);
adbus.Publish(exchange, routingKey, false, new Message<T>(t));
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
/// <summary>
/// 消息接收
/// <![CDATA[(direct EasyNetQ高級API)]]>
/// </summary>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="handler">回調</param>
/// <param name="exChangeName">交換器名</param>
/// <param name="queueName">隊列名</param>
/// <param name="routingKey">路由名</param>
public static bool DirectConsume<T>(Action<T> handler, out string msg, string exChangeName = "direct_mq", string queueName = "direct_queue_default", string routingKey = "direct_rout_default") where T : class {
msg = string.Empty;
try {
var bus = BusBuilder.CreateMessageBus();
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Direct);
var queue = CreateQueue(adbus, queueName);
adbus.Bind(exchange, queue, routingKey);
adbus.Consume(queue, registration => {
registration.Add<T>((message, info) => {
handler(message.Body);
});
});
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
return true;
}
Topic Exchange
要使用主題發布,只需使用帶有主題的重載的Publish方法:
var bus = RabbitHutch.CreateBus(...);
bus.Publish(message, "X.A");
訂閱者可以通過指定要匹配的主題來過濾郵件。
所以發布的主題為“X.A.2”的消息將匹配“#”,“X.#”,“ .A.”,而不是“X.B. *”或“A”。
警告,Publish只顧發送消息到隊列,但是不管有沒有消費端訂閱,所以,發布之后,如果沒有消費者,該消息將不會被消費甚至丟失。
EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會創建一個用于接收消息的隊列,不過與消息發布不同的是,消息訂閱增加了一個參數,subscribe_id.代碼如下:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*"));
警告: 具有相同訂閱者但不同主題字符串的兩個單獨訂閱可能不會產生您期望的效果。 subscriberId有效地標識個體AMQP隊列。 具有相同subscriptionId的兩個訂閱者將連接到相同的隊列,并且兩者都將添加自己的主題綁定。 所以,例如,如果你這樣做:
bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));
匹配“x.”或“ .B”的所有消息將被傳遞到“XXX_my_id”隊列。 然后,RabbitMQ將向兩個消費者傳遞消息,其中handlerOfXDotStar和handlerOfStarDotB輪流獲取每條消息。
現在,如果你想要匹配多個主題(“X. ”OR“ .B”),你可以使用另一個重載的訂閱方法,它采用多個主題,如下所示:
bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));
/// <summary>
/// 獲取主題
/// </summary>
/// <typeparam name="T">主題內容類型</typeparam>
/// <param name="subscriptionId">訂閱者ID</param>
/// <param name="callback">消息接收響應回調</param>
/// <param name="topics">訂閱主題集合</param>
public static void TopicSubscribe<T>(string subscriptionId, Action<T> callback, params string[] topics) where T : class {
var bus = BusBuilder.CreateMessageBus();
bus.Subscribe(subscriptionId, callback, (config) => {
foreach (var item in topics) config.WithTopic(item);
});
}
/// <summary>
/// 發布主題
/// </summary>
/// <typeparam name="T">主題內容類型</typeparam>
/// <param name="topic">主題名稱</param>
/// <param name="message">主題內容</param>
/// <param name="msg">錯誤信息</param>
/// <returns></returns>
public static bool TopicPublish<T>(string topic, T message, out string msg) where T : class {
msg = string.Empty;
try {
using (var bus = BusBuilder.CreateMessageBus()) {
bus.Publish(message, topic);
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
/// <summary>
/// 發布主題
/// </summary>
/// <![CDATA[(topic EasyNetQ高級API)]]>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="t">消息內容</param>
/// <param name="topic">主題名</param>
/// <param name="msg">錯誤信息</param>
/// <param name="exChangeName">交換器名</param>
/// <returns></returns>
public static bool TopicSub<T>(T t, string topic, out string msg, string exChangeName = "topic_mq") where T : class {
msg = string.Empty;
try {
if (string.IsNullOrWhiteSpace(topic)) throw new Exception("推送主題不能為空");
using (var bus = BusBuilder.CreateMessageBus()) {
var adbus = bus.Advanced;
//var queue = adbus.QueueDeclare("user.notice.zhangsan");
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic);
adbus.Publish(exchange, topic, false, new Message<T>(t));
return true;
}
} catch (Exception ex) {
msg = ex.ToString();
return false;
}
}
/// <summary>
/// 獲取主題
/// </summary>
/// <![CDATA[(topic EasyNetQ高級API)]]>
/// <typeparam name="T">消息類型</typeparam>
/// <param name="subscriptionId">訂閱者ID</param>
/// <param name="callback">回調</param>
/// <param name="exChangeName">交換器名</param>
/// <param name="topics">主題名</param>
public static void TopicConsume<T>(Action<T> callback, string exChangeName = "topic_mq",string subscriptionId = "topic_subid", params string[] topics) where T : class {
var bus = BusBuilder.CreateMessageBus();
var adbus = bus.Advanced;
var exchange = adbus.ExchangeDeclare(exChangeName, ExchangeType.Topic);
var queue = adbus.QueueDeclare(subscriptionId);
foreach (var item in topics) adbus.Bind(exchange, queue, item);
adbus.Consume(queue, registration => {
registration.Add<T>((message, info) => {
callback(message.Body);
});
});
}
具體發布/訂閱消息的Demo和相關測試看源碼Demo
當在創建訂閱者去消費隊列的時候
/// <summary>
/// 獲取主題
/// </summary>
/// <param name="topic"></param>
public static void GetSub<T>(T topic, Action<T> callback) where T : class
{
using (var bus = BusBuilder.CreateMessageBus()) {
bus.Subscribe<T>(topic.ToString(), callback, x => x.WithTopic(topic.ToString()));
}
}
using里的對象在執行完成后被回收了,導致剛連接上去就又斷開了(剛開始寫的時候,習慣性加using,排查了好久才發現,欲哭無淚)
源碼項目運行前的準備與確認:
到RabbitMQ管理后臺添加TestQueue
VHost,并且分配用戶權限,然后到RabbitMQHelper.BusBuilder
類里配置RabbitMQ連接服務的相關信息host=127.0.0.1:5672;virtualHost=TestQueue;username=sa;password=123456
,(根據配置的內容和用戶修改)
參考資料(鳴謝):
附:Demo源碼GitHub地址
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。