亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark Core讀取ES的分區問題案例分析

發布時間:2021-12-16 14:02:11 來源:億速云 閱讀:204 作者:iii 欄目:大數據

本篇內容介紹了“Spark Core讀取ES的分區問題案例分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1.Spark Core讀取ES

ES官網直接提供的有elasticsearch-hadoop 插件,對于ES 7.x,hadoop和Spark版本支持如下:

hadoop2Version  = 2.7.1hadoop22Version = 2.2.0spark13Version = 1.6.2spark20Version = 2.3.0

浪尖這了采用的ES版本是7.1.1,測試用的Spark版本是2.3.1,沒有問題。整合es和spark,導入相關依賴有兩種方式:

a,導入整個elasticsearch-hadoop包

 <dependency>      <groupId>org.elasticsearch</groupId>      <artifactId>elasticsearch-hadoop</artifactId>      <version>7.1.1</version>    </dependency>

b,只導入spark模塊的包

<dependency>      <groupId>org.elasticsearch</groupId>      <artifactId>elasticsearch-spark-20_2.11</artifactId>      <version>7.1.1</version>    </dependency>

浪尖這里為了測試方便,只是在本機起了一個單節點的ES實例,簡單的測試代碼如下:


import org.apache.spark.{SparkConf, SparkContext}import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkrdd {
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
   conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")    conf.set(ConfigurationOptions.ES_PORT, "9200")    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)    conf.set("es.write.rest.error.handlers", "ignoreConflict")    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
   val sc = new SparkContext(conf)    import org.elasticsearch.spark._
   sc.esRDD("posts").foreach(each=>{      each._2.keys.foreach(println)    })    sc.esJsonRDD("posts").foreach(each=>{      println(each._2)    })
   sc.stop()  }}

可以看到Spark Core讀取RDD主要有兩種形式的API:

a,esRDD。這種返回的是一個tuple2的類型的RDD,第一個元素是id,第二個是一個map,包含ES的document元素。

RDD[(String, Map[String, AnyRef])]

b,esJsonRDD。這種返回的也是一個tuple2類型的RDD,第一個元素依然是id,第二個是json字符串。

RDD[(String, String)]

雖然是兩種類型的RDD,但是RDD都是ScalaEsRDD類型。

要分析Spark Core讀取ES的并行度,只需要分析ScalaEsRDD的getPartitions函數即可。

2.源碼分析

首先導入源碼https://github.com/elastic/elasticsearch-hadoop這個是gradle工程,可以直接導入idea,然后切換到7.x版本即可。

廢話少說直接找到ScalaEsRDD,發現gePartitions是在其父類實現的,方法內容如下:

override def getPartitions: Array[Partition] = {    esPartitions.zipWithIndex.map { case(esPartition, idx) =>      new EsPartition(id, idx, esPartition)    }.toArray  }

esPartitions是一個lazy型的變量:

@transient private[spark] lazy val esPartitions = {    RestService.findPartitions(esCfg, logger)  }

這種聲明原因是什么呢?

lazy+transient的原因大家可以考慮一下。

RestService.findPartitions方法也是僅是創建客戶端獲取分片等信息,然后調用,分兩種情況調用兩個方法。

final List<PartitionDefinition> partitions;//            5.x及以后版本 同時沒有配置es.input.max.docs.per.partitionif (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {     partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);} else {     partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);}

a).findSlicePartitions

這個方法其實就是在5.x及以后的ES版本,同時配置了

es.input.max.docs.per.partition

以后,才會執行,實際上就是將ES的分片按照指定大小進行拆分,必然要先進行分片大小統計,然后計算出拆分的分區數,最后生成分區信息。具體代碼如下:

long numDocs;if (readResource.isTyped()) {    numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);} else {    numDocs = client.countIndexShard(index, Integer.toString(shardId), query);}int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);for (int i = 0; i < numPartitions; i++) {    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);    partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));}

實際上分片就是用游標的方式,對_doc進行排序,然后按照分片計算得到的分區偏移進行數據的讀取,組裝過程是SearchRequestBuilder.assemble方法來實現的。

這個其實個人覺得會浪費一定的性能,假如真的要ES結合Spark的話,建議合理設置分片數。

b).findShardPartitions方法

這個方法沒啥疑問了就是一個RDD分區對應于ES index的一個分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,locationList.toArray(new String[0]));partitions.add(partition);

“Spark Core讀取ES的分區問題案例分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

文水县| 永州市| 郧西县| 沈阳市| 观塘区| 微山县| 长阳| 城口县| 稷山县| 韶山市| 武汉市| 云梦县| 武平县| 伊春市| 庆云县| 福州市| 资兴市| 根河市| 台湾省| 兴业县| 亳州市| 庄浪县| 湘潭县| 昌黎县| 上杭县| 镇远县| 滨海县| 平南县| 呼图壁县| 岱山县| 开封县| 和田市| 应用必备| 揭东县| 峨眉山市| 武宁县| 兴国县| 砀山县| 安庆市| 辉南县| 巨野县|