亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RDD血緣關系源碼詳解!

發布時間:2020-07-29 18:28:40 來源:網絡 閱讀:830 作者:Stitch_x 欄目:大數據
一、RDD的依賴關系

RDD的依賴關系分為兩類:寬依賴和窄依賴。我們可以這樣認為:

  • (1)窄依賴:每個parent RDD 的 partition 最多被 child RDD 的一個partition 使用。
  • (2)寬依賴:每個parent RDD partition 被多個 child RDD 的partition 使用。

窄依賴每個 child RDD 的 partition 的生成操作都是可以并行的,而寬依賴則需要所有的 parent RDD partition shuffle 結果得到后再進行。

二、org.apache.spark.Dependency.scala 源碼解析

Dependency是一個抽象類:

// Denpendency.scala
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

它有兩個子類:NarrowDependency 和 ShuffleDenpendency,分別對應窄依賴和寬依賴。

(1)NarrowDependency也是一個抽象類

定義了抽象方法getParents,輸入partitionId,用于獲得child RDD 的某個partition依賴的parent RDD的所有 partitions。

// Denpendency.scala
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {  
/**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

窄依賴又有兩個具體的實現:OneToOneDependency和RangeDependency。
(a)OneToOneDependency指child RDD的partition只依賴于parent RDD 的一個partition,產生OneToOneDependency的算子有map,filter,flatMap等。可以看到getParents實現很簡單,就是傳進去一個partitionId,再把partitionId放在List里面傳出去。

// Denpendency.scala
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
        (b)RangeDependency指child RDD partition在一定的范圍內一對一的依賴于parent RDD partition,主要用于union。

// Denpendency.scala
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)  
  extends NarrowDependency[T](rdd) {//inStart表示parent RDD的開始索引,outStart表示child RDD 的開始索引
  override def getParents(partitionId: Int): List[Int] = {    
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)//表示于當前索引的相對位置
    } else {
      Nil
    }
  }
}
(2)ShuffleDependency指寬依賴

表示一個parent RDD的partition會被child RDD的partition使用多次。需要經過shuffle才能形成。

// Denpendency.scala
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],    
    val partitioner: Partitioner,    
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {  //shuffle都是基于PairRDD進行的,所以傳入的RDD要是key-value類型的
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)  //獲取shuffleId
  val shuffleId: Int = _rdd.context.newShuffleId()  //向shuffleManager注冊shuffle信息
  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

由于shuffle涉及到網絡傳輸,所以要有序列化serializer,為了減少網絡傳輸,可以map端聚合,通過mapSideCombine和aggregator控制,還有key排序相關的keyOrdering,以及重輸出的數據如何分區的partitioner,還有一些class信息。Partition之間的關系在shuffle處戛然而止,因此shuffle是劃分stage的依據。

三、兩種依賴的區分

首先,窄依賴允許在一個集群節點上以流水線的方式(pipeline)計算所有父分區。例如,逐個元素地執行map、然后filter操作;而寬依賴則需要首先計算好所有父分區數據,然后在節點之間進行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進行失效節點的恢復,即只需重新計算丟失RDD分區的父分區,而且不同節點之間可以并行計算;而對于一個寬依賴關系的Lineage圖,單個節點失效可能導致這個RDD的所有祖先丟失部分分區,因而需要整體重新計算。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

泽库县| 南充市| 腾冲县| 唐山市| 彭水| 石家庄市| 长寿区| 阿拉尔市| 枣阳市| 锦屏县| 百色市| 理塘县| 高台县| 五家渠市| 安徽省| 舒城县| 新竹县| 邯郸县| 益阳市| 石狮市| 丰宁| 萍乡市| 寻甸| 渑池县| 竹北市| 铅山县| 丹寨县| 固阳县| 远安县| 云龙县| 临汾市| 益阳市| 阳东县| 城固县| 安国市| 桦南县| 威海市| 沙雅县| 东乌| 昭觉县| 和田县|