您好,登錄后才能下訂單哦!
本文小編為大家詳細介紹“如何使用Spark Shell進行交互式分析”,內容詳細,步驟清晰,細節處理妥當,希望這篇“如何使用Spark Shell進行交互式分析”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。
Spark shell 提供了一種來學習該 API 比較簡單的方式, 以及一個強大的來分析數據交互的工具。在 Scala(運行于 Java 虛擬機之上, 并能很好的調用已存在的 Java 類庫)或者 Python 中它是可用的。通過在 Spark 目錄中運行以下的命令來啟動它:
Scala
Python
./bin/spark-shell
Spark 的主要抽象是一個稱為 Dataset 的分布式的 item 集合。Datasets 可以從 Hadoop 的 InputFormats(例如 HDFS文件)或者通過其它的 Datasets 轉換來創建。讓我們從 Spark 源目錄中的 README 文件來創建一個新的 Dataset:
scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string]
您可以直接從 Dataset 中獲取 values(值), 通過調用一些 actions(動作), 或者 transform(轉換)Dataset 以獲得一個新的。更多細節, 請參閱 API doc。
scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark
現在讓我們 transform 這個 Dataset 以獲得一個新的。我們調用 filter
以返回一個新的 Dataset, 它是文件中的 items 的一個子集。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我們可以鏈式操作 transformation(轉換)和 action(動作):
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
Dataset actions(操作)和 transformations(轉換)可以用于更復雜的計算。例如, 統計出現次數最多的單詞 :
Scala
Python
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
第一個 map 操作創建一個新的 Dataset, 將一行數據 map 為一個整型值。在 Dataset 上調用 reduce
來找到最大的行計數。參數 map
與 reduce
是 Scala 函數(closures), 并且可以使用 Scala/Java 庫的任何語言特性。例如, 我們可以很容易地調用函數聲明, 我們將定義一個 max 函數來使代碼更易于理解 :
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
一種常見的數據流模式是被 Hadoop 所推廣的 MapReduce。Spark 可以很容易實現 MapReduce:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在這里, 我們調用了 flatMap
以 transform 一個 lines 的 Dataset 為一個 words 的 Dataset, 然后結合 groupByKey
和 count
來計算文件中每個單詞的 counts 作為一個 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts, 我們可以調用 collect
:
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Spark 還支持 Pulling(拉取)數據集到一個群集范圍的內存緩存中。例如當查詢一個小的 “hot” 數據集或運行一個像 PageRANK 這樣的迭代算法時, 在數據被重復訪問時是非常高效的。舉一個簡單的例子, 讓我們標記我們的 linesWithSpark
數據集到緩存中:
Scala
Python
scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string] scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15
使用 Spark 來探索和緩存一個 100 行的文本文件看起來比較愚蠢。有趣的是, 即使在他們跨越幾十或者幾百個節點時, 這些相同的函數也可以用于非常大的數據集。您也可以像 編程指南. 中描述的一樣通過連接 bin/spark-shell
到集群中, 使用交互式的方式來做這件事情。
假設我們希望使用 Spark API 來創建一個獨立的應用程序。我們在 Scala(SBT), Java(Maven)和 Python 中練習一個簡單應用程序。
Scala
Java
Python
我們將在 Scala 中創建一個非常簡單的 Spark 應用程序 - 很簡單的, 事實上, 它名為 SimpleApp.scala
:
/* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } }
注意, 這個應用程序我們應該定義一個 main()
方法而不是去擴展 scala.App
。使用 scala.App
的子類可能不會正常運行。
該程序僅僅統計了 Spark README 文件中每一行包含 ‘a’ 的數量和包含 ‘b’ 的數量。注意, 您需要將 YOUR_SPARK_HOME 替換為您 Spark 安裝的位置。不像先前使用 spark shell 操作的示例, 它們初始化了它們自己的 SparkContext, 我們初始化了一個 SparkContext 作為應用程序的一部分。
我們調用 SparkSession.builder
以構造一個 [[SparkSession]], 然后設置 application name(應用名稱), 最終調用 getOrCreate
以獲得 [[SparkSession]] 實例。
我們的應用依賴了 Spark API, 所以我們將包含一個名為 build.sbt
的 sbt 配置文件, 它描述了 Spark 的依賴。該文件也會添加一個 Spark 依賴的 repository:
name := "Simple Project" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
為了讓 sbt 正常的運行, 我們需要根據經典的目錄結構來布局 SimpleApp.scala
和 build.sbt
文件。在成功后, 我們可以創建一個包含應用程序代碼的 JAR 包, 然后使用 spark-submit
腳本來運行我們的程序。
# Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.11/simple-project_2.11-1.0.jar ... Lines with a: 46, Lines with b: 23
恭喜您成功的運行了您的第一個 Spark 應用程序!
更多 API 的深入概述, 從 RDD programming guide 和 SQL programming guide 這里開始, 或者看看 “編程指南” 菜單中的其它組件。
為了在集群上運行應用程序, 請前往 deployment overview.
最后, 在 Spark 的 examples
目錄中包含了一些 (Scala, Java, Python, R) 示例。您可以按照如下方式來運行它們:
# 針對 Scala 和 Java, 使用 run-example: ./bin/run-example SparkPi # 針對 Python 示例, 直接使用 spark-submit: ./bin/spark-submit examples/src/main/python/pi.py # 針對 R 示例, 直接使用 spark-submit: ./bin/spark-submit examples/src/main/r/dataframe.R
讀到這里,這篇“如何使用Spark Shell進行交互式分析”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。