在PHP中,可以使用多線程來開啟多個進程,以實現同時處理多個任務。以下是一個使用php-amqplib庫和多線程的示例代碼:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPTableFlags;
// AMQP連接參數
$host = 'localhost';
$port = 5672;
$user = 'guest';
$password = 'guest';
$vhost = '/';
$exchange = 'my_exchange';
$routingKey = 'my_routing_key';
$queue = 'my_queue';
// 創建AMQP連接
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 聲明交換機和隊列
$channel->exchange_declare($exchange, 'direct', false, true);
// 設置隊列參數,開啟多個消費者進程
$args = new AMQPTable();
$args->set('x-max-priority', 10); // 設置隊列最大優先級為10
$args->set('x-max-length', 1000); // 設置隊列最大長度為1000
$args->set('x-overflow', 'drop-head'); // 隊列溢出策略為刪除頭部消息
// 聲明隊列
$channel->queue_declare($queue, false, true, false, false, false, $args);
// 將隊列綁定到交換機
$channel->queue_bind($queue, $exchange, $routingKey);
// 設置消費者回調函數
$callback = function (AMQPMessage $message) {
// 處理消息
echo 'Received message: ' . $message->getBody() . PHP_EOL;
$message->ack();
};
// 開啟多個消費者進程
$processes = 5;
for ($i = 0; $i < $processes; $i++) {
$pid = pcntl_fork();
if ($pid == -1) {
die('Could not fork');
} elseif ($pid) {
// 父進程,繼續創建下一個子進程
continue;
} else {
// 子進程,創建新的連接和通道
$connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
$channel = $connection->channel();
// 設置消費者回調函數
$channel->basic_consume($queue, '', false, false, false, false, $callback);
// 循環接收消息
while (count($channel->callbacks)) {
$channel->wait();
}
// 關閉連接和通道
$channel->close();
$connection->close();
// 子進程退出
exit(0);
}
}
// 等待子進程退出
while (pcntl_waitpid(0, $status) != -1) {
$status = pcntl_wexitstatus($status);
echo "Child process $status completed" . PHP_EOL;
}
// 關閉連接和通道
$channel->close();
$connection->close();
以上代碼使用pcntl_fork()
函數創建了多個子進程,每個子進程都擁有自己的AMQP連接和通道,并通過設置不同的消費者回調函數來處理消息。請根據實際需求修改代碼中的連接參數和回調函數。
注意:使用多線程時,需要將代碼保存為獨立的PHP文件,并通過命令行來運行,如php filename.php
。