亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache Pulsar啟動了哪些服務

發布時間:2021-12-24 10:34:28 來源:億速云 閱讀:272 作者:iii 欄目:大數據

這篇文章主要講解了“Apache Pulsar啟動了哪些服務”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Apache Pulsar啟動了哪些服務”吧!

1.啟動入口

PulsarStandaloneStarter
在standalone模式下,主要啟動了以下幾個服務

  1. PulsarService

  2. PulsarAdmin

  3. LocalBookeeperEnsemble

  4. WorkerService

PulsarBrokerStarter.BrokerStarter
在普通模式下,啟動了以下幾個服務

  1. PulsarService

  2. BookieServer

  3. AutoRecoveryMain

  4. StatsProvider

  5. WorkerService

簡單說一些這幾個服務

  • WorkerService: Pulsar function 相關,可以不啟動

  • PulsarService: 主要的PulsarBroker相關

  • BookieServer: Bookeeper相關

  • AutoRecoveryMain: Bookeeper autorecovery相關

  • StatsProvider: Metric Exporter類似的功能

2. PulsarService

PulsarService.start

  1. ProtocolHandlers
    支持不同protocol處理(kafka協議等)

  2. localZookeeperConnectionProvider
    維護zk session 和zk連接

  3. startZkCacheService

    • LocalZooKeeperCache => LocalZooKeeperCacheService

    • GlobalZooKeeperCache => ConfigurationCacheService

  4. BookkeeperClientFactory
    創建配置Bookkeeper 客戶端

  5. managedLedgerClientFactory
    維護一個ManagedLedger的客戶端,借用BookkeeperClient

  6. BrokerService
    這個是服務器的主要邏輯了,這個放在后面說

  7. loadManager
    收集集群機器負載,并根據負載情況均衡負載

  8. startNamespaceService
    NameSpaceService,管理放置的ResourceBundle,和LoadManager相關

  9. schemaStorage

  10. schemaRegistryService
    上面2個都是和Schema相關的

  11. defaultOffloader
    LedgerOffloader,用來將Ledger(Bookkeeper)中的冷數據放到其他存儲當中

  1. WebService

  2. webSocketService
    http,websocket相關

  3. LeaderElectionService
    和LoadManager有關,如果是集中方式的話需要選出一個Leader定期根據集群情況進行均衡負載

  4. transactionMetadataStoreService
    事務相關

  5. metricGenerator
    metric相關

  6. WorkerService
    pulsar function 相關

3. BrokerService

public void start() throws Exception {
        // producer id 分布式生成器
        this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        // 網絡層配置
        ServerBootstrap bootstrap = defaultServerBootstrap.clone();

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
        ...
        // 綁定端口
        listenChannel = bootstrap.bind(addr).sync().channel();
        ...

       // metric
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());

       // 啟動了一堆需要定期執行的任務
        this.startInactivityMonitor();
       // 啟動3個schedule任務分別檢測
       // 1. 長時間無效的topic
       // 2. 長時間無效的producer(和message去重相關)
       // 3. 長時間無效的subscription
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startMessagePublishBufferMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updateBrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();

        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

4. PulsarChannelInitializer

順著netty的初始化方式我們直接看ChannelInitializer,這里應該和Kafka類似進行處理請求的操作。

protected void initChannel(SocketChannel ch) throws Exception {
        
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));

        ch.pipeline().addLast("flowController", new FlowControlHandler());
        ServerCnx cnx = new ServerCnx(pulsar);
        ch.pipeline().addLast("handler", cnx);

        connections.put(ch.remoteAddress(), cnx);
    }

5. ServerCnx

這個類的作用可以對標KafkaApis,處理各種Api請求
這個類實際上是一個ChannelHandler
繼承了PulsarHandler(主要負責一些連接的keepalive邏輯)
PulsarHandler繼承了 PulsarDecoder ( 主要負責序列化,反序列化Api請求)
PulsarDecoder實際上是一個 ChannelInboundHandlerAdapter

而PulsarAPi實際上是通過Pulsar.proto 生成的,這里編寫了各種Api的定義

感謝各位的閱讀,以上就是“Apache Pulsar啟動了哪些服務”的內容了,經過本文的學習后,相信大家對Apache Pulsar啟動了哪些服務這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阳信县| 新密市| 达孜县| 鄂州市| 昭通市| 阳山县| 句容市| 巴彦淖尔市| 师宗县| 江西省| 安泽县| 汕头市| 临沧市| 农安县| 余江县| 石棉县| 喀喇| 诸城市| 莫力| 乌恰县| 宝兴县| 内江市| 乌什县| 康乐县| 宣武区| 甘孜| 江川县| 惠州市| 监利县| 桂阳县| 苍溪县| 邯郸市| 陕西省| 林口县| 江华| 临邑县| 武山县| 万盛区| 贡山| 泗水县| 新闻|