您好,登錄后才能下訂單哦!
一:Spark集群開發環境準備
啟動HDFS,如下圖所示:
通過web端查看節點正常啟動,如下圖所示:
2.啟動Spark集群,如下圖所示:
通過web端查看集群啟動正常,如下圖所示:
3.啟動start-history-server.sh,如下圖所示:
二:HDFS的SparkStreaming案例實戰(代碼部分)
package com.dt.spark.SparkApps.sparkstreaming;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
import java.util.Arrays;
/**
* Created by Jonson on 2016/4/17.
*/
public class SparkStreamingOnHDFS {
public static void main(String[] args){
/**
* 第一步:配置SparkConf
* 1. 至少兩條線程:
* 因為Spark Streaming應用程序在運行的時候,至少有一條線程用于不斷的循環接收數據,
* 并且至少有一條線程用于處理接收的數據(否則的話無法有線程用于處理數據,隨著時間的推移,內存和磁盤都不堪重負)
* 2. 對于集群而言,每個Executor一般而言肯定不止一個線程,對于處理Spark Streaming的應用程序而言,每個Executor一般
* 分配多少個Core合適呢?根據我們過去的經驗,5個左右的core是最佳的(分配為奇數個Core為最佳)。
*/
final SparkConf conf = new SparkConf().setMaster("spark://Master:7077").setAppName("SparkOnStreamingOnHDFS");
/**
* 第二步:創建SparkStreamingContext,這個是Spark Streaming應用程序所有功能的起始點和程序調度的核心
* 1,SparkStreamingContext的構建可以基于SparkConf參數,也可以基于持久化SparkStreamingContext的內容
* 來恢復過來(典型的場景是Driver崩潰后重新啟動,由于Spark Streaming具有連續7*24小時不間斷運行的特征,
* 所有需要在Driver重新啟動后繼續上一次的狀態,此時狀態的恢復需要基于曾經的checkpoint)
* 2,在一個Spark Streaming應用程序中可以創建若干個SparkStreamingContext對象,使用下一個SparkStreamingContext
* 之前需要把前面正在運行的SparkStreamingContext對象關閉掉,由此,我們獲得一個重大啟發:SparkStreamingContext
* 是Spark core上的一個應用程序而已,只不過Spark Streaming框架箱運行的話需要Spark工程師寫業務邏輯
*/
// JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));//Durations.seconds(5)設置每隔5秒
final String checkpointDirectory = "hdfs://Master:9000/library/SparkStreaming/Checkpoint_Data";
JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
return createContext(checkpointDirectory,conf);
}
};
/**
* 可以從失敗中恢復Driver,不過還需要制定Driver這個進程運行在Cluster,并且提交應用程序的時候
* 指定 --supervise;
*/
JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
/**
* 現在是監控一個文件系統的目錄
* 此處沒有Receiver,Spark Streaming應用程序只是按照時間間隔監控目錄下每個Batch新增的內容(把新增的)
* 作為RDD的數據來源生成原始的RDD
*/
//指定從HDFS中監控的目錄
JavaDStream lines = jsc.textFileStream("hdfs://Master:9000/library/SparkStreaming/Data");
/**
* 第四步:接下來就像對于RDD編程一樣基于DStreaming進行編程!!!
* 原因是:
* DStreaming是RDD產生的模板(或者說類)。
* 在Spark Streaming具體發生計算前其實質是把每個batch的DStream的操作翻譯成對RDD的操作!!
* 對初始的DStream進行Transformation級別的處理,例如Map,filter等高階函數的編程,來進行具體的數據計算。
* 第4.1步:將每一行的字符串拆分成單個單詞
*/
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String,String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
/**
* 第4.2步:對初始的JavaRDD進行Transformation級別的處理,例如map,filter等高階函數等的編程,來進行具體的數據計算
* 在4.1的基礎上,在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word,1)
*/
JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String,Integer>(word,1);
}
});
/**
* 第4.3步:在每個單詞實例計數的基礎上統計每個單詞在文件中出現的總次數
*/
JavaPairDStream<String,Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
/**
* 此處的print并不會直接觸發Job的執行,因為現在的一切都是在Spark Streaming框架控制下的,對于Spark而言具體是否
* 觸發真正的Job運行是基于設置的Duration時間間隔的
* 一定要注意的是:Spark Streaming應用程序要想執行具體的Job,對DStream就必須有output Stream操作,
* output Stream有很多類型的函數觸發,例如:print,saveAsTextFile,saveAsHadoopFiles等,其實最為重要的一個方法是
* foraeachRDD,因為Spark Streaming處理的結果一般都會放在Redis,DB,DashBoard等上面,foreachRDD主要就是用來完成這些
* 功能的,而且可以隨意的自定義具體數據到底存放在哪里!!!
*/
wordscount.print();
/**
* Spark Streaming執行引擎也就是Driver開始運行,Driver啟動的時候是位于一條新的線程中的。
* 當然其內部有消息循環體用于接收應用程序本身或者Executor的消息;
*/
jsc.start();
jsc.awaitTermination();
jsc.close();
}
/**
* 工廠化模式構建JavaStreamingContext
*/
private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf){
System.out.println("Creating new context");
SparkConf = conf;
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,Durations.seconds(5));
ssc.checkpoint(checkpointDirectory);
return ssc;
}
}
代碼打包在集群中運行
創建目錄
2.腳本運行
腳本內容如下:
此時Spark Streaming會每隔5秒執行一次,不斷的掃描監控目錄下是否有新的文件。
3.上傳文件到HDFS中的Data目錄下
4.輸出結果
三:Spark Streaming on HDFS源碼解密
JavaStreamingContextFactory的create方法可以創建JavaStreamingContext
而我們在具體實現的時候覆寫了該方法,內部就是調用createContext方法來具體實現。上述實戰案例中我們實現了createContext方法。
/*** Factory interface for creating a new JavaStreamingContext
*/
trait JavaStreamingContextFactory {
def create(): JavaStreamingContext
}
3.checkpoint:
一方面:保持容錯
一方面保持狀態
在開始和結束的時候每個batch都會進行checkpoint
** Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. The graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
*/
def checkpoint(directory: String) {
ssc.checkpoint(directory)
}
4.remember:
流式處理中過一段時間數據就會被清理掉,但是可以通過remember可以延長數據在程序中的生命周期,另外延長RDD更長的時間。
應用場景:
假設數據流進來,進行ML或者Graphx的時候有時需要很長時間,但是bacth定時定條件的清除RDD,所以就可以通過remember使得數據可以延長更長時間。/**
5.
如果設置了checkpoint ,重啟程序的時候,getOrCreate()會重新從checkpoint目錄中初始化出StreamingContext。
/* * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
* recreated from the checkpoint data. If the data does not exist, then the provided factory
* will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
* @deprecated As of 1.4.0, replaced by `getOrCreate` without JavaStreamingContextFactor.
*/
@deprecated("use getOrCreate without JavaStreamingContextFactor", "1.4.0")
def getOrCreate(
checkpointPath: String,
factory: JavaStreamingContextFactory
): JavaStreamingContext = {
val ssc = StreamingContext.getOrCreate(checkpointPath, () => {
factory.create.ssc
})
new JavaStreamingContext(ssc)
}
異常問題思考:
為啥會報錯?
Streaming會定期的進行checkpoint。
重新啟動程序的時候,他會從曾經checkpoint的目錄中,如果沒有做額外配置的時候,所有的信息都會放在checkpoint的目錄中(包括曾經應用程序信息),因此下次再次啟動的時候就會報錯,無法初始化ShuffleDStream。
總結:
備注:
資料來源于:DT_大數據夢工廠(IMF傳奇行動絕密課程)-IMF
更多私密內容,請關注微信公眾號:DT_Spark
如果您對大數據Spark感興趣,可以免費聽由王家林老師每天晚上20:00開設的Spark永久免費公開課,地址YY房間號:68917580
Life is short,you need to Spark!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。