亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MaxCompute Spark開發的示例分析

發布時間:2021-12-16 21:46:40 來源:億速云 閱讀:147 作者:柒染 欄目:云計算

這期內容當中小編將會給大家帶來有關MaxCompute Spark開發的示例分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

MaxCompute Spark開發

0. 概述

MaxCompute Spark是MaxCompute提供的兼容開源的Spark計算服務,它在統一的計算資源和數據集權限體系之上,提供Spark計算框架,支持用戶以熟悉的開發使用方式提交運行Spark作業,以滿足更豐富的數據處理分析場景。

下面將重點介紹MaxCompute Spark能夠支撐的應用場景,同時說明開發的依賴條件和環境準備,重點對Spark作業開發、提交到MaxCompute集群執行、診斷進行介紹。

1. 前提條件

MaxCompute Spark是阿里云提供的Spark on MaxCompute的解決方案,能夠讓Spark應用運行在托管的MaxCompute計算環境中。為了能夠在MaxCompute環境中安全地運行Spark作業,MaxCompute提供了以下SDK和MaxCompute Spark定制發布包。

SDK定位于開源應用接入MaxCompute SDK:
提供了集成所需的API說明以及相關功能Demo,用戶可以基于項目提供的Spark-1.x以及Spark-2.x的example項目構建自己的應用,并且提交到MaxCompute集群上。
MaxCompute Spark客戶端發布包:
集成了MaxCompute認證功功能,作為客戶端工具,用于通過Spark-submit方式提交作業到MaxCompute項目中運行,目前提供了面向Spark1.x和Spark2.x的2個發布包:spark-1.6.3和spark-2.3.0 SDK在開發時,可以通過配置Maven依賴進行引用。Spark客戶端需要根據開發的Spark版本,提前下載。如,需要開發Spark1.x應用,應下載spark-1.6.3版本客戶端;如需開發Spark2.x應用,應下載spark-2.3.0客戶端。

2. 開發環境準備

2.1 Maxcompute Spark客戶端準備

MaxCompute Spark發布包:集成了MaxCompute認證功功能,作為客戶端工具,用于通過Spark-submit方式提交作業到MaxCompute項目中運行,目前提供了面向Spark1.x和Spark2.x的2個發布包:

  • spark-1.6.3

  • spark-2.3.0

請根據需要開發的Spark版本,選擇合適的版本下載并解壓Maxcompute Spark發布包。

2.2 設置環境變量

JAVA_HOME設置

# 盡量使用JDK 1.7+ 1.8+ 最佳
export JAVA_HOME=/path/to/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH

SPARK_HOME設置

export SPARK_HOME=/path/to/spark_extracted_package
export PATH=$SPARK_HOME/bin:$PATH

2.3 設置Spark-defaults.conf

$SPARK_HOME/conf
路徑下存在spark-defaults.conf.template文件,這個可以作為spark-defaults.conf的模版,需要在該文件中設置MaxCompute相關的賬號信息后,才可以提交Spark任務到MaxCompute。默認配置內容如下,將空白部分根據實際的賬號信息填上即可,其余的配置可以保持不變。

# MaxCompute賬號信息
spark.hadoop.odps.project.name =
spark.hadoop.odps.access.id =
spark.hadoop.odps.access.key =
# 以下配置保持不變
spark.sql.catalogImplementation=odps
spark.hadoop.odps.task.major.version = cupid_v2
spark.hadoop.odps.cupid.container.image.enable = true
spark.hadoop.odps.cupid.container.vm.engine.type = hyper
spark.hadoop.odps.end.point = http://service.cn.maxcompute.aliyun.com/api
spark.hadoop.odps.runtime.end.point = http://service.cn.maxcompute.aliyun-inc.com/api

3. 訪問MaxCompute表所需依賴

若作業需要訪問MaxCompute表,需要依賴odps-spark-datasource模塊,本節介紹如何把該依賴編譯安裝到本地maven倉庫;若無需訪問可直接跳過。

  1. git clone代碼,github地址: https://github.com/aliyun/aliyun-cupid-sdk/tree/3.3.2-public

#git clone git@github.com:aliyun/aliyun-cupid-sdk.git

  1. 編譯模塊

#cd ${path to aliyun-cupid-sdk}
#git checkout 3.3.2-public

// 編譯并安裝cupid-sdk
#cd ${path to aliyun-cupid-sdk}/core/cupid-sdk/
#mvn clean install -DskipTests

// 編譯并安裝datasource。依賴cupid-sdk
// for spark-2.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-2.x/datasource
# mvn clean install -DskipTests
// for spark-1.x
# cd ${path to aliyun-cupid-sdk}/spark/spark-1.x/datasource
#mvn clean install -DskipTests

  1. 添加依賴

<!-- Spark-1.x請依賴此模塊 -->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.10</artifactId>
<version>3.3.2-public</version>
</dependency>

<!-- Spark-2.x請依賴此模塊 -->
<dependency>
  <groupId>com.aliyun.odps</groupId>
  <artifactId>odps-spark-datasource_2.11</artifactId>
  <version>3.3.2-public</version>
</dependency>

4. OSS依賴

若作業需要訪問OSS,直接添加以下依賴即可

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>hadoop-fs-oss</artifactId>
    <version>3.3.2-public</version>
</dependency>

5. 應用開發

MaxCompute產品提供了兩個應用構建的模版,用戶可以基于此模版進行開發,最后統一構建整個項目后用生成的應用包即可直接提交到MaxCompute集群上運行Spark應用。

5.1 通過模版構建應用

MaxCompute Spark提供兩個應用構建模版,用戶可以基于此模版進行開發,最后統一構建整個項目后用生成的應用包即可直接提交到MaxCompute集群上運行Spark應用。首先需要把代碼clone下來

#git clone git@github.com:aliyun/aliyun-cupid-sdk.git
#cd aliyun-cupid-sdk
#checkout 3.3.2-public
#cd archetypes

// for Spark-1.x
sh Create-AliSpark-1.x-APP.sh spark-1.x-demo /tmp

// for Spark-2.x
Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp

以上命令會在/tmp目錄下創建名為 spark-1.x-demo(spark-2.x-demo)的maven project,執行以下命令進行編譯和提交作業:

#cd /tmp/spark-2.x/demo
#mvn clean package

// 提交作業
$SPARK_HOME/bin/spark-submit \
--master yarn-cluster \
--class SparkPi \
/tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar

# Usage: sh Create-AliSpark-2.x-APP.sh <app_name> <target_path>
sh Create-AliSpark-2.x-APP.sh spark-2.x-demo /tmp/
cd /tmp/spark-2.x-demo
mvn clean package
# 冒煙測試
# 1 利用編譯出來的shaded jar包
# 2 按照文檔所示下載MaxCompute Spark客戶端
# 3 參考文檔”置環境變量”指引,填寫MaxCompute項目相關配置項
# 執行spark-submit命令 如下
$SPARK_HOME/bin/spark-submit \
        --master yarn-cluster \
        --class SparkPi \
      /tmp/spark-2.x-demo/target/AliSpark-2.x-quickstart-1.0-SNAPSHOT-shaded.jar

5.2 Java/Scala開發樣例

Spark-1.x

pom.xml 須知
請注意 用戶構建Spark應用的時候,由于是用MaxCompute提供的Spark客戶端去提交應用,故需要注意一些依賴scope的定義

  • spark-core spark-sql等所有spark社區發布的包,用provided scope

  • odps-spark-datasource 用默認的compile scope

<!-- spark相關依賴, provided -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

<!-- datasource依賴, 用于訪問MaxCompute表 -->
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
    <version>3.3.2-public</version>
</dependency>

案例說明

WordCount

詳細代碼
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.WordCount \
      ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

Spark-SQL on MaxCompute Table

詳細代碼
提交方式

# 運行可能會報Table Not Found的異常,因為用戶的MaxCompute Project中沒有代碼中指定的表
# 可以參考代碼中的各種接口,實現對應Table的SparkSQL應用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.sparksql.SparkSQL \
      ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

GraphX PageRank

詳細代碼
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.graphx.PageRank \
      ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

Mllib Kmeans-ON-OSS

詳細代碼
提交方式

# 代碼中的OSS賬號信息相關需要填上,再編譯提交
conf.set("spark.hadoop.fs.oss.accessKeyId", "***")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "***")
conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
      ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

OSS UnstructuredData

詳細代碼
提交方式

# 代碼中的OSS賬號信息相關需要填上,再編譯提交
conf.set("spark.hadoop.fs.oss.accessKeyId", "***")
conf.set("spark.hadoop.fs.oss.accessKeySecret", "***")
conf.set("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
      ${path to aliyun-cupid-sdk}/spark/spark-1.x/spark-examples/target/spark-examples_2.10-version-shaded.jar

Spark-2.x

pom.xml 須知
請注意 用戶構建Spark應用的時候,由于是用MaxCompute提供的Spark客戶端去提交應用,故需要注意一些依賴scope的定義

  • spark-core spark-sql等所有spark社區發布的包,用provided scope

  • odps-spark-datasource 用默認的compile scope

<!-- spark相關依賴, provided -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>cupid-sdk</artifactId>
    <scope>provided</scope>
</dependency>

<!-- datasource依賴, 用于訪問MaxCompute表 -->
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
    <version>3.3.2-public</version>
</dependency>

案例說明

WordCount

詳細代碼
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.WordCount \
      ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

Spark-SQL 操作MaxCompute表

詳細代碼
提交方式

# 運行可能會報Table Not Found的異常,因為用戶的MaxCompute Project中沒有代碼中指定的表
# 可以參考代碼中的各種接口,實現對應Table的SparkSQL應用
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.sparksql.SparkSQL \
      ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

GraphX PageRank

詳細代碼
提交方式

Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.graphx.PageRank \
      ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

Mllib Kmeans-ON-OSS

KmeansModelSaveToOss
詳細代碼
提交方式

# 代碼中的OSS賬號信息相關需要填上,再編譯提交
val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.accessKeyId", "***")
      .config("spark.hadoop.fs.oss.accessKeySecret", "***")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("KmeansModelSaveToOss")
      .getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.mllib.KmeansModelSaveToOss \
      ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

OSS UnstructuredData

SparkUnstructuredDataCompute
詳細代碼
提交方式

# 代碼中的OSS賬號信息相關需要填上,再編譯提交
val spark = SparkSession
      .builder()
      .config("spark.hadoop.fs.oss.accessKeyId", "***")
      .config("spark.hadoop.fs.oss.accessKeySecret", "***")
      .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-zmf.aliyuncs.com")
      .appName("SparkUnstructuredDataCompute")
      .getOrCreate()
Step 1. build aliyun-cupid-sdk
Step 2. properly set spark.defaults.conf
Step 3. bin/spark-submit --master yarn-cluster --class \
      com.aliyun.odps.spark.examples.oss.SparkUnstructuredDataCompute \
      ${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

PySpark開發樣例

需要文件
若需要訪問MaxCompute表,則需要參考第三節(訪問MaxCompute表所需依賴)編譯datasource包

SparkSQL應用示例(spark1.6)

from pyspark import SparkContext, SparkConf
from pyspark.sql import OdpsContext

if __name__ == '__main__':
    conf = SparkConf().setAppName("odps_pyspark")
    sc = SparkContext(conf=conf)
    sql_context = OdpsContext(sc)
    df = sql_context.sql("select id, value from cupid_wordcount")
    df.printSchema()
    df.show(200)
   
    df_2 = sql_context.sql("select id, value from cupid_partition_table1 where pt1 = 'part1'")
    df_2.show(200)

    #Create Drop Table
    sql_context.sql("create table TestCtas as select * from cupid_wordcount").show()
    sql_context.sql("drop table TestCtas").show()

提交運行:

./bin/spark-submit \
--jars ${path to odps-spark-datasource_2.10-3.3.2-public.jar} \
example.py

SparkSQL應用示例(spark2.3)

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder.appName("spark sql").getOrCreate()

    df = spark.sql("select id, value from cupid_wordcount")
    df.printSchema()
    df.show(10, 200)

    df_2 = spark.sql("SELECT product,category,revenue FROM (SELECT product,category,revenue, dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank  FROM productRevenue) tmp WHERE  rank <= 2");
    df_2.printSchema()
    df_2.show(10, 200)

    df_3 = spark.sql("select id, value from cupid_partition_table1 where pt1 = 'part1'")
    df_3.show(10, 200)

    #Create Drop Table
    spark.sql("create table TestCtas as select * from cupid_wordcount").show()
    spark.sql("drop table TestCtas").show()

提交運行:

spark-submit --master yarn-cluster \
--jars ${path to odps-spark-datasource_2.11-3.3.2-public.jar \
example.py

6. 通過Spark訪問VPC環境內服務

對于用戶使用Spark on MaxCompute對VPC環境內的RDS、Redis、ECS主機部署的服務等,受限于VPC的訪問限制,暫時還無法訪問,即將在近期支持。

7. 如何把開源Spark代碼遷移到Spark on MaxCompute

case1. 作業無需訪問MaxCompute表和OSS
用戶jar包可直接運行,參照第二節準備開發環境和修改配置。注意,對于spark或hadoop的依賴必須設成provided。
case2. 作業需要訪問MaxCompute
參考第三節編譯datasource并安裝到本地maven倉庫,在pom中添加依賴后重新打包即可。
case3. 作業需要訪問OSS
參考第四節在pom中添加依賴后重新打包即可。

8. 任務提交執行

目前MaxCompute Spark支持以下幾種運行方式:local模式,cluster模式,和在DataWorks中執行模式。

8.1 Local模式

local模式主要是讓用戶能夠方便的調試應用代碼,使用方式跟社區相同,我們添加了用tunnel讀寫ODPS表的功能。用戶可以在ide和命令行中使用該模式,需要添加配置spark.master=local[N],其中N表示執行該模式所需要的cpu資源。此外,local模式下的讀寫表是通過讀寫tunnel完成的,需要在Spark-defaults.conf中增加tunnel配置項(請根據MaxCompute項目所在的region及網絡環境填寫對應的Tunnel Endpoint地址):tunnel_end_point=http://dt.cn-beijing.maxcompute.aliyun.com。命令行執行該模式的方式如下:

1.bin/spark-submit --master local[4] \
--class com.aliyun.odps.spark.examples.SparkPi \
${path to aliyun-cupid-sdk}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

8.2 Cluster模式

在Cluster模式中,用戶需要指定自定義程序入口Main,Main結束(Success or Fail)spark job就會結束。使用場景適合于離線作業,可以與阿里云DataWorks產品結合進行作業調度。命令行提交方式如下:

1.bin/spark-submit --master yarn-cluster \
–class SparkPi \
${ProjectRoot}/spark/spark-2.x/spark-examples/target/spark-examples_2.11-version-shaded.jar

8.3 DataWorks執行模式

用戶可以在DataWorks中運行MaxCompute Spark離線作業(cluster模式),以方便與其他類型執行節點集成和調度。

用戶需要在DataWorks的業務流程中上傳并提交(記得要單擊"提交"按鈕)資源:

MaxCompute Spark開發的示例分析

第二步:在創建的業務流程中,從數據開發組件中選擇ODPS Spark節點。

MaxCompute Spark開發的示例分析
雙擊拖拽到工作流的Spark節點,對Spark作業進行任務定義: 
          MaxCompute Spark開發的示例分析

選擇Spark的版本、任務使用的開發語言,并指定任務所使用的資源文件。這里的資源文件就是第一步在業務流程中預先上傳并發布的資源文件。同時,您還可以指定提交作業時的配置項,如executor的數量、內存大小等配置項。同時設置配置項:spark.hadoop.odps.cupid.webproxy.endpoint(取值填寫項目所在region的endpoint,如http://service.cn.maxcompute.aliyun-inc.com/api)、spark.hadoop.odps.moye.trackurl.host(取值填寫:http://jobview.odps.aliyun.com)
以便能夠查看日志中打印出的jobview信息。
手動執行Spark節點,可以查看該任務的執行日志,從打印出來的日志中可以獲取該任務的logview和jobview的url,編譯進一步查看與診斷

MaxCompute Spark開發的示例分析

MaxCompute Spark開發的示例分析

Spark作業定義完成后,即可以在業務流程中對不同類型服務進行編排、統一調度執行。

9. 作業診斷

提交作業后,需要根據作業日志來檢查作業是否正常提交并執行,MaxCompute對于Spark作業提供了Logview工具以及Spark Web-UI來幫助開發者進行作業診斷。

例如,通過Spark-submit方式(dataworks執行spark任務時也會產生相應日志)提交作業,在作業日志中會打印以下關鍵內容:

MaxCompute Spark開發的示例分析

  1. 通過日志輸出的logview在瀏覽器中可以查看CUPID類型的任務執行的基本信息。
    MaxCompute Spark開發的示例分析
            

    單擊TempRoot的StdOut按鈕可以查看SparkPi的輸出結果:
            

             MaxCompute Spark開發的示例分析
            

     

  2. 日志中打印出上述的TrackingUrl,表示您的作業已經提交到MaxCompute集群,這個TrackingUrl非常關鍵,它既是SparkWebUI,也是HistoryServer的Url。在瀏覽器中打開這個Url,可以追蹤Spark作業的運行情況。

MaxCompute Spark開發的示例分析

單擊driver的stdout即可以查看Spark作業的輸出內容。
MaxCompute Spark開發的示例分析

上述就是小編為大家分享的MaxCompute Spark開發的示例分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

平乡县| 嘉义县| 青阳县| 修武县| 神农架林区| 杭锦后旗| 邯郸市| 资阳市| 天门市| 新巴尔虎左旗| 海门市| 南昌市| 红原县| 丹寨县| 和静县| 林口县| 新郑市| 南丹县| 竹溪县| 钟山县| 辰溪县| 宜黄县| 桐城市| 喀喇沁旗| 甘南县| 车致| 蓬安县| 梁山县| 尚志市| 宜宾县| 张家口市| 奇台县| 邵武市| 玉屏| 交口县| 阳西县| 襄垣县| 池州市| 大竹县| 临夏市| 玉树县|