您好,登錄后才能下訂單哦!
本篇文章為大家展示了Spark On MaxCompute如何訪問Phonix數據,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
hbase主要版本為2.0與1.1,這邊選擇對應hbase對應的版本為1.1
Hbase與Hbase2.0版本的區別
HBase1.1版本
1.1版本基于HBase社區1.1.2版本開發。
HBase2.0版本
2.0版本是基于社區2018年發布的HBase2.0.0版本開發的全新版本。同樣,在此基礎上,做了大量的改進和優化,吸收了眾多阿里內部成功經驗,比社區HBase版本具有更好的穩定性和性能。
確保測試聯通性的可以方便可行,該hbase的VPCId,vsWitchID盡量與購買的獨享集成資源組的為一致的。
根據文檔鏈接選擇對應的DataWorks的region下的白名單進行添加。
打開數據庫鏈接的按鈕,可以查看到Hbase的主版本以及Hbase的專有網絡訪問地址,以及是否開通公網訪問的方式進行連接。
根據hbase的版本為1.1選擇Phonix的版本為4.12.0根據文檔下載對應的客戶端文件ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz
登陸客戶端執行命令
./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181
創建表:
CREATE TABLE IF NOT EXISTS users_phonix ( id INT , username STRING, password STRING ) ;
插入數據:
UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');
在客戶端執行命令,查看當前表與數據是否上傳成功
select * from users;
在IDEA按照對應得Pom文件進行配置本地得開發環境,將代碼涉及到得配置信息填寫完整,進行編寫測試,這里可以先使用Hbase得公網訪問鏈接進行測試,代碼邏輯驗證成功后可調整配置參數,具體代碼如下
package com.git.phonix import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.SparkSession import org.apache.phoenix.spark._ /** * 本實例適用于Phoenix 4.x版本 */ object SparkOnPhoenix4xSparkSession { def main(args: Array[String]): Unit = { //HBase集群的ZK鏈接地址。 //格式為:xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181 val zkAddress = args(0) //Phoenix側的表名,需要在Phoenix側提前創建。Phoenix表創建可以參考:https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW val phoenixTableName = args(1) //Spark側的表名。 val ODPSTableName = args(2) val sparkSession = SparkSession .builder() .appName("SparkSQL-on-MaxCompute") .config("spark.sql.broadcastTimeout", 20 * 60) .config("spark.sql.crossJoin.enabled", true) .config("odps.exec.dynamic.partition.mode", "nonstrict") //.config("spark.master", "local[4]") // 需設置spark.master為local[N]才能直接運行,N為并發數 .config("spark.hadoop.odps.project.name", "***") .config("spark.hadoop.odps.access.id", "***") .config("spark.hadoop.odps.access.key", "***") //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api") .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api") .config("spark.sql.catalogImplementation", "odps") .getOrCreate() //第一種插入方式 var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load() df.show() df.write.mode("overwrite").insertInto(ODPSTableName) } }
pom文件中分為Spark依賴,與ali-phoenix-spark相關的依賴,由于涉及到ODPS的jar包,會在集群中引起jar沖突,所以要將ODPS的包排除掉
<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. See accompanying LICENSE file. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <properties> <spark.version>2.3.0</spark.version> <cupid.sdk.version>3.3.8-public</cupid.sdk.version> <scala.version>2.11.8</scala.version> <scala.binary.version>2.11</scala.binary.version> <phoenix.version>4.12.0-HBase-1.1</phoenix.version> </properties> <groupId>com.aliyun.odps</groupId> <artifactId>Spark-Phonix</artifactId> <version>1.0.0-SNAPSHOT</version> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.jpmml</groupId> <artifactId>pmml-model</artifactId> <version>1.3.8</version> </dependency> <dependency> <groupId>org.jpmml</groupId> <artifactId>pmml-evaluator</artifactId> <version>1.3.10</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> <exclusions> <exclusion> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </exclusion> <exclusion> <groupId>org.scala-lang</groupId> <artifactId>scalap</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.binary.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>cupid-sdk</artifactId> <version>${cupid.sdk.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.aliyun.phoenix</groupId> <artifactId>ali-phoenix-core</artifactId> <version>4.12.0-AliHBase-1.1-0.8</version> <exclusions> <exclusion> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-mapred</artifactId> </exclusion> <exclusion> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-commons</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.aliyun.phoenix</groupId> <artifactId>ali-phoenix-spark</artifactId> <version>4.12.0-AliHBase-1.1-0.8</version> <exclusions> <exclusion> <groupId>com.aliyun.phoenix</groupId> <artifactId>ali-phoenix-core</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizeJar>false</minimizeJar> <shadedArtifactAttached>true</shadedArtifactAttached> <artifactSet> <includes> <!-- Include here the dependencies you want to be packed in your fat jar --> <include>*:*</include> </includes> </artifactSet> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> <exclude>**/log4j.properties</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.3.2</version> <executions> <execution> <id>scala-compile-first</id> <phase>process-resources</phase> <goals> <goal>compile</goal> </goals> </execution> <execution> <id>scala-test-compile-first</id> <phase>process-test-resources</phase> <goals> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
CREATE TABLE IF NOT EXISTS users_phonix ( id INT , username STRING, password STRING ) ;
在IDEA打包要打成shaded包,將所有的依賴包,打入jar包中,由于DatadWork界面方式上傳jar包有50M的限制,因此采用MaxCompute客戶端進行jar包
進入DataWorks界面選擇左側資源圖標,選擇對應的環境位開發換進,輸入刪除文件時的文件名稱進行搜索,列表中展示該資源已經上傳成,點擊提交到數據開發
點擊提交按鈕
其中的配置vpcList文件的配置信息如下,可具體根據個人hbase的鏈接,進行配置
{ "regionId":"cn-beijing", "vpcs":[ { "vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk", "zones":[ { "urls":[ { "domain":"172.16.0.12", "port":2181 }, { "domain":"172.16.0.13", "port":2181 }, { "domain":"172.16.0.15", "port":2181 }, { "domain":"172.16.0.14", "port":2181 }, { "domain":"172.16.0.12", "port":16000 }, { "domain":"172.16.0.13", "port":16000 }, { "domain":"172.16.0.15", "port":16000 }, { "domain":"172.16.0.14", "port":16000 }, { "domain":"172.16.0.12", "port":16020 }, { "domain":"172.16.0.13", "port":16020 }, { "domain":"172.16.0.15", "port":16020 }, { "domain":"172.16.0.14", "port":16020 } ] } ] } ] }
Spark任務提交任務的配置參數,主類,以及對應的參數
該參數主要為3個參數第一個為Phonix的鏈接,第二個為Phonix的表名稱,第三個為傳入的MaxCompute表
點擊冒煙測試按鈕,可以看到任務執行成功
在臨時查詢節點中執行查詢語句,可以得到數據已經寫入MaxCompute的表中
使用Spark on MaxCompute訪問Phonix的數據,并將數據寫入到MaxCompute的表中經過實踐,該方案時可行的。但在實踐的時有幾點注意事項:
1.結合實際使用情況選擇對應的Hbase以及Phonix版本,對應的版本一致,并且所使用的客戶端,以及代碼依賴都會有所改變。
2.使用公網在IEAD進行本地測試,要注意Hbase白名單,不僅要設置DataWorks的白名單,還需將自己本地的地址加入到白名單中。
3.代碼打包時需要將pom中的依賴關系進行梳理,避免ODPS所存在的包在對應的依賴中,進而引起jar包沖突,并且打包時打成shaded包,避免缺失遺漏對應的依賴。
上述內容就是Spark On MaxCompute如何訪問Phonix數據,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。