您好,登錄后才能下訂單哦!
標簽(空格分隔): Spark的部分
- 一: 日志清洗的優化
- 二:Spark RDD
- 三:SparkContext三大功能
- 四:Spark on YARN
- 五: spark RDD 的 依賴
hdfs dfs -mkdir /apachelog/
hdfs dfs -put access_log /apachelogs
hdfs dfs -ls /apachelogs
執行結果報錯。
LogAnalyzer.scala
package com.ibeifeng.bigdata.spark.app.core
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by zhangyy on 2016/7/16.
*/
object LogAnalyzer {
def main(args: Array[String]) {
// step 0: SparkContext
val sparkConf = new SparkConf()
.setAppName("LogAnalyzer Applicaiton") // name
.setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
// Create SparkContext
val sc = new SparkContext(sparkConf)
/** ================================================================== */
val logFile = "/apachelogs/access_log"
// step 1: input data
val accessLogs = sc.textFile(logFile)
// filer logs data
.filter(ApacheAccessLog.isValidateLogLine) // closures
/**
* parse log
*/
.map(line => ApacheAccessLog.parseLogLine(line))
/**
* The average, min, and max content size of responses returned from the server.
*/
val contentSizes = accessLogs.map(log => log.contentSize)
// compute
val avgContentSize = contentSizes.reduce(_ + _) / contentSizes.count()
val minContentSize = contentSizes.min()
val maxContentSize = contentSizes.max()
// println
printf("Content Size Avg: %s , Min : %s , Max: %s".format(
avgContentSize, minContentSize, maxContentSize
))
/**
* A count of response code's returned
*/
val responseCodeToCount = accessLogs
.map(log => (log.responseCode, 1))
.reduceByKey(_ + _)
.take(3)
println(
s"""Response Code Count: ${responseCodeToCount.mkString(", ")}"""
)
/**
* All IPAddresses that have accessed this server more than N times
*/
val ipAddresses = accessLogs
.map(log => (log.ipAddress, 1))
.reduceByKey( _ + _)
// .filter( x => (x._2 > 10))
.take(5)
println(
s"""IP Address : ${ipAddresses.mkString("< ", ", " ," >")}"""
)
/**
* The top endpoints requested by count
*/
val topEndpoints = accessLogs
.map(log => (log.endPoint, 1))
.reduceByKey(_ + _)
.top(3)(OrderingUtils.SecondValueOrdering)
// .map(tuple => (tuple._2, tuple._1))
// .sortByKey(false)
//.take(3)
//.map(tuple => (tuple._2, tuple._1))
println(
s"""Top Endpoints : ${topEndpoints.mkString("[", ", ", " ]")}"""
)
/** ================================================================== */
// Stop SparkContext
sc.stop()
}
}
ApacheAccessLog.scala
package com.ibeifeng.bigdata.spark.app.core
/**
* Created by zhangyy on 2016/7/16.
*
* 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800]
* "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1"
* 200 1234
*/
case class ApacheAccessLog (
ipAddress: String,
clientIndentd: String,
userId: String,
dateTime:String,
method: String,
endPoint: String,
protocol: String,
responseCode: Int,
contentSize: Long)
object ApacheAccessLog{
// regex
// 1.1.1.1 - - [21/Jul/2014:10:00:00 -0800] "GET /chapter1/java/src/main/java/com/databricks/apps/logs/LogAnalyzer.java HTTP/1.1" 200 1234
val PARTTERN ="""^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
/**
*
* @param log
* @return
*/
def isValidateLogLine(log: String): Boolean = {
// parse log
val res = PARTTERN.findFirstMatchIn(log)
// invalidate
if (res.isEmpty) {
false
}else{
true
}
}
/**
*
* @param log
* @return
*/
def parseLogLine(log: String): ApacheAccessLog ={
// parse log
val res = PARTTERN.findFirstMatchIn(log)
// invalidate
if(res.isEmpty){
throw new RuntimeException("Cannot parse log line: " + log)
}
// get value
val m = res.get
// return
ApacheAccessLog( //
m.group(1), //
m.group(2),
m.group(3),
m.group(4),
m.group(5),
m.group(6),
m.group(7),
m.group(8).toInt,
m.group(9).toLong)
}
}
OrderingUtils.scala
package com.ibeifeng.bigdata.spark.app.core
import scala.math.Ordering
/**
* Created by zhangyy on 2016/7/16.
*/
object OrderingUtils {
object SecondValueOrdering extends Ordering[(String, Int)]{
/**
*
* @param x
* @param y
* @return
*/
override def compare(x: (String, Int), y: (String, Int)): Int = {
x._2.compare(y._2)
// x._2 compare y._2 // 1 to 10 | 1.to(10)
}
}
}
RDD,全稱為Resilient Distributed Datasets,是一個容錯的、并行的數據結構,可以讓用戶顯式地將數據存儲到磁盤和內存中,并能控制數據的分區。同時,RDD還提供了一組豐富的操作來操作這些數據。在這些操作中,諸如map、flatMap、filter等轉換操作實現了monad模式,很好地契合了Scala的集合操作。除此之外,RDD還提供了諸如join、groupBy、reduceByKey等更為方便的操作(注意,reduceByKey是action,而非transformation),以支持常見的數據運算
val rdd = sc.textFile("/spark/rdd")
rdd.partitions.length
rdd.cache
rdd.count
一個分區默認一個task 分區去處理
默認是兩個分區去處理
1. A list of partitions : (protected def getPartitions: Array[Partition])
一系列的的分片,比如說64M一片,類似于hadoop中的split
2. A function ofr computing each split :( @DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T])
在每個分片上都有一個方式去迭代/執行/計算
3. A list of dependencies on other RDD :(protected def getDependencies: Seq[Dependency[_]] = deps)
一系列的依賴:RDDa 轉換為RDDb,轉換為 RDDc, 那么RDDc 就依賴于RDDb , RDDb 又依賴于RDDa
---
wordcount 程序:
## val rdd = sc.textFile("xxxx")
val wordRdd = rdd.flatMap(_.split(""))
val kvRdd = wordRdd.map((_,1))
val WordCountRdd = kvRdd.reduceByKey(_ + _)
# wrodcountRdd.saveAsTextFile("yy")
kvRdd <- wordRdd <- rdd
rdd.toDebugString
---
4. Optionlly,a Partitioner for kev-values RDDs (e,g,to say that the RDDis hash-partitioned) :(/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None)
5. optionlly,a list of preferred location(s) to compute each split on (e,g,block location for an HDFS file)
:(protected def getPreferredLocations(split: Partition): Seq[String] = Nil)
要運行的計算/執行最好在哪(幾)個機器上運行,數據本地型
為什么會有那幾個呢?
比如: hadoop 默認有三個位置,或者spark cache 到內存是可能同過StroageLevel 設置了多個副本,所以一個partition 可能返回多個最佳位置。
方式一:
并行化集合:
并行化集合
List\Seq\Array
SparkContext:
----
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T]
---
list 創建:
val list = List("11","22","33")
val listRdd = sc.parallelize(list)
listRdd.count
listRdd.frist
listRdd.take(10)
seq 創建:
val seq = Sep("aa","bb","cc")
val seqRdd = sc.parallelize(seq)
seqRdd.count
seqRdd.frist
seqRdd.take(10)
Array創建:
val array = Array(1,2,3,4,5)
val arryRdd = sc.parallelize(array)
arryRdd.first
arryRdd.count
arryRdd.take(10)
方式二:從外部存儲創建:
val disFile = sc.textFile("/input")
transformation 轉換
actions 執行出結果
persistence 基本都是cache過程
union()合并應用
val rdd1 = sc.parallelize(Array(1,2,3,4,5))
val rdd2 = sc.parallelize(Array(6,7,8,9,10))
val rdd = rdd1.union(rdd2)
rdd.collect
對于分布式計算框架來說,性能瓶頸
IO
-1,磁盤IO
-2,網絡IO
rdd1 -> rdd2
Shuffle
============================================
groupByKey() & reduceByKey()
在實際開發中,如果可以使用reduceByKey實現的功能,就不要使用groupBykey
使用reduceByKey有聚合功能,類似MapReduce中啟用了Combiner
===============
join()
-1,等值鏈接
-2,左連接
數據去重
結果數據
res-pre.txt - rdd1
新數據進行處理
web.tsv - 10GB - rdd2
解析里面的url,
如果res-pre.txt中包含,就不放入,不包含就加入或者不包含url進行特殊處理
rdd2.leftJoin(rdd1)
join()應用
val list =List("aa","bb","cc","dd")
val rdd1 = sc.parallelize(list).map((_, 1))
rdd1.collect
val list2 = List("bb","cc","ee","hh")
val rdd2 = sc.parallelize(list2).map((_, 1))
rdd2.collect
val rdd = rdd2.leftOuterJoin(rdd1)
rdd.collect
rdd.filter(tuple => tuple._2._2.isEmpty).collect
repartition()應用:
val rdd = sc.textFile("/spark/rdd")
rdd.repartition(2)
rdd.count
val list = List(("aa",1),("bb",4),("aa",56),("cc",0),("aa",89),("cc",34))
val rdd = sc.parallelize(list)
rdd.countByKey
wordcount 轉變
val rdd = sc.textFile("\input")
rdd.flatMap(_.split(" ")).map((_, 1)).countByKey
foreach() 應用
val list = List(1,2,3,4,5)
val rdd = sc.parallelize(list)
rdd.foreach(line => println(line))
分組topkey
aa 78
bb 98
aa 80
cc 98
aa 69
cc 87
bb 97
cc 86
aa 97
bb 78
bb 34
cc 85
bb 92
cc 72
bb 32
bb 23
val rdd = sc.textFile("/topkeytest")
val topRdd = rdd.map(line => line.split(" ")).map(arr => (arr(0), arr(1).toInt)).groupByKey().map(tuple => (tuple._1, tuple._2.toList.sorted.takeRight(3).reverse))
topRdd.collect
SparkContext 的作用:
-1,向Master(主節點,集群管理的主節點)申請資源,運行所有Executor
-2,創建RDD的入口
sc.textFile("") // 從外部存儲系統創建
sc.parxx() // 并行化,從Driver 中的集合創建
-3,調度管理JOB運行
DAGScheduler 、 TaskScheduler
--3.1
為每個Job構建DAG圖
--3.2
DAG圖劃分為Stage
按照RDD之間是否存在Shuffle
倒推(Stack)
--3.3
每個Stage中TaskSet
每個階段中Task代碼相同,僅僅處理數據不同
val list = List(".", "?", "!", "#", "$")
val braodCastList = sc.broadcast(list)
val wordRdd = sc.textFile("")
wordRdd.filter(word => {
braodCastList.value.contains(word)
})
1.spark的默認模式是local模式
spark-submint Scala_Project.jar
2. spark job 運行在客戶端集群模式:
spark-submit --master spark://192.168.3.1:7077 --deploy-mode cluster Scala_Project.jar
方式一:
--jars JARS
Comma-separated list of local jars to include on the driver and executor classpaths.
jar包的位置一定要寫決定路徑。
方式二:
--driver-class-path
Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
方式三:
SPARK_CLASSPATH
配置此環境變量
spark-app-submit.sh:
#!/bin/sh
## SPARK_HOME
SPARK_HOME=/opt/cdh6.3.6/spark-1.6.1-bin-2.5.0-cdh6.3.6
## SPARK CLASSPATH
SPARK_CLASSPATH=$SPARK_CLASSPATH:/opt/jars/sparkexternale/xx.jar:/opt/jars/sparkexternale/yy.jar
${SPARK_HOME}/bin/spark-submit --master spark://hadoop-senior01.ibeifeng.com:7077 --deploy-mode cluster /opt/tools/scalaProject.jar
cd /soft/hadoop/sbin
啟動rescouremanager:
./yarn-daemon.sh start resourcemanager
啟動nodemanger:
./yarn-daemon.sh start nodemanager
YARN
-1,分布式資源管理
主節點:ResouceManager
從節點:NodeManager -> 負責管理每臺機器上的資源(內存和CPU Core)
-2,資源調度
--1,容器Container
AM/Task
--2,對于運行在YARN上的每個應用,一個應用的管理者ApplicaitonMaster 資源申請和任務調度
Spark Application
-1,Driver Program
資源申請和任務調度
-2,Executors
每一個Executor其實就是一個JVM,就是一個進程
以spark deploy mode : client
AM
-- 全部都允許在Container中
Executor s
運行在Container中,類似于MapReduce任務中Map Task和Reduce Task一樣
Driver -> AM -> RM
spark-shell --master yarn
cd jars/
spark-submit --master yarn --deploy-mode cluster Scala_Project.jar
spark的wordcount
##
val rdd = sc.textFile("/input")
##
val wordRdd = rdd.flatMap(_.split(" "))
val kvRdd = wordRdd.map((_, 1))
val wordcountRdd = kvRdd.reduceByKey(_ + _)
##
wordcountRdd.collect
-----------------
input -> rdd -> wordRdd -> kvRdd : Stage-01 -> ShuffleMapStage -> SMT
->
wordcountRdd -> output :Stage-02 -> ResultStage -> ResultTask
1. 窄依賴(narrow dependencies)
1.1:子RDD的每個分區依賴于常數個父分區(即與數據規模無關)
1.2: 輸入輸出一對一的算子,且結過RDD 的分區結構不變,主要是map,flatMap
1.3:輸出一對一,單結果RDD 的分區結構發生變化,如:union,coalesce
1.4: 從輸入中選擇部分元素的算子,如filer,distinct,subtract,sample
2. 寬依賴(wide dependencies)
2.1: 子RDD的每個分區依賴于所有父RDD 分區
2.2:對單個RDD 基于key進行重組和reduce,如groupByKey,reduceByKey
2.3:對兩個RDD 基于key 進行join和重組,如:join
如何判斷RDD之間是窄依賴還是寬依賴:
父RDD的每個分區數據 給 子RDD的每個分區數據
1 -> 1
1 -> N : MapReduce 中 Shuffle
在MapReduce框架中,shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark作為MapReduce框架的一種實現,自然也實現了shuffle的邏輯。
Shuffle是MapReduce框架中的一個特定的phase,介于Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果需要按key哈希,并且分發到每一個Reducer上去,這個過程就是shuffle。由于shuffle涉及到了磁盤的讀寫和網絡的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。
下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介于Map phase和Reduce phase之間。
概念上shuffle就是一個溝通數據連接的橋梁,那么實際上shuffle(partition)這一部分是如何實現的的呢,下面我們就以Spark為例講一下shuffle在Spark中的實現。
1.首先每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M×RM×R,其中MM是Map的個數,RR是Reduce的個數。
2.其次Mapper產生的結果會根據設置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中去。
當Reducer啟動時,它會根據自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket作為Reducer的輸入進行處理。
這里的bucket是一個抽象概念,在實現中每個bucket可以對應一個文件,可以對應文件的一部分或是其他等。
3. Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有著諸多類似,一些概念可直接套用,例如,Shuffle 過程中,提供數據的一端,被稱作 Map 端,Map 端每個生成數據的任務稱為 Mapper,對應的,接收數據的一端,被稱作 Reduce 端,Reduce 端每個拉取數據的任務稱為 Reducer,Shuffle 過程本質上都是將 Map 端獲得的數據使用分區器進行劃分,并將數據發送給對應的 Reducer 的過程。
那些操作會引起shuffle
1. 具有重新調整分區操作,
eg: repartition,coalese
2. *ByKey eg: groupByKey,reduceByKey
3. 關聯操作 eg:join,cogroup
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。