亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

(版本定制)第8課:Spark Streaming源碼解讀之

發布時間:2020-03-01 17:59:42 來源:網絡 閱讀:531 作者:Spark_2016 欄目:大數據

本篇博客將詳細探討DStream模板下的RDD是如何被創建,然后被執行的。在開始敘述之前,先來思考幾個問題,本篇文章也就是基于此問題構建的。 
1. RDD是誰產生的? 
2. 如何產生RDD? 
帶著這兩個問題開啟我們的探索之旅。

DStream是RDD的模板,每隔一個Batch Interval會根據DStream模板生成一個對應的RDD,然后將RDD存儲到DStream中的generatedRDDs數據結構中,下面是存儲結構格式。

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

1、簡單的WordCount程序

object WordCount {  def main(args:Array[String]): Unit ={
    val sparkConf = new SparkConf().setMaster("Master:7077").setAppName("WordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(10)) // Timer觸發頻率

    val lines = ssc.socketTextStream("Master",9999) //接收數據
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
首先我們先看看print方法,具體的代碼如下:
/**
* Print the first num elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print(num: Int): Unit = ssc.withScope {
 def foreachFunc: (RDD[T], Time) => Unit = {
   (rdd: RDD[T], time: Time) => {
     val firstNum = rdd.take(num + 1)
     // scalastyle:off println
     println("-------------------------------------------")
     println("Time: " + time)
     println("-------------------------------------------")
     firstNum.take(num).foreach(println)
     if (firstNum.length > num) println("...")
     println()
     // scalastyle:on println
   }
 }
 foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}

首先定義了一個函數,該函數用來從RDD中取出前幾條數據,并打印出結果與時間等,后面會調用foreachRDD函數。

private def foreachRDD(
   foreachFunc: (RDD[T], Time) => Unit,
   displayInnerRDDOps: Boolean): Unit = {
   new ForEachDStream(this,context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

/**
* Register this streaming as an output stream. This would ensure that RDDs of this
* DStream will be generated.
*/
private[streaming] def register(): DStream[T] = {
 ssc.graph.addOutputStream(this)
 this
}

def addOutputStream(outputStream: DStream[_]) {
 this.synchronized {
   outputStream.setGraph(this)
   outputStreams += outputStream
 }

在foreachRDD中new出了一個ForEachDStream對象,并將這個注冊給DStreamGraph,ForEachDStream對象也就是DStreamGraph中的outputStreams。

當每到達一個BatchInterval時候,就會調用DStreamingGraph中的generateJobs.

def generateJobs(time: Time): Seq[Job] = {
 logDebug("Generating jobs for time " + time)
 val jobs = this.synchronized {
   outputStreams.flatMap { outputStream =>
     val jobOption = outputStream.generateJob(time)
     jobOption.foreach(_.setCallSite(outputStream.creationSite))
     jobOption
   }
 }
 logDebug("Generated " + jobs.length + " jobs for time " + time)
 jobs
}

這里就會調用outputStream的generateJob方法


private[streaming] def generateJob(time: Time): Option[Job] = {
 getOrCompute(time) match {
   case Some(rdd) => {
     val jobFunc = () => {
       val emptyFunc = { (iterator: Iterator[T]) => {} }
       context.sparkContext.runJob(rdd, emptyFunc)
     }
     Some(new Job(time, jobFunc))
   }
   case None => None
 }
}

這里會調用getOrCompute(time)來產生新RDD,并將其存入到generatedRDDs中,整理的過程如下圖:
(版本定制)第8課:Spark Streaming源碼解讀之


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

娄烦县| 昭觉县| 通化市| 汉沽区| 福州市| 崇仁县| 广德县| 静海县| 古蔺县| 揭阳市| 安泽县| 盐源县| 句容市| 西乌| 花垣县| 庄河市| 康马县| 无棣县| 科技| 开封市| 宝兴县| 太仆寺旗| 封丘县| 特克斯县| 达拉特旗| 开化县| 罗江县| 师宗县| 油尖旺区| 大港区| 玛纳斯县| 离岛区| 绵阳市| 靖边县| 子长县| 延安市| 南雄市| 垣曲县| 长岭县| 新沂市| 宽城|