亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Parquet介紹及簡單使用

發布時間:2020-06-26 02:25:37 來源:網絡 閱讀:7996 作者:菜鳥的征程 欄目:大數據

==> 什么是parquet

        Parquet 是列式存儲的一種文件類型


==> 官網描述:

            Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language

        無論數據處理框架,數據模型或編程語言的選擇如何,Apache Parquet都是Hadoop生態系統中任何項目可用的列式存儲格式


==> 由來

    Parquet的靈感來自于2010年Google發表的Dremel論文,文中介紹了一種支持嵌套結構的存儲格式,并且使用了列式存儲的方式提升查詢性能,在Dremel論文中還介紹了Google如何使用這種存儲格式實現并行查詢的,如果對此感興趣可以參考論文和開源實現Apache Drill。

==> 特點:

    ---> 可以跳過不符合條件的數據,只讀取需要的數據,降低 IO 數據量

    ---> 壓縮編碼可以降低磁盤存儲空間(由于同一列的數據類型是一樣的,可以使用更高效的壓縮編碼(如 Run Length Encoding t  Delta Encoding)進一步節約存儲空間)

    ---> 只讀取需要的列,支持向量運算,能夠獲取更好的掃描性能

    ---> Parquet 格式是 Spark SQL 的默認數據源,可通過 spark.sql.sources.default 配置


==> parquet 常用操作

    ---> load 和 save 函數

// 讀取 Parquet 文件
val usersDF = spark.read.load("/test/users.parquet")

// 查詢 Schema 和數據
usersDF.printSchema
usersDF.show

// 查詢用戶的 name 和喜愛顏色并保存
usersDF.select($"name", $"favorite_color").write.save("/test/result/parquet")
// 驗證結果 可通過 printSchema 查詢數據結構,使用 show 查看數據

// 顯式指定文件格式: 加載 json 格式
val usersDF = spark.read.format("json").load("/test/people.json")

// 存儲模式(Save Modes) 
// 可以采用 SaveMode 執行存儲操作, SaveMode 定義 了對數據的處理模式,需要注意的是,這些保存模式不使用任何鎖定,不是原子操作
// 當使用 Overwrite 方式執行時,在輸出新數據之前,原數據就已經被刪除
usersDF.select($"name").write.save("/test/parquet1")   // 若 /test/parquet1 存在會報錯
usersDF.select($"name").wirte.mode("overwrite").save("/test/parquet1")        // 使用 overwrite 即可

// 將結果保存為表, 也可以進行分區, 分桶等操作: partitionBy  bucketBy
usersDF.select($"name").write.saveAsTable("table1")


    ---> Parquet文件 

            Parquet 是一個列格式而且用于多個數據處理系統中

       Spark SQL 提供支持對于 Parquet 文件的讀寫,也就是自動保存原始 數據的 Schema, 當寫 Parquet 文件時,所有的列被自動轉化為 nullable,因為兼容性的緣故


        ---- 讀取 Json 格式的數據,將其轉換成 parquet 格式,創建相應的表,使用 SQL 語句查詢

// 從 json 文件中讀入數據
val empJson = spark.read.json("/test/emp.json")
// 將數據保存為 parquet
empJson.write.mode("overwrite").parquet("/test/parquet")
// 讀取 parquet
val empParquet = spark.read.parquet("/test/parquet")
// 創建臨時表 emptable
empParquet.createOrReplaceTempView("emptalbe")
// 使用 SQL 語句執行查詢
spark.sql("select * from emptable where deptno=10 and sal>1500").show

        ---- Schematic 的合并: 先定義一個簡單的 Schema,然后逐漸增加列描述,用戶可以獲取多個有多個不同 Schema 但相互兼容的 Parquet 文件

// 創建第一個文件
val df1 = sc.makeRDD(1 to 5).map(x=> (x, x*2)).toDF("single", "double")
scala> df1.printSchema
root
 |-- single: integer (nullable = false)
 |-- double: integer (nullable = false)
 
 
// 創建第二個文件 
 scala> val df2 = sc.makeRDD(6 to 10).map(x=> (x, x*2)).toDF("single", "triple")
df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

scala> df2.printSchema
root
 |-- single: integer (nullable = false)
 |-- triple: integer (nullable = false)
  
 scala> df2.write.parquet("/data/testtable/key=2")

 // 合并上面的兩個文件
scala> val df3 = spark.read.option("mergeSchema", "true").parquet("/data/testtable")
df3: org.apache.spark.sql.DataFrame = [single: int, double: int ... 2 more fields]

scala> df3.printSchema
root
 |-- single: integer (nullable = true)
 |-- double: integer (nullable = true)
 |-- triple: integer (nullable = true)
 |-- key: integer (nullable = true)
 
 scala> df3.show
+------+------+------+---+
|single|double|triple|key|
+------+------+------+---+
|     8|  null|    16|  2|
|     9|  null|    18|  2|
|    10|  null|    20|  2|
|     3|     6|  null|  1|
|     4|     8|  null|  1|
|     5|    10|  null|  1|
|     6|  null|    12|  2|
|     7|  null|    14|  2|
|     1|     2|  null|  1|
|     2|     4|  null|  1|
+------+------+------+---+



    ---> Json Datasets(兩種寫法

// 第一種
scala> val df4 = spark.read.json("/app/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
df4: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df4.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

// 第二種
scala> val df5 = spark.read.format("json").load("/app/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
df5: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df5.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


    ---> JDBC 方式讀取關系型數據庫中的數據(需要將 JDBC 的驅動加入

// 將 JDBC 的驅動加入
bin/spark-shell --master spark://bigdata11:7077 --jars /root/temp/ojdbc6.jar --driver-class-path /root/temp/ojdbc6.jar

// 讀取 Oracle
val oracleEmp = spark.read.format("jdbc")
                    .option("url","jdbc:oracle:thin:@192.168.10.100:1521/orcl.example.com")
                    .option("dbtable","scott.emp")
                    .option("user","scott")
                    .option("password","tiger").load


    ---> 操作 Hive 的表

        ---- 把 hive 和 hadoop 的配置文件拷貝到sprke 的 conf 目錄下: hive-sit.xml, core-sit.xml, hdfs-sit.xml

        ---- 啟動 Spark-shell 時 指定mysql 數據庫的驅動程序

 ./bin/spark-shell --master spark://bigdata0:7077 --jars /data/tools/mysql-connector-java-5.1.43-bin.jar  --driver-class-path /data/tools/mysql-connector-java-5.1.43-bin.jar

        ---- 使用 Spark Shell 操作 Hive

// 創建表
spark.sql("create table ccc(key INT, value STRING) row format delimited fields terminated by ','")

// 導入數據
spark.sql("load data local path '/test/data.txt' into table ccc")

// 查詢數據
spark.sql("select * from ccc").show

        ---- 使用 Spark SQL 操作 Hive

show tables;
select * from ccc;



向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

西华县| 偏关县| 阿拉善右旗| 永靖县| 巍山| 马鞍山市| 阿图什市| 申扎县| 遂昌县| 攀枝花市| 周宁县| 上思县| 望都县| 舒兰市| 九龙县| 庆阳市| 金平| 霍林郭勒市| 清新县| 扎囊县| 洛浦县| 乌鲁木齐县| 集安市| 波密县| 信宜市| 南陵县| 图片| 华宁县| 舞钢市| 钟祥市| 昆山市| 西吉县| 寿宁县| 水城县| 穆棱市| 思南县| 翼城县| 确山县| 红原县| 西安市| 和平区|