亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

kafka手動提交偏移量怎么實現

小億
162
2023-11-28 17:11:46
欄目: 大數據

Kafka 提供了兩種方式來手動提交偏移量:

  1. 使用 commitSync() 方法同步提交偏移量:
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 處理消息
        }
        consumer.commitSync(); // 手動提交偏移量
    }
} finally {
    consumer.close();
}
  1. 使用 commitAsync() 方法異步提交偏移量:
import org.apache.kafka.clients.consumer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 關閉自動提交

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 處理消息
        }
        consumer.commitAsync(); // 異步提交偏移量
    }
} finally {
    consumer.close();
}

在這兩種方式中,commitSync() 方法會一直阻塞直到偏移量提交成功或發生錯誤。而 commitAsync() 方法則會在提交請求發送后立即返回,不會等待確認。如果發生錯誤,可以在 commitAsync() 方法的回調函數中處理。

0
晋城| 赣州市| 青铜峡市| 南和县| 贵定县| 嵩明县| 开封县| 本溪市| 南木林县| 兰溪市| 镇雄县| 营口市| 理塘县| 鄂温| 历史| 怀安县| 浦东新区| 英山县| 辉县市| 中西区| 湖州市| 武冈市| 监利县| 苍溪县| 垣曲县| 克什克腾旗| 福安市| 峡江县| 托克托县| 顺义区| 恭城| 屏南县| 大同县| 汉源县| 光山县| 南皮县| 阿拉尔市| 沭阳县| 永德县| 永济市| 宿州市|