亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何獲取 Kafka 的消費者詳情 —— 從 Scala 到 Java 的切換

發布時間:2020-05-27 17:06:11 來源:網絡 閱讀:1085 作者:Java_老男孩 欄目:編程語言

前文摘要

在前面的文章《Kafka的Lag計算誤區及正確實現》中介紹了如何計算消費者的消費滯后量(Lag),并且講解了如何調用Kafka的kafka.admin.ConsumerGroupCommand文件中的KafkaConsumerGroupService來發送OffsetRequest和OffsetFetchRequest兩個請求,進而通過兩個請求結果之間的差值來獲得結果。不過如果你不想修改kafka-core的代碼并重新編譯的話,這種實現方式無法成功,所以本文的主要目的就是通過調用更底層的API來實現不修改kafka-core的代碼來實現KafkaConsumerGroupService的功能,即通過Java調用Scala的代碼來實現獲取Kafka的消費者詳情的功能。

目標及實現

實現如同 bin/kafka-consumer-group.sh –describe –bootstrap-server localhost:9092 –group CONSUMER_GROUP_ID的效果:

[root@node2 kafka_2.12-1.0.0]# bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --group CONSUMER_GROUP_ID
TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                   CLIENT-ID
topic-test1          0          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          1          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          2          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID
topic-test1          3          1648            1648            0          CLIENT_ID-e2d41f8d-dbd2-4f0e-9239-efacb55c6261    /192.168.92.1          CLIENT_ID

KafkaConsumerGroupService的核心方法是CollectGroupAssignment,其方法參數為一個consumer group的groupId,方法輸出為上面示例中的列表信息。CollectGroupAssignment方法主要有以下幾個步驟:

  1. 根據groupId調用describeConsumerGroup方法(內部原理是發送DescribeGroupsRequest請求)來獲取consumer group的基本信息,參考上面示例中的CONSUMER-ID、HOST、CLIENT-ID以及TopicPartition信息,但是沒有CURRENT-OFFSET、LOG-END-OFFSET、LAG信息。注意這里的LOG-END-OFFSET是消費者可見的LEO,不是生產者可見的LEO,也就是通俗意義上的HW。
  2. 根據groupId調用listGroupOffsets方法(內部原理是發送OffsetFetchRequest請求)來獲取各個分區(Partition)的對應的消費位移CURRENT-OFFSET。
  3. 通過調用KafkaConsumer的endOffsets方法來獲取TopicPartition對應的HW,即示例中的LOG-END-OFFSET。
  4. 計算Lag并組合成信息列表List<partitionassignmentstate style="margin-block-start: 0px; margin-block-end: 0px; margin: 0px; padding: 0px;">。</partitionassignmentstate>
改造

對應Java版的KafkaConsumerGroupService改造代碼可以參見代碼,目錄結構如下圖所示:
如何獲取 Kafka 的消費者詳情 —— 從 Scala 到 Java 的切換

其中model中的ConsumerGroupSummary、ConsumerSummary和PartitionAssignmentState是簡單的JavaBean, PartitionAssignmentState是用來保存每個TopicPartition的消費者信息的,具體內容參考如下。KafkaConsumerGroupCustomService就是本文所要陳述的Java改造辦的KafkaConsumerGroupSerivice,ConsumerGroupUtils用來存放一些公用的代碼。

@Data
@Builder
public class PartitionAssignmentState {
    private String group; // groupId
    private Node coordinator; // consumer coodinator節點信息
    private String topic;
    private int partition;
    private long offset;
    private long lag;
    private String consumerId;
    private String host;
    private String clientId;
    private long logEndOffset;
}

初始化KafkaConsumerGroupCustomService需要Kafka的服務端地址,然后初始化AdminClient和KafkaConsumer,AdminClient中包含了眾多管理類方法,主要是通過發送各種自定義協議請求來完成,上面步驟中所說的describeConsumerGroup和listGroupOffsets方法也是通過AdminClient來實現的;KafkaConsumer主要是用來獲取TopicPartition對應的HW(消費者可見的LogEndOffsets)的。

KafkaConsumerGroupCustomService中與scala版對應的collectGroupAssignment方法如下(詳細步驟參考代碼注釋):

public List<PartitionAssignmentState> collectGroupAssignment(
        AdminClient adminClient, KafkaConsumer<String, String> consumer,
        String group) {
    //1. 獲取consumer group的基本信息,包括CONSUMER-ID、HOST、
    // CLIENT-ID以及TopicPartition信息
    AdminClient.ConsumerGroupSummary consumerGroupSummary
            = adminClient.describeConsumerGroup(group, 0);
    List<TopicPartition> assignedTopicPartitions = new ArrayList<>();
    List<PartitionAssignmentState> rowsWithConsumer = new ArrayList<>();
    scala.collection.immutable.List<AdminClient.ConsumerSummary> consumers
            = consumerGroupSummary.consumers().get();
    if (consumers != null) {
        //2. 獲取各個分區(Partition)的對應的消費位移CURRENT-OFFSET
        scala.collection.immutable.Map<TopicPartition, Object> offsets
                = adminClient.listGroupOffsets(group);
        if (offsets.nonEmpty()) {
            String state = consumerGroupSummary.state();
            // 3. 還有一個狀態是Dead表示"group"對應的consumer group不存在
            if (state.equals("Stable") || state.equals("Empty")
                    || state.equals("PreparingRebalance")
                    || state.equals("AwaitingSync")) {
                List<ConsumerSummary> consumerList = changeToJavaList(consumers);
                // 4. 獲取當前有消費者的消費信息,即包含CONSUMER-ID、HOST、CLIENT-ID
                rowsWithConsumer = getRowsWithConsumer(consumerGroupSummary, offsets,
                        consumer, consumerList, assignedTopicPartitions, group);
            }
        }
        //5. 獲取當前沒有消費者的消費信息
        List<PartitionAssignmentState> rowsWithoutConsumer =
                getRowsWithoutConsumer(consumerGroupSummary,
                offsets, consumer, assignedTopicPartitions, group);
        //6. 合并結果
        rowsWithConsumer.addAll(rowsWithoutConsumer);
    }
    return rowsWithConsumer;
}

KafkaConsumerGroupCustomService類中包含有getRowsWithConsumer()、getRowsWithoutConsumer()、changeToJavaList等私有方法也都是在Scala語言與Java語言之間進行切換,這樣可以不需要修改kafka-core的原生代碼而通過外部的封裝調用既可以實現獲取Kafka消費者詳情的功能。光看代碼比較抽象,建議對此感興趣的同學可以親自對比一下kafka-core包中kafka.admin.ConsumerGroupCommand的KafkaConsumerGroupSerivice與筆者自定義的KafkaConsumerGroupCustomService的實現來了解下Scala語言到Java語言的轉換。

如果需要打印詳情可以調用KafkaConsumerGroupCustomService同目錄的ConsumerGroupUtils類中的printPasList(List list)方法。注意要運行這些代碼需要JDK8的環境,筆者為了讓代碼顯得“騷氣”一點就用來一點Java8的語法,如果需要Java7的代碼實現可以關注私聊。

或許有些同學對于Scala和Java交叉的代碼并不感冒,想要尋求一種存Java式的實現方式,那么在這里怎么實現呢?答案是通過KafkaAdminClient,它是AdminClient的Java版實現,從Kafka0.11.0.0版本開始引入的,不過KafkaAdminClient本身并沒有提供describeConsumerGroup、listGroupOffsets之類的方法給我們直接使用,擴展一下也很方便,由于篇幅限制,這部分的內容將在下一篇文章中進行介紹,如果想要先一睹為快,可以參考下代碼實現,詳細的邏輯解析敬請期待….


本文的重點是你有沒有收獲與成長,其余的都不重要,希望讀者們能謹記這一點。同時我經過多年的收藏目前也算收集到了一套完整的學習資料,包括但不限于:分布式架構、高可擴展、高性能、高并發、Jvm性能調優、Spring,MyBatis,Nginx源碼分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多個知識點高級進階干貨,希望對想成為架構師的朋友有一定的參考和幫助

需要更詳細思維導圖和以下資料的可以加一下技術交流分享群:“708 701 457”免費獲取

如何獲取 Kafka 的消費者詳情 —— 從 Scala 到 Java 的切換
如何獲取 Kafka 的消費者詳情 —— 從 Scala 到 Java 的切換
如何獲取 Kafka 的消費者詳情 —— 從 Scala 到 Java 的切換
如何獲取 Kafka 的消費者詳情 —— 從 Scala 到 Java 的切換

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

灌南县| 若尔盖县| 台东县| 盐城市| 大化| 淮阳县| 通道| 武陟县| 金塔县| 德安县| 墨脱县| 富锦市| 平阴县| 六枝特区| 海宁市| 泸定县| 彭阳县| 青河县| 兰坪| 白朗县| 资源县| 合作市| 凯里市| 海南省| 绍兴市| 天津市| 丹凤县| 曲阜市| 紫金县| 财经| 安仁县| 尤溪县| 金沙县| 阳谷县| 礼泉县| 赤壁市| 罗甸县| 凤山市| 东乌珠穆沁旗| 南召县| 黄平县|