您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關SparkSQl中運行原理的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
Spark SQL是Spark的一個模塊,用于處理結構化的數據,它提供了一個數據抽象DataFrame(最核心的編程抽象就是DataFrame),并且SparkSQL作為分布式SQL查詢引擎。
Spark SQL就是將SQL轉換成一個任務,提交到集群上運行,類似于Hive的執行方式。
將Spark SQL轉化為RDD,然后提交到集群執行。
(1)容易整合,Spark SQL已經集成在Spark中
(2)提供了統一的數據訪問方式:JSON、CSV、JDBC、Parquet等都是使用統一的方式進行訪問
(3)兼容 Hive
(4)標準的數據連接:JDBC、ODBC
在Spark中,DataFrame是一種以RDD為基礎的分布式數據集,類似于傳統數據庫中的二維表格。
DataFrame是組織成命名列的數據集。
它在概念上等同于關系數據庫中的表,但在底層具有更豐富的優化。
關系型數據庫中的表由表結構和數據組成,而DataFrame也類似,由schema(結構)和數據組成,其數據集是RDD。
DataFrame可以根據很多源進行構建,包括:結構化的數據文件,hive中的表,外部的關系型數據庫,以及RDD
上圖展示了Spark的模塊及各模塊之間的關系:
底層是Spark-core核心模塊,Spark每個模塊都有一個核心抽象,Spark-core的核心抽象是RDD,
Spark SQL等都基于RDD封裝了自己的抽象,在Spark SQL中是DataFrame/DataSet。
相對來說RDD是更偏底層的抽象,DataFrame/DataSet是在其上做了一層封裝,做了優化,使用起來更加方便。
從功能上來說,DataFrame/DataSet能做的事情RDD都能做,RDD能做的事情DataFrame/DataSet不一定能做。
DataFrame與RDD的主要區別在于:
DataFrame
DataFrame帶有schema元信息,即DataFrame所表示的二維表數據集的每一列都帶有名稱和類型。
使得Spark SQL得以洞察更多的結構信息,從而對藏于DataFrame背后的數據源以及作用于DataFrame之上的變換進行了針對性的優化,最終達到大幅提升運行時效率的目標。
RDD
RDD,由于無從得知所存數據元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。
DataFrame和RDD聯系:
DataFrame底層是以RDD為基礎的分布式數據集,和RDD的主要區別的是:RDD中沒有schema信息,而DataFrame中數據每一行都包含schema
DataFrame = RDD[Row] + shcema
SparkSession是Spark 2.0引如的新概念。SparkSession為用戶提供了統一的切入點,來讓用戶學習spark的各項功能。
在spark的早期版本中,SparkContext是spark的主要切入點,由于RDD是主要的API,我們通過sparkcontext來創建和操作RDD。
對于每個其他的API,我們需要使用不同的context。
例如,對于Streming,我們需要使用StreamingContext;對于sql,使用sqlContext;對于Hive,使用hiveContext。
但是隨著DataSet和DataFrame的API逐漸成為標準的API,就需要為他們建立接入點。所以在spark2.0中,引入SparkSession作為DataSet和DataFrame API的切入點。
SparkSession封裝了SparkConf、SparkContext和SQLContext。為了向后兼容,SQLContext和HiveContext也被保存下來。
SparkSession實質上是SQLContext和HiveContext的組合(未來可能還會加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同樣是可以使用的。
SparkSession內部封裝了sparkContext,所以計算實際上是由sparkContext完成的。
----為用戶提供一個統一的切入點使用Spark各項功能
----允許用戶通過它調用DataFrame和Dataset相關 API來編寫程序
----減少了用戶需要了解的一些概念,可以很容易的與Spark進行交互
----與Spark交互之時不需要顯示的創建SparkConf, SparkContext以及 SQlContext,這些對象已經封閉在SparkSession中
case class People(val name:String,val age:Int) //可以聲明數據類型 object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //設置運行模式為本地運行,不然默認是集群模式 //conf.setMaster("local") //默認是集群模式 //設置任務名 conf.setAppName("WordCount").setMaster("local") conf.set("spark.default.parallelism","5") //設置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根據SparkContext生成SQLContext val array = Array("mark,14","kitty,23","dasi,45") val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD People(line.split(",")(0),line.split(",")(1).trim().toInt) }) import Sqlsc.implicits._ //引入全部方法 //將RDD轉換成DataFrame val df = peopleRDD.toDF() //將DataFrame轉換成一個臨時的視圖 df.createOrReplaceTempView("people") //使用SQL語句進行查詢 Sqlsc.sql("select * from people").show() } }
object WordCount { def main(args:Array[String]):Unit={ val conf = new SparkConf() //設置運行模式為本地運行,不然默認是集群模式 //conf.setMaster("local") //默認是集群模式 //設置任務名 conf.setAppName("WordCount").setMaster("local") conf.set("spark.default.parallelism","5") //設置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根據SparkContext生成SQLContext val array = Array("mark,14","kitty,23","dasi,45") //1.需要將RDD數據映射成Row,需要引入import org.apache.spark.sql.Row val peopleRDD = sc.parallelize(array).map(line=>{ //生成RDD val fields = line.split(",") Row(fields(0),fields(1).trim().toInt) }) //2.創建StructType定義結構 val st:StructType = StructType( //字段名,字段類型,是否可以為空 List( //傳參是列表類型,或者使用StructField("name", StringType, true) :: StructField("age", IntegerType, true) :: Nil來構建列表 StructField("name",StringType,true), StructField("age",IntegerType,true) ) ) //3.使用SparkSession建立DataFrame val df = Sqlsc.createDataFrame(peopleRDD,st) //將DataFrame轉換成一個臨時的視圖 df.createOrReplaceTempView("people") //使用SQL語句進行查詢 Sqlsc.sql("select * from people").show() } }
[{"name":"dafa","age":12},{"name":"safaw","age":17},{"name":"ge","age":34}]
def main(args:Array[String]):Unit={ val conf = new SparkConf() //設置運行模式為本地運行,不然默認是集群模式 //conf.setMaster("local") //默認是集群模式 //設置任務名 conf.setAppName("WordCount").setMaster("local") //設置SparkContext,是SparkCore的程序入口 val sc = new SparkContext(conf) val Sqlsc = new SQLContext(sc) //根據SparkContext生成SQLContext //通過json數據直接創建DataFrame val df = Sqlsc.read.json("E:\\1.json") //將DataFrame轉換成一個臨時的視圖 df.createOrReplaceTempView("people1") //使用SQL語句進行查詢 Sqlsc.sql("select * from people1").show() }
視圖是一個虛表,跟Mysql里的概念是一樣的,視圖基于實際的表而存在,其實質是一系列的查詢語句
局部視圖(Temoporary View):只在當前會話中有效,如果創建它的會話終止,則視圖也會消失。
全局視圖(Global Temporary View): 在全局范圍內有效,不同的Session中都可以訪問,生命周期是Spark的Application運行周期,全局視圖會綁定到系統保留的數據庫global_temp中,因此使用它的時候必須加上相應前綴。
創建局部視圖:df.createOrReplaceTempView("emp")
創建全局視圖:df.createOrReplaceGlobalTempView("empG")
spark.sql("select * from emp").show
spark.sql("select * from global_temp.empG").show //查詢全局視圖,需要添加前綴
spark.newSession.sql("select * from emp").show -----> 報錯,Table or View Not Found
spark.newSession.sql("select * from global_temp.empG").show ---->可以正常查詢
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //方式一 val df1 = sqlContext.read.json("E:\\666\\people.json") val df2 = sqlContext.read.parquet("E:\\666\\users.parquet") //方式二 val df3 = sqlContext.read.format("json").load("E:\\666\\people.json") val df4 = sqlContext.read.format("parquet").load("E:\\666\\users.parquet") //方式三,默認是parquet格式 val df5 = sqlContext.load("E:\\666\\users.parquet") //方式四,使用MySQL進行數據源讀取 val url = "jdbc:mysql://192.168.123.102:3306/hivedb" val table = "dbs" val properties = new Properties() properties.setProperty("user","root") properties.setProperty("password","root") //需要傳入Mysql的URL、表明、properties(連接數據庫的用戶名密碼) val df = sqlContext.read.jdbc(url,table,properties) df.createOrReplaceTempView("dbs") sqlContext.sql("select * from dbs").show()
使用Hive作為數據源:需要在pom.xml文件中添加依賴
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.3.0</version> </dependency>
開發環境則把resource文件夾下添加hive-site.xml文件,集群環境把hive的配置文件要發到$SPARK_HOME/conf目錄下
<configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://localhost:3306/hivedb?createDatabaseIfNotExist=true</value> <description>JDBC connect string for a JDBC metastore</description> <!-- 如果 mysql 和 hive 在同一個服務器節點,那么請更改 hadoop02 為 localhost --> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> <description>Driver class name for a JDBC metastore</description> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> <description>username to use against metastore database</description> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> <description>password to use against metastore database</description> </property> <property> <name>hive.metastore.warehouse.dir</name> <value>/hive/warehouse</value> <description>hive default warehouse, if nessecory, change it</description> </property> </configuration> hive-site.xml配置文件
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName) val sc = new SparkContext(conf) val sqlContext = new HiveContext(sc) sqlContext.sql("select * from myhive.student").show()
val conf = new SparkConf().setAppName("TestDataFrame2").setMaster("local") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val df1 = sqlContext.read.json("E:\\666\\people.json") //方式一 df1.write.json("E:\\111") df1.write.parquet("E:\\222") //方式二 df1.write.format("json").save("E:\\333") df1.write.format("parquet").save("E:\\444") //方式三 df1.write.save("E:\\555")
df1.write.format("parquet").mode(SaveMode.Ignore).save("E:\\444")
Dataset也是一個分布式數據容器,簡單來說是類似二維表,Dataset里頭存有schema數據結構信息和原生數據,Dataset的底層封裝的是RDD,當RDD的泛型是Row類型的時候,我們也可以稱它為DataFrame。即Dataset<Row> = DataFrame。DataFrame是特殊的Dataset。
Spark整合了Dataset和DataFrame,前者是有明確類型的數據集,后者是無明確類型的數據集。根據官方的文檔:
Dataset是一種強類型集合,與領域對象相關,可以使用函數或者關系進行分布式的操作。
每個Dataset也有一個無類型的視圖,叫做DataFrame,也就是關于Row的Dataset。
簡單來說,Dataset一般都是Dataset[T]形式,這里的T是指數據的類型,如上圖中的Person,而DataFrame就是一個Dataset[Row]。
Datasets是懶加載的,即只有actions被調用的時候才會觸發計算。在內部,Dataset代表一個邏輯計劃,用來描述產生數據需要的計算。當一個action被調用的時候,Spark的query優化器會優化這個邏輯計劃并以分布式的方式在物理上進行實際的計算操作。
(1,"Tom") (2,"Mary")
測試數據
(1)定義case class
case class MyData(a:Int,b:String)
(2)使用序列創建DataSet
val DS = Seq(MyData(1,"Tom"),MyData(2,"Mary")).toDS
(1)定義case class
case class Person(name:String,age:BigInt)
(2)讀入JSON的數據
val df = spark.read.json("/root/temp/people.json")
(3)將DataFrame轉換成DataSet
val PersonDS =df.as[Person]
(1)讀取HDFS的文件,直接創建DataSet
val lineDS = spark.read.text("hdfs://bigdata111:9000/input/data.txt").as[String]
(2)分詞操作,查詢長度大于3的單詞
val words = lineDS.flatMap(_.split(" ")).filter(_.length > 3)
words.show
words.collect
關于“SparkSQl中運行原理的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。