亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

coalesce與repartition怎么使用

發布時間:2021-12-09 16:52:27 來源:億速云 閱讀:163 作者:iii 欄目:大數據

這篇文章主要介紹“coalesce與repartition怎么使用”,在日常操作中,相信很多人在coalesce與repartition怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”coalesce與repartition怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer:Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

一、功能介紹

coalesce算子最基本的功能就是返回一個numPartitions個partition的RDD。

二、使用及注意事項

這個算子的結果默認是窄依賴,舉個例子

coalesce(100)

如果你想把1000個partition減少到100個partition,此時不會發生shuffle,而是每一個你設定的新partition都會替代原來的10個partition。如果初始的最大partition是100個,而你想用coalesce(1000)把partition數增至1000,這是不行的。
現在有一個需求,需要將某一個文件做ETL,最后想輸出成一個文件,你會怎么辦呢?
這樣么?

val logs=sc.textFile(args(0),6)//你想初始化6個分區,并行執行,之后再合并成1個文件

logs.map(x=>{
      if(x.split("\t").length==72){
        val clean=parse(x)  //此處是進行了ETL
        clean
      }
    }).coalesce(2).saveAsTextFile(args(1))

如果你同意的話,可以寫個demo測試一下,你會發現,僅僅有一個task!在生產上這是絕對不行!因為上述ETL的spark job僅僅有一個stage,你雖然初始化RDD是設定的6個partition,但是在action之前你使用了.coalesce(1),此時會優先使用coalesce里面的partition數量初始化RDD,所以僅僅有一個task。生產中文件很大的話,你就只能用兩個節點處理,這樣無法發揮集群的優勢了。解決:要在coalesce中加shuffle=tule

val logs=sc.textFile(args(0),6)

logs.map(x=>{
      if(x.split("\t").length==72){
        val clean=parse(x)  //此處是進行了ETL
        clean
      }
    }).coalesce(2,shuffle = true).saveAsTextFile(args(1))

這樣,我們就會有兩個stage,stage1是6個并行高速ETL處理,stage2是通過shuffle合并成2個文件
如下圖
coalesce與repartition怎么使用
我們知道了,可以手動設定shuffle的發生,那么問題來了,剛剛我們不能將初始化的分區數變大,如果加上shuffle可不可以呢?答案是可以的~
如果出事RDD為100個分區,你覺得并行度不夠,你可以coalesce(1000,shuffle = true),將分區數增加到1000(默認hash partitioner進行重新),當然你也可以使用自定義分區器,但是一定要序列化。

三、總結

  1. coalesce算子默認只能減少分區數量,但是可以通過開啟shuffle增加分區數量

  2. coalesce的作用常常是減少分區數,已達到輸出時合并小文件的效果。

  3. 在一個stage中,coalesce中設定的分區數是優先級最高的,如果想增加并行度,并合并文件,那么請開啟coalesce中的shuffle,這樣就會變成兩個stage。達到并行且合并的效果。

repartition

/**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   *
   * TODO Fix the Shuffle+Repartition data loss issue described in SPARK-23207.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

這個算子前后是一個寬依賴,字面就是重新分區的意思,與coalesce不同,repartition一定會將分區變成numPartitions個的!通過看源碼可知,它底層時調用的coalesce算子,并且使用該算子一定會shuffle。
coalesce與repartition怎么使用

到此,關于“coalesce與repartition怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

封丘县| 明溪县| 新密市| 彝良县| 崇明县| 佳木斯市| 大埔县| 墨竹工卡县| 海宁市| 乌兰浩特市| 武夷山市| 凌云县| 马龙县| 齐齐哈尔市| 眉山市| 永昌县| 临湘市| 平邑县| 光山县| 长沙市| 吴江市| 安泽县| 离岛区| 陕西省| 峡江县| 延庆县| 察雅县| 四会市| 延吉市| 焉耆| 望奎县| 玛曲县| 阿克苏市| 页游| 天长市| 湟源县| 绩溪县| 江川县| 桐柏县| 百色市| 五家渠市|