在PHP中使用rdkafka處理消息持久化的方法如下:
extension=rdkafka.so
<?php
require_once 'vendor/autoload.php';
use RdKafka\Producer;
use RdKafka\Conf;
$conf = new Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('message.timeout.ms', '30000');
$conf->set('delivery.timeout.ms', '120000');
$conf->set('retries', '3');
$conf->set('offset.store.type', 'file'); // 設置offset存儲類型為文件
$conf->set('offset.store.path', '/tmp/kafka-consumer-offsets'); // 設置offset存儲路徑
$producer = new Producer($conf);
$producer->start(true);
在這個示例中,我們設置了offset.store.type
為file
,并將offset.store.path
設置為/tmp/kafka-consumer-offsets
。這將使得Kafka將消費者的offset持久化到本地文件系統中。
<?php
$producer->send([
[
'topic' => 'test_topic',
'value' => 'Hello, Kafka!',
'key' => '',
],
]);
$producer->stop();
通過以上步驟,你已經成功配置了PHP rdkafka以持久化消息。當消費者消費消息時,它們的offset將被存儲在指定的路徑中,以便在程序崩潰或重啟后能夠繼續消費。