亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

發布時間:2020-06-27 06:00:35 來源:網絡 閱讀:423 作者:Spark_2016 欄目:大數據

一:Spark集群開發環境準備

  1. 啟動HDFS,如下圖所示:

 第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

通過web端查看節點正常啟動,如下圖所示:

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

2.啟動Spark集群,如下圖所示:

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

通過web端查看集群啟動正常,如下圖所示:

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

3.啟動start-history-server.sh,如下圖所示:

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

二:HDFS的SparkStreaming案例實戰(代碼部分)

package com.dt.spark.SparkApps.sparkstreaming;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;

/**
 * Created by Jonson on 2016/4/17.
 */
public class SparkStreamingOnHDFS {

    public static void main(String[] args){
        /**
         * 
第一步:配置SparkConf
         * 1. 至少兩條線程:
         * 因為Spark Streaming應用程序在運行的時候,至少有一條線程用于不斷的循環接收數據,
         * 并且至少有一條線程用于處理接收的數據(否則的話無法有線程用于處理數據,隨著時間的推移,內存和磁盤都不堪重負)
         * 2. 對于集群而言,每個Executor一般而言肯定不止一個線程,對于處理Spark Streaming的應用程序而言,每個Executor一般
         * 分配多少個Core合適呢?根據我們過去的經驗,5個左右的core是最佳的(分配為奇數個Core為最佳)。
         */
        
final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");

        /**
         * 
第二步:創建SparkStreamingContext,這個是Spark Streaming應用程序所有功能的起始點和程序調度的核心
         * 1,SparkStreamingContext的構建可以基于SparkConf參數,也可以基于持久化SparkStreamingContext的內容
         * 來恢復過來(典型的場景是Driver崩潰后重新啟動,由于Spark Streaming具有連續7*24小時不間斷運行的特征,
         * 所有需要在Driver重新啟動后繼續上一次的狀態,此時狀態的恢復需要基于曾經的checkpoint)
         * 2,在一個Spark Streaming應用程序中可以創建若干個SparkStreamingContext對象,使用下一個SparkStreamingContext
         * 之前需要把前面正在運行的SparkStreamingContext對象關閉掉,由此,我們獲得一個重大啟發:SparkStreamingContext
         * 是Spark core上的一個應用程序而已,只不過Spark Streaming框架箱運行的話需要Spark工程師寫業務邏輯
         */
//        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));//Durations.seconds(5)設置每隔5秒


        final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        /**
         * 
可以從失敗中恢復Driver,不過還需要制定Driver這個進程運行在Cluster,并且提交應用程序的時候
         * 指定 --supervise;
         */
        
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);

        /**
         * 
現在是監控一個文件系統的目錄
         * 此處沒有Receiver,Spark Streaming應用程序只是按照時間間隔監控目錄下每個Batch新增的內容(把新增的)
         * 作為RDD的數據來源生成原始的RDD
         */
        
//指定從HDFS中監控的目錄

        JavaDStream lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");
        /**
         * 
第四步:接下來就像對于RDD編程一樣基于DStreaming進行編程!!!
         * 原因是:
         *  DStreaming是RDD產生的模板(或者說類)。
         *  在Spark Streaming具體發生計算前其實質是把每個batch的DStream的操作翻譯成對RDD的操作!!
         *  對初始的DStream進行Transformation級別的處理,例如Map,filter等高階函數的編程,來進行具體的數據計算。
         *  第4.1步:將每一行的字符串拆分成單個單詞
         */
        
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>() {

            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        /**
         * 
第4.2步:對初始的JavaRDD進行Transformation級別的處理,例如map,filter等高階函數等的編程,來進行具體的數據計算
         * 在4.1的基礎上,在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word,1)
         */
        
JavaPairDStream<String,Integer> pairs  = words.mapToPair(new PairFunction<String, String, Integer>() {

            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word,1);
            }
        });
        /**
         * 
第4.3步:在每個單詞實例計數的基礎上統計每個單詞在文件中出現的總次數
         */
        
JavaPairDStream<String,Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {

            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        /**
         * 
此處的print并不會直接觸發Job的執行,因為現在的一切都是在Spark Streaming框架控制下的,對于Spark而言具體是否
         * 觸發真正的Job運行是基于設置的Duration時間間隔的
         * 一定要注意的是:Spark Streaming應用程序要想執行具體的Job,對DStream就必須有output Stream操作,
         * output Stream有很多類型的函數觸發,例如:print,saveAsTextFile,saveAsHadoopFiles等,其實最為重要的一個方法是
         * foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis,DB,DashBoard等上面,foreachRDD主要就是用來完成這些
         * 功能的,而且可以隨意的自定義具體數據到底存放在哪里!!!
         */
        
wordscount.print();

        /**
         * Spark Streaming
執行引擎也就是Driver開始運行,Driver啟動的時候是位于一條新的線程中的。
         * 當然其內部有消息循環體用于接收應用程序本身或者Executor的消息;
         */
        
jsc.start();

        jsc.awaitTermination();
        jsc.close();
    }
    /**
     * 
工廠化模式構建JavaStreamingContext
     */
    
private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf){

        System.out.println("Creating new context");
        SparkConf = conf;
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(5));
        ssc.checkpoint(checkpointDirectory);
        return ssc;
    }
}

代碼打包在集群中運行

  1. 創建目錄

  第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

2.腳本運行

  腳本內容如下:

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

此時Spark Streaming會每隔5秒執行一次,不斷的掃描監控目錄下是否有新的文件。

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

3.上傳文件到HDFS中的Data目錄下

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

4.輸出結果

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

三:Spark Streaming on HDFS源碼解密

  1. JavaStreamingContextFactory的create方法可以創建JavaStreamingContext

  2. 而我們在具體實現的時候覆寫了該方法,內部就是調用createContext方法來具體實現。上述實戰案例中我們實現了createContext方法。

/*** Factory interface for creating a new JavaStreamingContext
 */
trait JavaStreamingContextFactory {
  def create(): JavaStreamingContext
}

3.checkpoint:

  一方面:保持容錯

  一方面保持狀態

  在開始和結束的時候每個batch都會進行checkpoint

** Sets the context to periodically checkpoint the DStream operations for master

 * fault-tolerance. The graph will be checkpointed every batch interval.
 * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
 */
def checkpoint(directory: String) {
  ssc.checkpoint(directory)
}
4.
remember:
流式處理中過一段時間數據就會被清理掉,但是可以通過remember可以延長數據在程序中的生命周期,另外延長RDD更長的時間。

應用場景:

假設數據流進來,進行ML或者Graphx的時候有時需要很長時間,但是bacth定時定條件的清除RDD,所以就可以通過remember使得數據可以延長更長時間。/**

5.

  如果設置了checkpoint ,重啟程序的時候,getOrCreate()會重新從checkpoint目錄中初始化出StreamingContext。

/* * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.

 * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
 * recreated from the checkpoint data. If the data does not exist, then the provided factory
 * will be used to create a JavaStreamingContext.
 *
 * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
 * @param factory        JavaStreamingContextFactory object to create a new JavaStreamingContext
 * @deprecated As of 1.4.0, replaced by
`getOrCreate` without JavaStreamingContextFactor.
 */
@deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")
def getOrCreate(
    checkpointPath: String,
    factory: JavaStreamingContextFactory
  ): JavaStreamingContext = {
  val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
    factory.create.ssc
  })
  new JavaStreamingContext(ssc)
}

異常問題思考:

第85課:基于HDFS的SparkStreaming案例實戰和內幕源碼解密

為啥會報錯?
  1. Streaming會定期的進行checkpoint。

  2. 重新啟動程序的時候,他會從曾經checkpoint的目錄中,如果沒有做額外配置的時候,所有的信息都會放在checkpoint的目錄中(包括曾經應用程序信息),因此下次再次啟動的時候就會報錯,無法初始化ShuffleDStream。

總結:

使用Spark Streaming可以處理各種數據來源類型,如:數據庫、HDFS,服務器log日志、網絡流,其強大超越了你想象不到的場景,只是很多時候大家不會用,其真正原因是對Spark、spark streaming本身不了解。

備注:

資料來源于:DT_大數據夢工廠(IMF傳奇行動絕密課程)-IMF

更多私密內容,請關注微信公眾號:DT_Spark

如果您對大數據Spark感興趣,可以免費聽由王家林老師每天晚上20:00開設的Spark永久免費公開課,地址YY房間號:68917580

Life is short,you need to Spark!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

岐山县| 定边县| 抚松县| 临漳县| 南安市| 汝阳县| 禹城市| 金堂县| 织金县| 麻江县| 渑池县| 安丘市| 普安县| 来凤县| 双城市| 克拉玛依市| 彝良县| 当涂县| 彰武县| 桐梓县| 交口县| 堆龙德庆县| 四川省| 广平县| 龙泉市| 蒲城县| 邯郸市| 灵璧县| 绥宁县| 石门县| 紫金县| 禄劝| 托里县| 广宗县| 微博| 德惠市| 鞍山市| 襄汾县| 桓仁| 营山县| 绩溪县|