亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java常用消息隊列原理介紹及性能對比

發布時間:2020-03-30 22:11:43 來源:網絡 閱讀:1428 作者:CACZJZ 欄目:開發技術

消息隊列使用場景

為什么會需要消息隊列(MQ)?

解耦
 在項目啟動之初來預測將來項目會碰到什么需求,是極其困難的。消息系統在處理過程中間插入了一個隱含的、基于數據的接口層,兩邊的處理過程都要實現這一接口。這允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
冗余
 有些情況下,處理數據的過程會失敗。除非數據被持久化,否則將造成丟失。消息隊列把數據進行持久化直到它們已經被完全處理,通過這一方式規避了數據丟失風險。許多消息隊列所采用的”插入-獲取-刪除”范式中,在把一個消息從隊列中刪除之前,需要你的處理系統明確的指出該消息已經被處理完畢,從而確保你的數據被安全的保存直到你使用完畢。
擴展性
 因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。不需要改變代碼、不需要調節參數。擴展就像調大電力按鈕一樣簡單。
靈活性 & 峰值處理能力
 在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷的請求而完全崩潰。
可恢復性
 系統的一部分組件失效時,不會影響到整個系統。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統恢復后被處理。
順序保證
 在大多使用場景下,數據處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數據會按照特定的順序來處理。Kafka保證一個Partition內的消息的有序性。
緩沖
 在任何重要的系統中,都會有需要不同的處理時間的元素。例如,加載一張圖片比應用過濾器花費更少的時間。消息隊列通過一個緩沖層來幫助任務最高效率的執行———寫入隊列的處理會盡可能的快速。該緩沖有助于控制和優化數據流經過系統的速度。
異步通信
 很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。


MQ常用的使用場景:
 1.  進程間通訊和系統間的消息通知,比如在分布式系統中。
 2.  解耦,比如像我們公司有許多開發團隊,每個團隊負責業務的不同模塊,各個開發團隊可以使用MQ來通信。
 3.  在一些高并發場景下,使用MQ的異步特性。

消息隊列和RPC對比

系統架構

RPC系統結構:

| Consumer | <=> | Provider |

Consumer調用的Provider提供的服務。
Message Queue系統結構:

| Sender | <=> | Queue | <=> | Receiver |

Sender發送消息給Queue;Receiver從Queue拿到消息來處理。

功能特點

在架構上,RPC和Message Queue的差異點是,Message Queue有一個中間結點Message Queue(broker),可以把消息存儲。

消息隊列的特點

  • Message Queue把請求的壓力保存一下,逐漸釋放出來,讓處理者按照自己的節奏來處理。

  • Message Queue引入一下新的結點,讓系統的可靠性會受Message Queue結點的影響。

  • Message Queue是異步單向的消息。發送消息設計成是不需要等待消息處理的完成。

所以對于有同步返回需求,用Message Queue則變得麻煩了。

RPC的特點

  • 同步調用,對于要等待返回結果/處理結果的場景,RPC是可以非常自然直覺的使用方式。RPC也可以是異步調用。

  • 由于等待結果,Consumer(Client)會有線程消耗。

如果以異步RPC的方式使用,Consumer(Client)線程消耗可以去掉。但不能做到像消息一樣暫存消息/請求,壓力會直接傳導到服務Provider。

RPC適用場合說明
  • 希望同步得到結果的場合,RPC合適。

  • 希望使用簡單,則RPC;RPC操作基于接口,使用簡單,使用方式模擬本地調用。異步的方式編程比較復雜。

  • 不希望發送端(RPC Consumer、Message Sender)受限于處理端(RPC Provider、Message Receiver)的速度時,使用Message Queue。

隨著業務增長,有的處理端處理量會成為瓶頸,會進行同步調用到異步消息的改造。這樣的改造實際上有調整業務的使用方式。

比如原來一個操作頁面提交后就下一個頁面會看到處理結果;改造后異步消息后,下一個頁面就會變成“操作已提交,完成后會得到通知”。

RPC不適用場合說明

RPC同步調用使用Message Queue來傳輸調用信息。 上面分析可以知道,這樣的做法,發送端是在等待,同時占用一個中間點的資源。變得復雜了,但沒有對等的收益。
 對于返回值是void的調用,可以這樣做,因為實際上這個調用業務上往往不需要同步得到處理結果的,只要保證會處理即可。(RPC的方式可以保證調用返回即處理完成,使用消息方式后這一點不能保證了。)
 返回值是void的調用,使用消息,效果上是把消息的使用方式Wrap成了服務調用(服務調用使用方式成簡單,基于業務接口)。

常用的消息隊列及使用場景

ActiveMQ

AcitveMQ是作為一種消息存儲和分發組件,涉及到client與broker端數據交互的方方面面,它不僅要擔保消息的存儲安全性,還要提供額外的手段來確保消息的分發是可靠的。

ActiveMQ消息傳送機制

Producer客戶端使用來發送消息的, Consumer客戶端用來消費消息;它們的協同中心就是ActiveMQ broker,broker也是讓producer和consumer調用過程解耦的工具,最終實現了異步RPC/數據交換的功能。隨著ActiveMQ的不斷發展,支持了越來越多的特性,也解決開發者在各種場景下使用ActiveMQ的需求。比如producer支持異步調用;使用flow control機制讓broker協同consumer的消費速率;consumer端可以使用prefetchACK來最大化消息消費的速率;提供”重發策略”等來提高消息的安全性等。在此我們不詳細介紹。

一條消息的生命周期如下:
Java常用消息隊列原理介紹及性能對比

圖片中簡單的描述了一條消息的生命周期,不過在不同的架構環境中,message的流動行可能更加復雜.將在稍后有關broker的架構中詳解..一條消息從producer端發出之后,一旦被broker正確保存,那么它將會被consumer消費,然后ACK,broker端才會刪除;不過當消息過期或者存儲設備溢出時,也會終結它。

ActiveMQ的安裝

  • 從官網下載安裝包, http://activemq.apache.org/download.html

  • 賦予運行權限 chmod +x,windows可以忽略此步

  • 運行 ./active start | stop

啟動后,activeMQ會占用兩個端口,一個是負責接收發送消息的tcp端口:61616,一個是基于web負責用戶界面化管理的端口:8161。這兩個端口可以在conf下面的xml中找到。http服務器使用了jettry。這里有個問題是啟動mq后,很長時間管理界面才可以顯示出來。可以使用netstat -an|find “61616”來測試ActiveMQ是否啟動。

Jms與ActiveMQ的結合

JMS是一個用于提供消息服務的技術規范,它制定了在整個消息服務提供過程中的所有數據結構和交互流程。而MQ則是消息隊列服務,是面向消息中間件(MOM)的最終實現,是真正的服務提供者;MQ的實現可以基于JMS,也可以基于其他規范或標準。目前選擇的最多的是ActiveMQ。

JMS支持兩種消息傳遞模型:點對點(point-to-point,簡稱PTP)和發布/訂閱(publish/subscribe,簡稱pub/sub)。這兩種消息傳遞模型非常相似,但有以下區別:

  • PTP消息傳遞模型規定了一條消息之恩能夠傳遞費一個接收方。

  • Pub/sub消息傳遞模型允許一條消息傳遞給多個接收方

點對點模型

通過點對點的消息傳遞模型,一個應用程序可以向另外一個應用程序發送消息。在此傳遞模型中,目標類型是隊列。消息首先被傳送至隊列目標,然后從該隊列將消息傳送至對此隊列進行監聽的某個消費者,如下圖:

Java常用消息隊列原理介紹及性能對比

一個隊列可以關聯多個隊列發送方和接收方,但一條消息僅傳遞給一個接收方。如果多個接收方正在監聽隊列上的消息,JMS Provider將根據“先來者優先”的原則確定由哪個價售房接受下一條消息。如果沒有接收方在監聽隊列,消息將保留在隊列中,直至接收方連接到隊列為止。這種消息傳遞模型是傳統意義上的拉模型或輪詢模型。在此列模型中,消息不時自動推動給客戶端的,而是要由客戶端從隊列中請求獲得。
     點對點模型的代碼(springboot+jms+activemq)實現如下:

@Service("queueproducer")public class QueueProducer {
    @Autowired 
    // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
    private JmsMessagingTemplate jmsMessagingTemplate;   
     // 發送消息,destination是發送到的隊列,message是待發送的消息
    @Scheduled(fixedDelay=3000)//每3s執行1次
    public void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }   
     @JmsListener(destination="out.queue")   
     public void consumerMessage(String text){
        System.out.println("從out.queue隊列收到的回復報文為:"+text);
    }
}

Producer的實現

@Componentpublic class QueueConsumer2 {
    // 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息
    @JmsListener(destination = "mytest.queue")    
    //SendTo 該注解的意思是將return回的值,再發送的"out.queue"隊列中
    @SendTo("out.queue")    
    public String receiveQueue(String text) {
        System.out.println("QueueConsumer2收到的報文為:"+text);        
        return "return message "+text;
    }
}

Consumer的實現

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqQueueTests {
   @Autowired
   private QueueProducer producer;   
   @Test
   public void contextLoads() throws InterruptedException {
      Destination destination = new ActiveMQQueue("mytest.queue");    
        for(int i=0; i<10; i++){
         producer.sendMessage(destination, "myname is Flytiger" + i);
      }
   }
}

Test的實現

其中QueueConsumer2表明的是一個雙向隊列。

發布/訂閱模型

通過發布/訂閱消息傳遞模型,應用程序能夠將一條消息發送到多個接收方。在此傳送模型中,目標類型是主題。消息首先被傳送至主題目標,然后傳送至所有已訂閱此主題的或送消費者。如下圖:

Java常用消息隊列原理介紹及性能對比


主題目標也支持長期訂閱。長期訂閱表示消費者已注冊了主題目標,但在消息到達目標時該消費者可以處于非活動狀態。當消費者再次處于活動狀態時,將會接收該消息。如果消費者均沒有注冊某個主題目標,該主題只保留注冊了長期訂閱的非活動消費者的消息。與PTP消息傳遞模型不同,pub/sub消息傳遞模型允許多個主題訂閱者接收同一條消息。JMS一直保留消息,直至所有主題訂閱者都接收到消息為止。pub/sub消息傳遞模型基本上是一個推模型。在該模型中,消息會自動廣播,消費者無須通過主動請求或輪詢主題的方法來獲得新的消息。


上面兩種消息傳遞模型里,我們都需要定義消息生產者和消費者,生產者把消息發送到JMS Provider的某個目標地址(Destination),消息從該目標地址傳送至消費者。消費者可以同步或異步接收消息,一般而言,異步消息消費者的執行和伸縮性都優于同步消息接收者,體現在:
  1. 異步消息接收者創建的網絡流量比較小。單向對東消息,并使之通過管道進入消息監聽器。管道操作支持將多條消息聚合為一個網絡調用。
  2. 異步消息接收者使用線程比較少。異步消息接收者在不活動期間不使用線程。同步消息接收者在接收調用期間內使用線程,結果線程可能會長時間保持空閑,尤其是如果該調用中指定了阻塞超時。
  3. 對于服務器上運行的應用程序代碼,使用異步消息接收者幾乎總是最佳選擇,尤其是通過消息驅動Bean。使用異步消息接收者可以防止應用程序代碼在服務器上執行阻塞操作。而阻塞操作會是服務器端線程空閑,甚至會導致死鎖。阻塞操作使用所有線程時則發生死鎖。如果沒有空余的線程可以處理阻塞操作自身解鎖所需的操作,這該操作永遠無法停止阻塞。

發布/訂閱模型的代碼(springboot+jms+activemq)實現如下:

@Service("topicproducer")
public class TopicProducer {
    @Autowired 
    // 也可以注入JmsTemplate,JmsMessagingTemplate對JmsTemplate進行了封裝
    private JmsMessagingTemplate jmsMessagingTemplate;    
    // 發送消息,destination是發送到的隊列,message是待發送的消息
    @Scheduled(fixedDelay=3000)//每3s執行1次
    public void sendMessage(Destination destination, final String message){
        jmsMessagingTemplate.convertAndSend(destination, message);
    }
}

Producer的實現

@Component
public class TopicConsumer2 {    
// 使用JmsListener配置消費者監聽的隊列,其中text是接收到的消息
    @JmsListener(destination = "mytest.topic")   
     public void receiveTopic(String text) {
        System.out.println("TopicConsumer2收到的topic報文為:"+text);
    }
}

Consumer的實現

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqTopicTests {
   @Autowired
   private TopicProducer producer;   
   @Test
   public void contextLoads() throws InterruptedException {
      Destination destination = new ActiveMQTopic("mytest.topic");     
       for(int i=0; i<3; i++){
         producer.sendMessage(destination, "myname is TopicFlytiger" + i);
      }
   }
}

Test的實現

Topic模式工作時,默認只能發送和接收queue消息,如果要發送和接收topic消息,需要加入:

spring.jms.pub-sub-domain=true1
Queue與Topic的比較
  1. JMS Queue執行load balancer語義
    一條消息僅能被一個consumer收到。如果在message發送的時候沒有可用的consumer,那么它講被保存一直到能處理該message的consumer可用。如果一個consumer收到一條message后卻不響應它,那么這條消息將被轉到另外一個consumer那兒。一個Queue可以有很多consumer,并且在多個可用的consumer中負載均衡

  2. Topic實現publish和subscribe語義
    一條消息被publish時,他將發送給所有感興趣的訂閱者,所以零到多個subscriber將接收到消息的一個拷貝。但是在消息代理接收到消息時,只有激活訂閱的subscriber能夠獲得消息的一個拷貝。

  3. 分別對應兩種消息模式
    Point-to-Point(點對點),Publisher/Subscriber Model(發布/訂閱者)
    其中在Publicher/Subscriber模式下又有Nondurable subscription(非持久化訂閱)和durable subscription(持久化訂閱)兩種消息處理方式。

ActiveMQ優缺點

優點:是一個快速的開源消息組件(框架),支持集群,同等網絡,自動檢測,TCP,SSL,廣播,持久化,XA,和J2EE1.4容器無縫結合,并且支持輕量級容器和大多數跨語言客戶端上的Java虛擬機。消息異步接受,減少軟件多系統集成的耦合度。消息可靠接收,確保消息在中間件可靠保存,多個消息也可以組成原子事務。
缺點:ActiveMQ默認的配置性能偏低,需要優化配置,但是配置文件復雜,ActiveMQ本身不提供管理工具;示例代碼少;主頁上的文檔看上去比較全面,但是缺乏一種有效的組織方式,文檔只有片段,用戶很難由淺入深進行了解,二、文檔整體的專業性太強。在研究階段可以通過查maillist、看Javadoc、分析源代碼來了解。

RabbitMQ

簡介

Rabbitmq簡介可以參考我的兩篇文章:
 openstack的RPC機制之AMQP協議(https://blog.51cto.com/caczjz/2148164) 
 RabbitMQ安裝好之后的默認賬號密碼是(guest/guest)


需要注意的是:
 多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。這種分發方式叫做round-robin(循環的方式)。
 當publisher將消息發給queue的過程中,publisher會指明routing key。Direct模式中,Direct Exchange 根據 Routing Key 進行精確匹配,只有對應的 Message Queue 會接受到消息。Topic模式中Exchange會根據routing key和bindkey進行模式匹配,決定將消息發送到哪個queue中。


有一個疑問:當有多個consumer時,rabbitmq會平均分攤給這些consumer;沒辦法把同一個message發給不同的consumer嗎?
 我之前的猜想是,當有多個consumer使用topic模式訂閱消息時,所有的消息它們都會收到;但如果是direct模式,只有一個consumer會收到消息。(理解錯誤,topic和direct只是publisher用來選擇發到不同的queue,不是consumer接收消息。一個隊列一個消息只能發送給一個消費者,不然消費者的ack也會有很多,RabbitMQ Server也不好處理)

RabbitMQ的消息確認

默認情況下,如果Message 已經被某個Consumer正確的接收到了,那么該Message就會被從queue中移除。當然也可以讓同一個Message發送到很多的Consumer。
     如果一個queue沒被任何的Consumer Subscribe(訂閱),那么,如果這個queue有數據到達,那么這個數據會被cache,不會被丟棄。當有Consumer時,這個數據會被立即發送到這個Consumer,這個數據被Consumer正確收到時,這個數據就被從queue中刪除。
      那么什么是正確收到呢?通過ack。每個Message都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數據沒有被ack,那么:
      RabbitMQ Server會把這個信息發送到下一個Consumer。而且ack的機制可以起到限流的作用(Benefitto throttling):在Consumer處理完成數據后發送ack,甚至在額外的延時后發送ack,將有效的balance Consumer的load。

RabbitMQ高可用方案

RabbitMQ有以下幾種集群模式:
普通模式(默認)
Java常用消息隊列原理介紹及性能對比

  上圖是由3個節點(Node1,Node2,Node3)組成的RabbitMQ普通集群環境,Exchange A的元數據信息在所有節點上是一致的;而Queue的完整信息只有在創建它的節點上,各個節點僅有相同的元數據,即隊列結構。
   當producer發送消息到Node1節點的Queue1中后,consumer從Node3節點拉取時,RabbitMQ會臨時在Node1、Node3間進行消息傳輸,把Node1中的消息實體取出并經過Node3發送給consumer。
 該模式存在一個問題:當Node1節點發生故障后,Node3節點無法取到Node1節點中還未被消費的消息實體。如果消息沒做持久化,那么消息將永久性丟失;如果做了持久化,那么只有等Node1節點故障恢復后,消息才能被其他節點消費。
   對于publish,客戶端任意連接集群的一個節點,轉發給創建queue的節點存儲消息的所有信息;
 對于consumer,客戶端任意連接集群中的一個節點,如果數據不在該節點中,則從存儲該消息data的節點拉取。可見當存儲有queue內容的節點失效后,只要等待該節點恢復后,queue中存在的消息才可以獲取消費的到。
   顯然增加集群的節點,可以提高整個集群的吞吐量,但是在高可用方面要稍微差一些


至于為什么只在一個節點存儲queue?
     官方認為,如果之前一個節點的消息隊列容量是1GB,那現在如果有三個節點,至少要增加2GB;同時rabbitmq的消息存儲在磁盤上,如果每個消息在所有的節點活動的話,會大大增加網絡和磁盤的負載,降低了集群的性能。

鏡像模式

  它是在普通模式的基礎上,把需要的隊列做成鏡像隊列,存在于多個節點來實現高可用(HA)。該模式解決了上述問題,Broker會主動地將消息實體在各鏡像節點間同步,在consumer取數據時無需臨時拉取。
   該模式帶來的副作用也很明顯,除了降低系統性能外,如果鏡像隊列數量過多,加之大量的消息進入,集群內部的網絡帶寬將會被大量消耗。通常地,對可靠性要求較高的場景建議采用鏡像模式。
   


  在實現機制上,mirror queue內部實現了一套選舉算法,有一個master和多個slave,queue中的消息以master為主,
   對于publish,可以選擇任意一個節點進行連接,rabbitmq內部若該節點不是master,則轉發給master,master向其他slave節點發送該消息,后進行消息本地化處理,并組播復制消息到其他節點存儲,
 對于consumer,可以選擇任意一個節點進行連接,消費的請求會轉發給master,為保證消息的可靠性,consumer需要進行ack確認,master收到ack后,才會刪除消息,ack消息會同步(默認異步)到其他各個節點,進行slave節點刪除消息。
 若master節點失效,則mirror queue會自動選舉出一個節點(slave中消息隊列最長者)作為master,作為消息消費的基準參考;在這種情況下可能存在ack消息未同步到所有節點的情況(默認異步),若slave節點失效,mirror queue集群中其他節點的狀態無需改變。

其他模式

  當然還有其他的方式,比如active/passive和shovel,主備方式(active,passive)只有一個節點處于服務狀態,可以結合pacemaker和ARBD,shovel簡單從一個broker的一個隊列中消費消息,且轉發該消息到另一個broker的交換機。 這兩種方式用的比較少,這里就不做介紹了。

RabbitMQ功能測試

本次測試依然是RabbitMQ+springboot,首先需要application.properties

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest1234

這里的端口是5672,,15672時管理端的端口。
pom要添加依賴:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId></dependency>
Direct模型

Sender的實現:

@Component
public class Sender {

    @Autowired
    private RabbitTemplate rabbitTemplate;  
      public void send(String msg) {   
         this.rabbitTemplate.convertAndSend("tiger", msg);
    }
}

Listener和listener2的實現均如下:

@Configuration
@RabbitListener(queues = "tiger")
public class Listener {
    private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class); 
     @Bean
    public Queue fooQueue() {      
      return new Queue("tiger");
    }   
     @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

此時多次發送消息時,listener和listener2會按順序分別收到消息。Listener收到的消息如下:

com.example.rabbitmq.Listener  : Listener: this is a test
com.example.rabbitmq.Listener2 : Listener2: this is a test
com.example.rabbitmq.Listener  : Listener: this is a test
com.example.rabbitmq.Listener2 : Listener2: this is a test
com.example.rabbitmq.Listener  : Listener: this is a test
com.example.rabbitmq.Listener2 : Listener2: this is a test
com.example.rabbitmq.Listener  : Listener: this is a test
com.example.rabbitmq.Listener2 : Listener2: this is a test
Topic模型

Sender的實現:

@Component
public class SenderTopic {
    @Autowired
    private RabbitTemplate rabbitTemplate;    
    /*queue的key,用于和routing key 根據binding模式匹配*/
    @Bean(name="message")  
      public Queue queueMessage() {      
      return new Queue("topic.message");
    }   
     @Bean(name="messages")    
     public Queue queueMessages() { 
            return new Queue("topic.messages");
    }  
      @Bean
    public TopicExchange exchange() {     
       return new TopicExchange("exchange");
    }    
    /*設置binding key,此時所有發送到這個exchange的消息,
      exchange都會根據routing key將消息與@Qualifier定義的queue進行匹配*/
    @Bean
    Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) {     
       return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }   
     @Bean
    Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { 
           return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞
    }   
     public void send(String routingKey, String msg) {    
         this.rabbitTemplate.convertAndSend("exchange",routingKey, msg);
    }
}

Listener的實現如下:

@Configuration
@RabbitListener(queues = "topic.message")
//監聽器監聽指定的Queue
public class ListenerTopic {

    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerTopic.class);   
     @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

listener2的實現如下

@Configuration
@RabbitListener(queues = "topic.messages")
public class ListenerTopic2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerTopic2.class);   
     @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener2: " + foo);
    }
}

發送topic.message會匹配到topic.#和topic.message 兩個Receiver都可以收到消息,發送topic.messages(或者top、topic等)只有topic.#可以匹配所有只有Receiver2監聽到消息。

Fanout模型
@Configurationpublic class SenderFanout {
    @Autowired
    private RabbitTemplate rabbitTemplate;    
    /*queue的key,用于和routing key 根據binding模式匹配*/
    @Bean(name="Amessage")    
    public Queue AMessage() {        
    return new Queue("fanout.A");
    }    
    @Bean(name="Bmessage")    
    public Queue BMessage() {       
     return new Queue("fanout.B");
    }    
    @Bean
    FanoutExchange fanoutExchange() {       
     return new FanoutExchange("fanoutExchange");//配置廣播路由器
    }    
    @Bean
    Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) {        
    return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }    
    @Bean
    Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) {       
     return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }    
    public void send(String msg) {        
    this.rabbitTemplate.convertAndSend("fanoutExchange","", msg);
    }
}
@Configuration
@RabbitListener(queues = "fanout.A")//監聽器監聽指定的Queue
public class ListenerFanout {
    private static final Logger LOGGER = LoggerFactory.getLogger(ListenerFanout.class);    
    @RabbitHandler
    public void process(@Payload String foo) {
        LOGGER.info("Listener: " + foo);
    }
}

Fanout模式下,所有綁定fanout交換機的隊列,都能收到消息。

Kafka

Kafka簡介

Kafka是一種分布式的,基于發布/訂閱的消息系統。主要設計目標如下:

  • 以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間復雜度的訪問性能。

  • 高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。

  • 支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸。

  • 同時支持離線數據處理和實時數據處理。

  • Scale out:支持在線水平擴展。

Java常用消息隊列原理介紹及性能對比

如上圖所示,一個典型的Kafka集群中包含若干Producer(可以是web前端產生的Page View,或者是服務器日志,系統CPU、Memory等),若干broker(Kafka支持水平擴展,一般broker數量越多,集群吞吐率越高),若干Consumer Group,以及一個Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發生變化時進行rebalance。Producer使用push模式將消息發布到broker,Consumer使用pull模式從broker訂閱并消費消息。

Kafka代理

與其它消息系統不同,Kafka代理是無狀態的。這意味著消費者必須維護已消費的狀態信息。這些信息由消費者自己維護,代理完全不管。這種設計非常微妙,它本身包含了創新。

  • 從代理刪除消息變得很棘手,因為代理并不知道消費者是否已經使用了該消息。Kafka創新性地解決了這個問題,它將一個簡單的基于時間的SLA應用于保留策略。當消息在代理中超過一定時間后,將會被自動刪除。

  • 這種創新設計有很大的好處,消費者可以故意倒回到老的偏移量再次消費數據。這違反了隊列的常見約定,但被證明是許多消費者的基本特征。

MQ性能對比及選型

MQ性能對比

Java常用消息隊列原理介紹及性能對比

從社區活躍度

按照目前網絡上的資料,RabbitMQ 、activeM 、ZeroMQ 三者中,綜合來看,RabbitMQ 是首選。

持久化消息比較

ZeroMq 不支持,ActiveMq 和RabbitMq 都支持。持久化消息主要是指我們機器在不可抗力因素等情況下宕機了,消息不會丟失的機制。

綜合技術實現

可靠性、靈活的路由、集群、事務、高可用的隊列、消息排序、問題追蹤、可視化管理工具、插件系統等等。
RabbitMq / Kafka 最好,ActiveMq 次之,ZeroMq 最差。當然ZeroMq 也可以做到,不過自己必須手動寫代碼實現,代碼量不小。尤其是可靠性中的:持久性、投遞確認、發布者證實和高可用性。

高并發

毋庸置疑,RabbitMQ 最高,原因是它的實現語言是天生具備高并發高可用的erlang 語言。

比較關注的比較, RabbitMQ 和 Kafka

RabbitMq 比Kafka 成熟,在可用性上,穩定性上,可靠性上,  RabbitMq  勝于  Kafka  (理論上)。RabbitMQ使用ProtoBuf序列化消息。極大的方便了Consumer的數據高效處理,與XML相比,ProtoBuf有以下優勢:  
 1.簡單  
 2.size小了3-10倍  
 3.速度快了20-100倍  
 4.易于編程  
 5.減少了語義的歧義.

ProtoBuf具有速度和空間的優勢,使得它現在應用非常廣泛。
另外,Kafka 的定位主要在日志等方面, 因為Kafka 設計的初衷就是處理日志的,可以看做是一個日志(消息)系統一個重要組件,針對性很強,所以 如果業務方面還是建議選擇 RabbitMq 。
還有就是,Kafka 的性能(吞吐量、TPS )比RabbitMq 要高出來很多。

選型最后總結:

如果我們系統中已經有選擇 Kafka,或者 RabbitMq,并且完全可以滿足現在的業務,建議就不用重復去增加和造輪子。
可以在  Kafka  和   RabbitMq  中選擇一個適合自己團隊和業務的,這個才是最重要的。但是毋庸置疑現階段,綜合考慮沒有第三選擇。


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

晋中市| 南漳县| 泗水县| 西林县| 宝丰县| 凤山县| 南雄市| 新乡县| 武穴市| 株洲县| 秀山| 临潭县| 夏津县| 塘沽区| 金昌市| 屏山县| 雅江县| 陈巴尔虎旗| 金塔县| 钟祥市| 西吉县| 库尔勒市| 双城市| 青冈县| 长治县| 武强县| 庄浪县| 龙陵县| 辰溪县| 闸北区| 喀喇沁旗| 天气| 盐津县| 永修县| 固原市| 嵊泗县| 建水县| 抚顺市| 揭西县| 康乐县| 沭阳县|