亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何淺析Hive和Spark SQL讀文件時的輸入任務劃分

發布時間:2021-12-09 17:35:49 來源:億速云 閱讀:321 作者:柒染 欄目:大數據

如何淺析Hive和Spark SQL讀文件時的輸入任務劃分,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

我們就來講解Hive和Spark SQL是如何切分輸入路徑的。

Hive

Hive是起步較早的SQL on Hadoop項目,最早也是誕生于Hadoop中,所以輸入劃分這部分的代碼與Hadoop相關度非常高。現在Hive普遍使用的輸入格式是CombineHiveInputFormat,它繼承于HiveInputFormat,而HiveInputFormat實現了Hadoop的InputFormat接口,其中的getSplits方法用來獲取具體的劃分結果,劃分出的一份輸入數據被稱為一個“Split”。在執行時,每個Split對應到一個map任務。在劃分Split時,首先挑出不能合并到一起的目錄——比如開啟了事務功能的路徑。這些不能合并的目錄必須單獨處理,剩下的路徑交給私有方法getCombineSplits,這樣Hive的一個map task最多可以處理多個目錄下的文件。在實際操作中,我們一般只要通過set mapred.max.split.size=xx;即可控制文件合并的大小。當一個文件過大時,父類的getSplits也會幫我們完成相應的切分工作。

Spark SQL

Spark的表有兩種:DataSource表和Hive表。另外Spark后續版本中DataSource V2也將逐漸流行,目前還在不斷發展中,暫時就不在這里討論。我們知道Spark SQL其實底層是Spark RDD,而RDD執行時,每個map task會處理RDD的一個Partition中的數據(注意這里的Partition是RDD的概念,要和表的Partition進行區分)。因此,Spark SQL作業的任務切分關鍵在于底層RDD的partition如何切分。

Data Source表

Spark SQL的DataSource表在最終執行的RDD類為FileScanRDD,由FileSourceScanExec創建出來。在創建這種RDD的時候,具體的Partition直接作為參數傳給了構造函數,因此劃分輸入的方法也在DataSourceScanExec.scala文件中。具體分兩步:首先把文件劃分為PartitionFile,再將較小的PartitionFile進行合并。

第一步部分代碼如下:

  if (fsRelation.fileFormat.isSplitable(
    fsRelation.sparkSession, fsRelation.options, file.getPath)) {
    (0L until file.getLen by maxSplitBytes).map { offset =>val remaining = file.getLen - offsetval size = if (remaining > maxSplitBytes) maxSplitBytes else remainingval hosts = getBlockHosts(blockLocations, offset, size)PartitionedFile(
      partition.values, file.getPath.toUri.toString,
      offset, size, partitionDeleteDeltas, hosts)
    }
  } else {val hosts = getBlockHosts(blockLocations, 0, file.getLen)Seq(PartitionedFile(partition.values, file.getPath.toUri.toString,0, file.getLen, partitionDeleteDeltas, hosts))
  }

我們可以看出,Spark SQL首先根據文件類型判斷單個文件是否能夠切割,如果可以則按maxSplitBytes進行切割。如果一個文件剩余部分無法填滿maxSplitBytes,也單獨作為一個Partition。

第二部分代碼如下所示:

  splitFiles.foreach { file =>if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }// Add the given file to the current partition.currentSize += file.length + openCostInBytes
    currentFiles += file
  }

這樣我們就可以依次遍歷第一步切好的塊,再按照maxSplitBytes進行合并。注意合并文件時還需加上打開文件的預估代價openCostInBytes。那么maxSplitBytesopenCostInBytes這兩個關鍵參數怎么來的呢?

  val defaultMaxSplitBytes =
    fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes  val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes  val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism  val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum  val bytesPerCore = totalBytes / defaultParallelism  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

不難看出,主要是spark.sql.files.maxPartitionBytesspark.sql.files.openCostInBytes、調度器默認并發度以及所有輸入文件實際大小所控制。

Hive表

Spark SQL中的Hive表底層的RDD類為HadoopRDD,由HadoopTableReader類實現。不過這次,具體的Partition劃分還是依賴HadoopRDDgetPartitions方法,具體實現如下:

  override def getPartitions: Array[Partition] = {
    ...try {      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }      val array = new Array[Partition](inputSplits.size)      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
      ...
    }
  }

不難看出,在處理Hive表的時候,Spark SQL把任務劃分又交給了Hadoop的InputFormat那一套。不過需要注意的是,并不是所有Hive表都歸為這一類,Spark SQL會默認對ORC和Parquet的表進行轉化,用自己的Data Source實現OrcFileFormatParquetFileFormat來把這兩種表作為Data Source表來處理。

看完上述內容,你們掌握如何淺析Hive和Spark SQL讀文件時的輸入任務劃分的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

邛崃市| 惠安县| 东台市| 阜新| 开封县| 永平县| 二手房| 祁东县| 三门县| 大邑县| 安新县| 广州市| 正阳县| 临邑县| 乌什县| 大足县| 大同县| 平昌县| 西丰县| 宜宾县| 浙江省| 垫江县| 南漳县| 河津市| 桦甸市| 孝义市| 大厂| 怀仁县| 禄劝| 潼关县| 连南| 民丰县| 宿松县| 马龙县| 布尔津县| 牙克石市| 温州市| 保德县| 太原市| 洛南县| 兴安县|