您好,登錄后才能下訂單哦!
本篇內容主要講解“消息隊列RabbitMQ入門與PHP實例分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“消息隊列RabbitMQ入門與PHP實例分析”吧!
說明
MQ(Message Queue) 即消息隊列,是應用間的通信方式,消息發送后可立即返回,由消息系統來確保消息的可靠傳遞。”消息隊列“是在消息的傳輸過程中保存消息的容器。它是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現生產者和消費者的解耦。
為什么使用消息中間件?
消息隊列是分布式系統中重要的組件,解決應用解耦,異步消息,流量削峰等問題,實現高并發,高可用,可伸縮和最終一致性架構
異步處理
用戶注冊信息后需要發送郵件和注冊短信
1、用戶注冊信息寫入數據庫后即使返回注冊成功的信息
2、發送郵件和注冊短信通過消息隊列異步執行,用戶不需要等待這兩個操作
應用解耦
用戶下單后,訂單系統需要通知庫存系統。傳統的做法是,訂單系統調用庫存系統的接口,進行增減庫存
1、用戶下單入列生產,返回成功提示
2、隊列消費庫存系統,進行庫存增減
流量削峰
流量削峰也是消息隊列中的常見場景,一般在秒殺或團搶活動中使用廣泛
1、當一批用戶請求過來進入列隊,控制入列數量,超出一定數量返回秒殺結束
2、然后隊列一個個按照先進先出進行隊列消費
Rabbitmq特性
可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
靈活的路由(Flexible Routing) 在消息進入隊列之前,通過 Exchange 來路由消息的。對于典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的 Exchange 。
消息集群(Clustering) 多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。
高可用(Highly Available Queues) 隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
多語言客戶端(Many Clients) RabbitMQ 幾乎支持所有常用語言,比如PHP Java、.NET、Ruby 等等。
管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
跟蹤機制(Tracing) 如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。
RabbitMQ的工作原理
Broker: 接收和分發消息的應用,RabbitMQ Server就是Message Broker。
Virtual host: 類似于mysql的數據庫,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange/queue等。
Connection: publisher/consumer和broker之間的TCP連接。
Channel: 如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。
Exchange: message到達broker的第一站,根據分發規則,匹配查詢表中的routing key,分發消息到queue中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue: 消息最終被送到這里等待consumer取走。一個message可以被同時拷貝到多個queue中。
RabbitMQ官方地址:http://www.rabbitmq.com
安裝rabbitmq需要先安裝erlang
第一步:erlang 安裝
安裝rabbitmq需要先安裝erlang,centos7不支持erlang 24版本的安裝
下載:
# 系統 centos 7# 下載erlang包,手動下載后上傳至服務器,我在使用wget下載后無法安裝,這里沒明白 # 安裝 yum install erlang-23.3.4.4-1.el7.x86_64.rpm # 驗證安裝是否成功 erl
第二步:安裝rabbitmq
# 系統 centos 7# 下載rabbitmq包,手動下載后上傳至服務器,我在使用wget下載后無法安裝,這里沒明白 # 安裝 yum install rabbitmq-server-3.8.19-1.el7.noarch.rpm # 啟動 systemctl start rabbitmq-server # 關閉 systemctl stop rabbitmq-server # 查看默認端口服務是否啟動 netstat -tunlp
4369:epmd(Erlang Port Mapper Daemon),erlang服務端口
5672 :client端通信口
15672:HTTP API客戶端,管理UI(僅在啟用了管理插件的情況下)不一定會啟動
25672:用于節點間通信(Erlang分發服務器端口)
rabbitmq 管理命令
啟動15672:HTTP API客戶端,管理UI(僅在啟用了管理插件的情況下)
# 啟動rabbitmq_management插件 rabbitmq-plugins enable rabbitmq_management # 查看所有插件 rabbitmq-plugins list
測試訪問UI界面:(此時非localhost地址是無法登錄)
http://192.168.10.105:15672/
rabbitmq 配置管理界面
# 新增一個用戶 rabbitmqctl add_user 【用戶名Username】 【密碼Password】 rabbitmqctl add_user root root # 刪除一個用戶 rabbitmqctl delete_user Username # 修改用戶的密碼 rabbitmqctl change_password Username Newpassword # 查看當前用戶列表 rabbitmqctl list_users # 設置用戶角色的命令為: rabbitmqctl set_user_tags User Tag rabbitmqctl set_user_tags root administrator # User為用戶名, Tag為角色名(對應于上面的administrator,monitoring,policymaker,management,或其他自定義名稱)。
命令行創建vhost以及php擴展安裝
類似于mysql的數據庫,當多個不同的用戶使用同一個RabbitMQ server提供的服務時,可以劃分出多個vhost,每個用戶在自己的vhost創建exchange/queue等。
1)查看不同用戶的vhost
創建vhost,以及分配權限
# 新增vhost rabbitmqctl add_vhost vhostname rabbitmqctl add_vhost order # 查看vhost列表 rabbitmqctl list_vhosts #為vhost添加用戶 rabbitmqctl set_permissions -p vhostname username ".*" ".*" ".*"rabbitmqctl set_permissions -p order root ".*" ".*" ".*" ".*" ".*" ".*"后邊三個.*分別代表:配置權限、寫權限、讀權限
2)為php安裝rabbitmq擴展安裝
https://github.com/php-amqplib/php-amqplib 擴展安裝
修改阿里云鏡像
composer config -g repo.packagist composer https://mirrors.aliyun.com/composer/
開始下載–這里有時候會下載成2.8低版本的,需要指定版本
,下載不成功則升級composer、php.ini 打開 sockets 擴展和切換國內鏡像
# 升級composer composer self-update #php.ini 打開 sockets 擴展 #下載指定版本 composer require php-amqplib/php-amqplib=^3.0
simple模式生產者消息推送到消息隊列
文檔:
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
簡單的生產者與消息者
生產者代碼
http://localhost/rabbitmq/simple/pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生產者 //Connection: publisher/consumer和broker之間的TCP連接 //Channel: 如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明隊列名為:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); //生產數據 $data = 'this is messge'; //創建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //發布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); //關閉連接 $channel->close(); $connection->close();
運行生產者腳本:
http://localhost/rabbitmq/simple/pro.php
點擊goods隊列可以進入到消息詳情
http://localhost/rabbitmq/simple/con.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明隊列名為:goods $queue_name = 'goods'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //開啟消費 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不斷的循環進行消費 while ($channel->is_open()) { $channel->wait(); } //關閉連接 $channel->close(); $connection->close();
rabbitmq Work Queues
一個生產者對應多個消費者,消費特別慢時增加幾個消費分發
生產者,和上文生產者不變
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //生產者 //Connection: publisher/consumer和broker之間的TCP連接 //Channel: 如果每一次訪問RabbitMQ都建立一個Connection,在消息量大的時候建立TCP Connection的開銷將是巨大的,效率也較低。Channel是在connection內部建立的邏輯連接Channel作為輕量級的Connection極大減少了操作系統建立TCP connection的開銷。 //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明隊列名為:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); for ($i = 0; $i < 10; $i++) { //生產數據 $data = 'this is messge' . $i; //創建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //發布消息 $channel->basic_publish($msg, $exchange = '', $queue_name); } //關閉連接 $channel->close(); $connection->close();
消費者worker1
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明隊列名為:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; }; //開啟消費 $channel->basic_consume($queue_name, '', false, true, false, false, $callback); //不斷的循環進行消費 while ($channel->is_open()) { $channel->wait(); } //關閉連接 $channel->close(); $connection->close();
消費者worker2,代碼和worker1一樣,同時運行開啟后會一起消費
D:\phpstudy_pro\WWW\rabbitmq\worker\worker2.php
消費者消費消息ack確認
用以確認不會丟失消息
消費消息
basic_consume($queue = ‘’, $consumer_tag = ‘’, $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array())
no_ack=false,設置為手動應答
開啟后需要進行消息的消費確認后才會進行移除,否者該消息會一直存在消息隊列中
消費端代碼
D:\phpstudy_pro\WWW\rabbitmq\worker\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明隊列名為:task_queue $queue_name = 'task_queue'; $channel->queue_declare($queue_name, false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //確認消息已被消費,從生產隊列中移除 $msg->ack(); }; //設置消費成功后才能繼續進行下一個消費 $channel->basic_qos(null, 1, null); //開啟消費no_ack=false,設置為手動應答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不斷的循環進行消費 while ($channel->is_open()) { $channel->wait(); } //關閉連接 $channel->close(); $connection->close();
發布/訂閱模式
是要是公用一個交換機的消費端都能收到同樣的消息,類似廣播的功能
文檔:rabbitmq Publish/Subscribe
https://www.rabbitmq.com/tutorials/tutorial-three-php.html
rabbitmq Exchange類型
交換器、路由鍵、綁定 Exchange:交換器。發送消息的AMQP實體。交換器拿到一個消息之后將它路由給一個或幾個隊列。它使用哪種路由算法是由交換機類型和被稱作綁定(Binding)的規則所決定的。RabbitMQ有四種類型。 RoutingKey:路由鍵。生產者將消息發送給交換器。一般會指定一個RoutingKey,用來指定這個消息的路由規則,而這個RoutingKey需要與交換器類型和綁定鍵(BindingKey)聯合使用才能最終失效。 Binding:綁定。綁定(Binding)是交換機(Exchange)將消息(Message)路由給隊列(Queue)所需遵循的規則。 # 四種模式 Direct 定向 消息與一個特定的路由鍵完全匹配 Topic 通配符 路由鍵和某模式進行匹配 Fanout 廣播 發送到該類型交換機的消息都會被廣播到與該交換機綁定的所有隊列 Headers 不處理路由鍵,而是根據發送的消息內容中的headers屬性進行匹配
exchange_declare($exchange, $type, $passive = false, $durable = false, $auto_delete = true, $internal = false, $nowait = false, $arguments = array(), $ticket = null) 。試探性申請一個交換器,若該交換器不存在,則創建;若存在,則跳過。
生產者代碼
D:\phpstudy_pro\WWW\rabbitmq\ps\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明交換器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //聲明數據 $data = 'this is fanout message'; //創建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //發布消息 $channel->basic_publish($msg, $exc_name); //關閉連接 $channel->close(); $connection->close();
fanout模式消費者消費消息
是要是公用一個交換機的消費端都能收到同樣的消息,類似廣播的功能
當消費端運行時才會顯示該隊列
消費端:
D:\phpstudy_pro\WWW\rabbitmq\ps\worker1.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明交換器 $exc_name = 'exch'; $channel->exchange_declare($exc_name, 'fanout', false, false, false); //獲取系統生成的消息隊列名稱 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //將隊列名與交換器名進行綁定 $channel->queue_bind($queue_name,$exc_name); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //確認消息已被消費,從生產隊列中移除 $msg->ack(); }; //設置消費成功后才能繼續進行下一個消費 $channel->basic_qos(null, 1, null); //開啟消費no_ack=false,設置為手動應答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不斷的循環進行消費 while ($channel->is_open()) { $channel->wait(); } //關閉連接 $channel->close(); $connection->close();
文檔:
https://www.rabbitmq.com/tutorials/tutorial-four-php.html
用來指定不同的交換機和指定routing_key,在消費端進行消費
生產者代碼:
D:\phpstudy_pro\WWW\rabbitmq\routing\pro.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明交換器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; //指定交換機類型為direct $channel->exchange_declare($exc_name, 'direct', false, false, false); //聲明數據 $data = 'this is ' . $routing_key . ' message'; //創建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //發布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //關閉連接 $channel->close(); $connection->close();
消費者代碼
D:\phpstudy_pro\WWW\rabbitmq\routing\info.php
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明交換器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'info'; $channel->exchange_declare($exc_name, 'direct', false, false, false); //獲取系統生成的消息隊列名稱 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //將隊列名與交換器名進行綁定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //確認消息已被消費,從生產隊列中移除 $msg->ack(); }; //設置消費成功后才能繼續進行下一個消費 $channel->basic_qos(null, 1, null); //開啟消費no_ack=false,設置為手動應答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不斷的循環進行消費 while ($channel->is_open()) { $channel->wait(); } //關閉連接 $channel->close(); $connection->close();
通配符的匹配模式
如消費端中routing_key = ‘user.*’;
生產者:
指定routing_key= ‘user.top’
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明交換器 $exc_name = 'topic_log'; //指定routing_key $routing_key = 'user.top'; //指定交換機類型為direct $channel->exchange_declare($exc_name, 'topic', false, false, false); //聲明數據 $data = 'this is ' . $routing_key . ' message'; //創建消息 $msg = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]); //發布消息 //指定使用的routing_key $channel->basic_publish($msg, $exc_name, $routing_key); //關閉連接 $channel->close(); $connection->close();
消費者
消費端中routing_key = ‘user.*’;
<?php require_once "../vendor/autoload.php"; use PhpAmqpLib\Connection\AMQPStreamConnection; //建立connction $connection = new AMQPStreamConnection('192.168.10.105', 5672, 'root', 'root', 'order'); //Channel $channel = $connection->channel(); //聲明交換器 $exc_name = 'direct_log'; //指定routing_key $routing_key = 'user.*'; $channel->exchange_declare($exc_name, 'topic', false, false, false); //獲取系統生成的消息隊列名稱 list($queue_name, ,) = $channel->queue_declare('', false, false, true, false); //將隊列名與交換器名進行綁定,并指定routing_key $channel->queue_bind($queue_name,$exc_name,$routing_key); $callback = function ($msg) { echo 'received = ', $msg->body . "\n"; //確認消息已被消費,從生產隊列中移除 $msg->ack(); }; //設置消費成功后才能繼續進行下一個消費 $channel->basic_qos(null, 1, null); //開啟消費no_ack=false,設置為手動應答 $channel->basic_consume($queue_name, '', false, false, false, false, $callback); //不斷的循環進行消費 while ($channel->is_open()) { $channel->wait(); } //關閉連接 $channel->close(); $connection->close();
到此,相信大家對“消息隊列RabbitMQ入門與PHP實例分析”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。