亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

AMQP消息傳遞協議

發布時間:2020-05-28 14:11:06 來源:億速云 閱讀:545 作者:鴿子 欄目:編程語言

AMQP協議

AMQP全稱是Advanced Message Queuing Protocol,它是一個(分布式)消息傳遞協議,使用和符合此協議的客戶端能夠基于使用和符合此協議的消息傳遞中間件代理(Broker,也就是經紀人,個人感覺叫代理合口一些)進行通信。AMQP目前已經推出協議1.0,實現此協議的比較知名的產品有StormMQ、RabbitMQ、Apache Qpid等。RabbitMQ實現的AMQP版本是0.9.1,官方文檔中也提供了該協議pdf文本下載,有興趣可以翻閱一下。

消息中間件代理的職責

Messaging Broker,這里稱為消息中間件代理。它的職責是從發布者(Publisher,或者有些時候稱為Producer,生產者)接收消息,然后把消息路由到消費者(Consumer,或者有些時候稱為Listener,監聽者)。

因為消息中間件代理、發布者客戶端和消費者客戶端都是基于AMQP這一網絡消息協議,所以消息中間件代理、發布者客戶端和消費者客戶端可以在不同的機器上,從而實現分布式通訊和服務解耦。

消息中間件代理不僅僅提供了消息接收和消息路由這兩個基本功能,還有其他高級的特性如消息持久化功能、監控功能等等。

AMQP-0-9-1在RabbitMQ中的基本模型

AMQP-0-9-1模型的基本視圖是:消息發布者消息發布到交換器(Exchange)中,交換器的角色有點類似于日常見到的郵局或者信箱。然后,交換器把消息的副本分發到隊列(Queue)中,分發消息的時候遵循的規則叫做綁定(Binding)。接著,消息中間件代理向訂閱隊列的消費者發送消息(push模式),或者消費者也可以主動從隊列中拉取消息(fetch/pull模式)。

AMQP消息傳遞協議

發布者在發布消息的時候可以指定消息屬性(消息元數據),某些消息元數據可能由消息中間件代理使用,其他消息元數據對于消息中間件代理而言是不透明的,僅供消息消費者使用。

由于網絡是不可靠的,客戶端可能無法接收消息或者處理消息失敗,這個時候消息中間件代理無法感知消息是否正確傳遞到消費者中,因此AMQP模型提供了消息確認(Message Acknowledgement)的概念:當消息傳遞到消費者,消費者可以自動向消息中間件代理確認消息已經接收成功或者由應用程序開發者選擇手動確認消息已經接收成功并且向消息中間件代理確認消息,消息中間件代理只有在接收到該消息(或者消息組)的確認通知后才會從隊列中完全刪除該消息。

在某些情況下,交換器無法正確路由到隊列中,那么該消息就會返回給發布者,或者丟棄,或者如果消息中間件代理實現了"死信隊列(Dead Letter Queue)"擴展,消息會被放置到死信隊列中。消息發布者可以選擇使用對應的參數控制路由失敗的處理策略。

交換器和交換器類型

交互器(Exchange)是消息發送的第一站目的地,它的作用就是就收消息并且將其路由到零個或者多個隊列。路由消息的算法取決于交互器的類型和路由規則(也就是Binding)。RabbitMQ消息中間件代理支持四種類型的交互器,分別是:

交換器類型Broker默認預聲明的交換器
Direct(空字符串[(AMQP default)])和amq.direct
Fanoutamq.fanout
Topicamq.topic
Headersamq.match (和RabbitMQ中的amq.headers)

聲明交互器的時候需要提供一些列的屬性,其中比較重要的屬性如下:

  • Name:交互器的名稱。
  • Type:交換器的類型。
  • Durability:(交換器)持久化特性,如果啟動此特性,則Broker重啟后交換器依然存在,否則交換器會被刪除。
  • Auto-delete:是否自動刪除,如果啟用此特性,當最后一個隊列解除與交換器的綁定關系,交換器會被刪除。
  • Arguments:可選參數,一般配合插件或者Broker的特性使用。

之所以存在Durability和Auto-delete特性是因為并發所有的場景和用例都要求交互器是持久化的。

Direct交換器

Direct類型的交換器基于消息路由鍵(RoutingKey)把消息傳遞到隊列中。Direct交換器是消息單播路由的理想實現(當然,用于多播路由也可以),它的工作原理如下:

  • 隊列使用路由鍵K綁定到交換器。
  • 當具有路由鍵R的新消息到達交換器的時候,如果K = R,那么交換器會把消息傳遞到隊列中。

AMQP消息傳遞協議

默認交換器

默認交換器(Default Exchange)是一種特殊的Direct交互器,它的名稱是空字符串(也就是""),它由消息中間件代理預聲明,在RabbitMQ Broker中,它在Web管理界面中的名稱是(AMQP default)。每個新創建的隊列都會綁定到默認交換器,路由鍵就是該隊列的隊列名,也就是所有的隊列都可以通過默認交換器進行消息投遞,只需要指定路由鍵為相應的隊列名即可。

AMQP消息傳遞協議

Fanout交換器

Fanout其實是一個組合單詞,fan也就是扇形,out就是向外發散的意思,Fanout交換器可以想象為"扇形"交換器。Fanout交換器會忽略路由鍵,它會路由消息到所有綁定到它的隊列。也就是說,如果有N個隊列綁定到一個Fanout交換器,當一個新的消息發布到該Fanout交換器,那么這條新消息的一個副本會分發到這N個隊列中。Fanout交換器是消息廣播路由的理想實現。

AMQP消息傳遞協議

Topic交換器

Topic交換器基于路由鍵和綁定隊列和交換器的模式進行匹配從而把消息路由到一個或者多個隊列。綁定隊列和交換器的Topic模式(這個模式串其實就是聲明綁定時候的路由鍵,和消息發布的路由鍵并非同一個)一般使用點號(dot,也就是'.')分隔,例如source.target.key,綁定模式支持通配符:

  • 符號'#'匹配一個或者多個詞,例如:source.target.#可以匹配source.target.dogesource.target.doge.throwable等等。
  • 符號''只能匹配一個詞,例如:`source.target.可以匹配source.target.dogesource.target.throwable`等等。

對每一條消息,Topic交換器會遍歷所有的綁定關系,檢查消息指定的路由鍵是否匹配綁定關系中的路由鍵,如果匹配,則將消息推送到相應隊列。

AMQP消息傳遞協議

Topic交換器是消息多播路由的理想實現。

Headers交換器

Headers交換器是一種不常用的交換器,它使用多個屬性進行路由,這些屬性一般稱為消息頭,它不使用路由鍵進行消息路由。消息頭(Message Headers)是消息屬性(消息元數據)部分,因此,使用Headers交換器在建立隊列和交換器的綁定關系的時候需要指定一組鍵值對,發送消息到Headers交換器時候,需要在消息屬性中攜帶一組鍵值對作為消息頭。消息頭屬性支持匹配規則x-match如下:

  • x-match = all:表示所有的鍵值對都匹配才能接受到消息。
  • x-match = any:表示只要存在鍵值對匹配就能接受到消息。

Headers交換器也是忽略路由鍵的,只依賴于消息屬性中的消息頭進行消息路由。

AMQP消息傳遞協議

隊列

AMQP 0-9-1模型中的隊列與其他消息或者任務隊列系統中的隊列非常相似:它們存儲應用程序所使用的消息。隊列和交換器的基本屬性有類似的地方:

  • Name:隊列名稱。
  • Durable:是否持久化,開啟持久化意味著消息中間件代理重啟后隊列依然存在,否則隊列會被刪除。
  • Exclusive:是否獨占的,開啟隊列獨占特性意味著隊列只能被一個連接使用并且連接關閉之后隊列會被刪除。
  • Auto-delete:是否自動刪除,開啟自動刪除特性意味著隊列至少有一個消費者并且最后一個消費者解除訂閱狀態(一般是消費者對應的通道關閉)后隊列會自動刪除。
  • Arguments:隊列參數,一般和消息中間件代理或者插件的特性相關,如消息的過期時間(Message TTL)和隊列長度等。

一個隊列只有被聲明(Declare)了才能使用,也就是隊列的第一次聲明就是隊列的創建操作(因為第一次聲明的時候隊列并不存在)。如果使用相同的參數再次聲明已經存在的隊列,那么此次聲明會不生效(當然也不會出現異常)。但是如果使用不相同的參數再次聲明已經存在的隊列,那么會拋出通道級別的異常,異常代碼是406(PRECONDITION_FAILED)。

隊列名稱

隊列名必須由255字節(bytes)長度以內的UTF-8編碼字符組成。實現AMQP 0-9-1規范的消息中間件代理具備自動生成隨機隊列名的功能,也就是在聲明隊列的時候,隊列名指定為空字符串,那么消息中間件代理會自動生成一個隊列名,并且在隊列聲明的返回結果中帶上對應的隊列名。

以"amq."開頭的隊列是由消息中間件代理內部生成的,有其特殊的作用,因此不能聲明此類名稱的新隊列,否則會導致通道級別的異常,異常代碼為403(ACCESS_REFUSED)。

隊列的持久化特性

持久化的隊列會持久化到磁盤中,這種隊列在消息中間件代理重啟后不會被刪除。不開啟持久化特性的隊列稱為瞬時(transient)隊列,并非所有的場景都需要開啟隊列的持久化特性。

隊列的持久化特性并不意味著路由到它上面的消息是持久化的,也就是隊列的持久化跟消息的持久化是兩回事。如果息中間件代理掛了,它重啟后會重新聲明開啟了持久化特性的隊列,這些隊列中只有使用了消息持久化特性的消息會被恢復。

綁定

綁定(Binding)是交換器路由消息到隊列的規則。例如交換器E可以路由消息到隊列Q,那么Q必須通過一定的規則綁定到E。綁定中使用的某些交換器的類型決定了它可以使用可選的路由鍵(RoutingKey)。路由鍵的作用類似于過濾器,可以篩選某些發布到交換器的消息路由到目標隊列。

如果發布的消息沒有路由到任意一個目標隊列,例如,消息已經發布到交換器,交換器中沒有任何綁定,這個時候消息會被丟棄或者返回給發布者,取決于消息發布者發布消息時候使用的參數。

消費者

如果隊列只有發布者生產消息,那么是沒有意義的,必須有消費者對消息進行使用,或者叫這個操作為消息消費,消息消費的方式有兩種:

  • 消息代理中間件向消費者推送消息(推模式,代表方法是basic.consume)。
  • 消費者主動向消息代理中間件拉取消息(拉模式,代表方法是basic.get)。

使用推模式的情況下,消費者必須指定需要訂閱的隊列。每個隊列可以存在多個消費者,或者僅僅注冊一個獨占的消費者。

每個消費者(訂閱者)都有一個稱為消費者標簽(consumer tag)的標識符,消費者標簽是一個字符串。通過消費者標簽可以實現取消訂閱的操作。

消息確認

消費者應用程序有可能在接收和處理消息的時候崩潰,也有可能因為網絡原因導致消息中間件代理投遞消息到消費者的時候失敗了,這樣就會催生一個問題:AMQP消息中間件代理應該在什么時候從隊列中刪除消息?因此,AMQP 0-9-1規范提供了兩種選擇:

  • 消息中間件代理向應用程序發送消息(使用AMQP方法basic.deliverbasic.get-ok)。
  • 應用程序收到消息后向消息中間件代理發送確認(使用AMQP方法basic.ack <= 個人感覺這個地方少寫了basic.nackbasic.reject)

前一種稱為自動確認模型(動作觸發的同時進行了消息確認),后一種稱為顯式確認模型。顯式確認模型中,需要消費者主動向消息中間件代理進行消息主動確認,這個消息主動確認動作的執行時機完全由應用程序控制。消息主動確認有三種方式:積極確認(ack)、消極確認(nack)和拒絕(reject)。

預取消息

預取消息(Prefetching Messages)是一個特性。對于多個消費者共享同一個隊列的情況,能夠告知消息中間件代理在發送下一個確認之前指定每個消費者一次可以接收消息的消息量。這個特性可以理解為簡單的負載均衡技術,在批量發布消息的場景下能夠提高吞吐量。

消息屬性和有效負載

AMQP模型中,消息具有屬性值。AMQP 0-9-1規范定義了一些常見的屬性,一般開發人員不需要太關注這些屬性:

  • Content type
  • Content encoding
  • Routing key
  • Delivery mode (persistent or not)
  • Message priority
  • Message publishing timestamp
  • Expiration period
  • Publisher application id

這些通用的屬性一般是消息中間件代理使用的,還有可以定制的可選屬性header,形式是鍵值對,類似于HTTP中的請求頭。消息屬性是在發布消息的時候設置的。

AMQP消息還有一個有效載荷(payload,其實就是消息數據體),AMQP代理將其視為不透明的字節數組,也就是AMQP代理不會檢查或者修改消息的有效載荷。有些消息可能只包含屬性而沒有有效負載。通常使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)來序列化和結構化數據,以便將其作為消息有效負載發布。在一般約定下,消息屬性中的Content typeContent encoding一般可以表明其序列化的方式。

消息發布支持消息的持久化特性,消息持久化特性開啟后,消息中間件代理會把消息保存到磁盤中,如果重啟代理消息也不會丟失。開啟消息持久化特性將會影響性能,主要是因為涉及到刷盤操作。

AMQP-0-9-1方法

AMQP 0-9-1定義了一些方法,對應了客戶端和消息中間件代理之間交互的一些操作方法,這些操作方法的設計跟面向對象編程語言中的方法沒有任何共同之處。常用的交換器相關的操作方法有:

  • exchange.declare
  • exchange.declare-OK
  • exchange.delete
  • exchange.delete-OK

在邏輯上,上面幾個操作方法在客戶端和消息中間件代理之間的交互如下:

AMQP消息傳遞協議

對于隊列,也有類似的操作方法:

  • queue.declare
  • queue.declare-OK
  • queue.delete
  • queue.delete-OK

AMQP消息傳遞協議

并非所有的AMQP操作方法都有響應結果操作方法,像消息發布方法basic.publish的使用是最廣泛的,此操作方法沒有對應的響應結果操作方法。有些操作方法可能有多個響應結果(操作方法),例如basic.get

連接(Connection)

AMQP的連接(Connection)通常是長期存在的。AMQP是一種使用TCP進行可靠傳遞的應用程序級協議。AMQP連接使用用戶身份驗證,可以使用TLS(SSL)進行保護。當應用程序不再需要連接到AMQP代理時,它應該正常關閉AMQP連接,而不是突然關閉底層TCP連接。

通道(Channel)

某些應用程序需要與AMQP代理程序建立多個連接。但是,不希望同時打開許多TCP連接,因為這樣做會消耗系統資源并使配置防火墻變得十分困難。通道(Channel)可以認為是"共享一個單獨的TCP連接的輕量級連接",一個AMQP連接可以擁有多個通道。

對于使用了多線程處理的應用程序,有一種使用場景十分普遍:每個線程開啟一個新的通道使用,這些通道是線程間隔離的。

另外,每個特定的通道和其他通道是相互隔離的,每個執行的AMQP操作方法(包括響應)都攜帶一個通道的唯一標識,這樣客戶端就能通過該通道的唯一標識得知操作方法是對應哪個通道發生的。

虛擬主機(Virtual Host)

為了使單個消息中間件代理可以托管多個完全隔離的"環境"(這里的隔離指的是用戶組、交互器、隊列等),AMQP提供了虛擬主機(Virtual Host)的概念。多個虛擬主機類似于許多主流的Web服務器的虛擬主機,提供了AMQP組件完全隔離的環境。AMQP客戶端可以在連接消息中間件代理時指定需要連接的虛擬主機。

個人理解

關于Exchange、Queue和Binding

理解RabbitMQ中的AMQP模型,其實從開發者的角度來看,最重要的是Exchange、Queue、Binding三者的關系,這里談談個人的見解。消息的發布第一站總是Exchange,從模型上看,消息發布無法直接發送到隊列中。Exchange本身不存儲消息,它在接收到消息之后,會基于路由規則也就是Binding,把消息路由到目標Queue中。從實際操作來看,聲明路由規則總是在發布消息和消費消息之前,也就是一般步驟如下:

  • 1、聲明Exchange。
  • 2、聲明Queue。
  • 3、基于Exchange和Queue聲明Binding,這個過程有可能自定義一個RoutingKey。
  • 4、通過Exchange消息發布,這個過程有可能使用到上一步定義的RoutingKey。
  • 5、通過Queue消費消息。

我們最關注的兩個階段,消息發布和消息消費中,消息發布實際上只跟Exchange有關,而消息消費實際上只跟Queue有關。Binding實際上就是Exchange和Queue的契約關系,會直接影響消息發布階段的消息路由。那么,路由失敗一般是什么情況導致的?路由失敗,其實就是消息已經發布到Exchange,而Exchange中從既有的Binding中無法找到存在的目標Queue用于傳遞消息副本(一般而言,很少人會發送消息到一個不存在的Exchange)。消息路由失敗,從理解AMQP的模型來看,可以從根本上避免的,除非是消息發布者故意胡亂使用或者人為錯誤使用了未存在的RoutingKey、Exchange或者說是Binding關系而導致的。

關于Exchange的類型

AMQP-0-9-1模型中支持了四種交換器direct(單播)、fanout(廣播)、topic(多播)、headers,實際上,從使用者角度來看,四種交換器的功能是可以相互取代的。例如可以使用fanout類型交換器實現廣播,其實使用direct類型交換器也是可以實現廣播的,只是對應的direct類型交換器需要通過多個路由鍵綁定到多個目標隊列中。在面對生產環境的技術選型的時候,我們需要考慮性能、維護難度、合理性等角度去考慮選擇什么類型的交換器,就上面的廣播消息的例子,顯然使用fanout類型交換器可以避免聲明多個綁定關系,這樣在性能、合理性上是更優的選擇。

關于負載均衡

在AMQP-0-9-1模型中,負載均衡的實現是基于消費者而不是基于隊列(準確來說應該是消息傳遞到隊列的方式)。實際上,出現消息生產速度大大超過消費者的消費速度的時候,隊列中有可能會出現消息積壓。AMQP-0-9-1模型中沒有提供基于隊列負載均衡的特性,也就是出現消息生產速度大大超過消費者的消費速度時候,并不會把消息路由到多個隊列中,而是通過預取消息(Prefetching Messages)的特性,確定消息者的消費能力,從而調整消息中間件代理推送消息到對應消費者的數量,這樣就能夠實現消費速度快的消費者能夠消費更多的消息,減少產生有消費者處于饑餓狀態和有消費者長期處于忙碌狀態的問題。

關于消息確認機制

AMQP中提供的消息確認機制主要包括積極確認(一般叫ack,Acknowledgement)、消極確認(一般叫nack,Negative Acknowledgement)和拒絕(reject)。消息確認機制是保證消息不丟失的重要措施,當消費者接收到消息中間件代理推送的消息時候,需要主動通知消息中間件代理消息已經確認投遞成功,然后消息中間件代理才會從隊列中刪除對應的消息。沒有主動確認的消息就會變為"nack"狀態,可以想象為暫存在隊列的"nack區"中,這些消息不會投遞到消費者,直到消費者重啟后,"nack區"中的消息會重新變為"ready"狀態,可以重新投遞給消費者。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

临武县| 洛扎县| 兴文县| 盘锦市| 东光县| 广南县| 海阳市| 从化市| 寿阳县| 察雅县| 灯塔市| 波密县| 沧州市| 绍兴市| 遵义市| 酉阳| 滕州市| 大埔区| 永嘉县| 嵊泗县| 县级市| 永清县| 平谷区| 唐山市| 乌兰县| 湟源县| 仁怀市| 雷州市| 青阳县| 池州市| 专栏| 富顺县| 福建省| 金昌市| 永登县| 和田市| 涡阳县| 保定市| 竹溪县| 洮南市| 漳浦县|