亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SparkStreaming整合kafka的補充

發布時間:2020-07-06 09:43:00 來源:網絡 閱讀:596 作者:原生zzy 欄目:大數據

(1)SparkStreaming 整合 kafka 兩種方式對比

Direct 方式的優缺點分析

  • 優點:
    • 簡化并行(Simplified Parallelism)。不現需要創建以及 union 多輸入源,Kafka topic 的partition 與 RDD 的 partition 一一對應。
    • 高效(Efficiency)。基于 Receiver-based 的方式保證數據零丟失(zero-data loss)需要配置 spark.streaming.receiver.writeAheadLog.enable=true,此種方式需要保存兩份數據,浪費存儲空間也影響效率。而 Direct 方式則不存在這個問題。
    • 強一致語義(Exactly-once semantics)。High-level 數據由 Spark Streaming 消費,但是Offsets 則是由 Zookeeper 保存。通過參數配置,可以實現 at-least once 消費,此種情況有重復消費數據的可能。
    • 降低資源。Direct 不需要 Receivers,其申請的 Executors 全部參與到計算任務中;而Receiver-based 則需要專門的 Receivers 來讀取 Kafka 數據且不參與計算。因此相同的資源申請,Direct 能夠支持更大的業務。
    • 降低內存。Receiver-based 的 Receiver 與其他 Exectuor 是異步的,并持續不斷接收數據,對于小業務量的場景還好,如果遇到大業務量時,需要提高 Receiver 的內存,但是參與計算的 Executor 并無需那么多的內存。而 Direct 因為沒有 Receiver,而是在計算時讀取數據,然后直接計算,所以對內存的要求很低。
  • 缺點:
    • 提高成本。Direct 需要用戶采用 checkpoint 或者第三方存儲來維護 offsets,而不像Receiver-based 那樣,通過 ZooKeeper 來維護 Offsets,此提高了用戶的開發成本。
    • 監控可視化。Receiver-based 方式指定 topic 指定 consumer 的消費情況均能通過ZooKeeper 來監控,而 Direct 則沒有這種便利,不能自動保存 offset 到 zookeeper,如果做到監控并可視化,則需要投入人力開發。
      Receiver 方式的優缺點分析
  • 優點:
    • 專注計算。Kafka 的 high-level 數據讀取方式讓用戶可以專注于所讀數據,而不用關注或維護 consumer 的 offsets,這減少用戶的工作量以及代碼量而且相對比較簡單。
  • 缺點:
    • 防數據丟失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable參數,配置 spark.streaming.receiver.writeAheadLog.enable 參數,每次處理之前需要將該batch 內的日志備份到 checkpoint 目錄中,這降低了數據處理效率,反過來又加重了Receiver 端的壓力;另外由于數據備份機制,會受到負載影響,負載一高就會出現延遲的風險,導致應用崩潰。
    • 單 Receiver 內存。由于 receiver 也是屬于 Executor 的一部分,那么為了提高吞吐量
    • 重復消費。在程序失敗恢復時,有可能出現數據部分落地,但是程序失敗,未更新 offset的情況,這導致數據重復消費。
    • Receiver 和計算的 Executor的異步的,那么遇到網絡等因素原因,導致計算出現延遲,計算隊列一直在增加,而Receiver 則在一直接收數據,這非常容易導致程序崩潰。

      (2)對kafka消費的offset的管理

  • spark自帶的checkpoint:
    • 啟用spark streaming的checkpoint是存儲偏移量的最簡單方法
    • 流式checkpoint專門保存用戶應用程序的狀態
    • 但是checkpoint的目錄是不能共享的,無法跨越應用程序進行恢復
    • 一般不使用checkpoint管理offset
  • 使用zookeeper管理offset
    • 如果Zookeeper中未保存offset,根據kafkaParam的配置使用最新或者最舊的offset
    • 如果 zookeeper中有保存offset,我們會利用這個offset作為kafkaStream 的起始位置
  • 使用hbase保存offset
    • Rowkey的設計:topic名稱 + groupid + streaming的batchtime.milliSeconds
  • 使用hdfs管理offset:當然這種情況不推薦使用,因為在hdfs中會生成大量的小文件,導致,hdfs的性能急劇下降

    (3)Driver的HA

      介紹:他能夠在driver失敗的時候,通過讀取checkpoint目錄下的元數據,恢復當前streamingContext對象的狀態;它能夠察覺到driver進程異常退出之后,自動重啟。
      具體流程:當第一次運行程序時,發現checkpoint中沒有數據,則根據定義的函數來第一次創建StreamingContext對象,當程序異常退出的時候,此時會根據checkpoint中的元數據恢復一個StreamingContext對象,達到異常退出之前的狀態,而實現異常退出并自動啟動則是sparkStreaming應用程序對driver進行監控,并且在他失敗的時候感知,并進行重啟。
      必要條件
      - spark-submit提交作業的時候,必須是集群模式(cluster),并且必須在spark-standalong下。

    spark-submit \
    --class com.aura.mazh.spark.streaming.kafka.SparkStreamDemo_Direct \
    //這里只能使用spark的standalong模式,所以配置為spark集群
    --master spark://hadoop02:7077,hadoop04:7077 \
    --driver-memory 512m \
    --total-executor-cores 3 \
    --executor-memory 512m \
    #這句代碼一定要加,他可以使異常退出的driver程序,重新啟動
    --supervise \   
    --name SparkStreamDemo_Direct \
    --jars /home/hadoop/lib/kafka_2.11-0.8.2.1.jar,\
    /home/hadoop/lib/metrics-core-2.2.0.jar,\
    /home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\
    /home/hadoop/lib/spark-streaming-kafka-0-8_2.11-2.3.2.jar,\
    /home/hadoop/lib/zkclient-0.3.jar \
    /home/hadoop/original-spark-1.0-SNAPSHOT.jar \
    spark://hadoop02:7077,hadoop04:7077

      - 需要添加--supervise \,才能實現失敗自啟動
      - 需要配置checkpoint目錄,并且是存儲在hdfs上,jar也要放置在hdfs上

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

手游| 泉州市| 德昌县| 岱山县| 丹阳市| 松阳县| 元氏县| 武邑县| 冕宁县| 丰县| 平昌县| 河北省| 琼结县| 昌乐县| 武夷山市| 高安市| 安宁市| 镇江市| 什邡市| 阿拉善左旗| 普宁市| 庄浪县| 紫阳县| 朝阳市| 沙河市| 嘉兴市| 鄂托克前旗| 松溪县| 正定县| 静宁县| 巩留县| 合水县| 孟州市| 阿巴嘎旗| 阜城县| 都匀市| 隆回县| 巴林右旗| 五台县| 石门县| 武邑县|