亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark RDD常用算子是什么類型的

發布時間:2022-02-19 11:34:57 來源:億速云 閱讀:238 作者:小新 欄目:開發技術

小編給大家分享一下Spark RDD常用算子是什么類型的,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

Spark RDD常用算子:Value類型

Spark之所以比Hadoop靈活和強大,其中一個原因是Spark內置了許多有用的算子,也就是方法。通過對這些方法的組合,編程人員就可以寫出自己想要的功能。說白了spark編程就是對spark算子的使用,下面為大家詳細講解一下SparkValue類型的常用算子

Spark RDD常用算子是什么類型的

map

函數說明:

map() 接收一個函數,該函數將RDD中的元素逐條進行映射轉換,可以是類型的轉換,也可以是值的轉換,將函數的返回結果作為結果RDD編程。

函數簽名:

def map[U: ClassTag](f: T => U): RDD[U]

案例演示

   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)
   //算子 -map
   val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
   val mapRdd1 = rdd.map(
     _*2
   )
   mapRdd1.collect().foreach(println)
   sc.stop()

運行結果

2
4
6
8

mapPartitons

函數說明:

將待處理的數據以分區為單位發送到待計算節點上進行處理,mapPartition是對RDD的每一個分區的迭代器進行操作,返回的是迭代器。這里的處理可以進行任意的處理。

函數簽名:

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

案例演示

 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
   val sc = new SparkContext(sparkConf)
   //算子 -mapPartitons 計算每個分區的最大數
   val rdd = sc.makeRDD(List(1, 34, 36,345,2435,2342,62,35, 4),4)
   val mapParRdd = rdd.mapPartitions(
     iter => {
       List(iter.max).iterator
     }
   )
   mapParRdd.foreach(println)
   sc.stop()
 }

運行結果:

62
2435
34
345

mapPartitonsWithIndex

函數說明:

將待處理的數據以分區為單位發送到計算節點上,這里的處理可以進行任意的處理,哪怕是過濾數據,在處理的同時可以獲取當前分區的索引值。

函數簽名:

def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]

案例演示:

  1. 將數據進行扁平化映射并且打印所在的分區數
def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2)
   val mapRDD = rdd.flatMap(_.split(" "))
   val mpwiRdd = mapRDD.mapPartitionsWithIndex(
     (index, datas) => {
       datas.map(
         num => {
           (index, num)
         }
       )
     }
   )
   mpwiRdd.collect().foreach(println)
 }

運行結果:

(0,Hello)
(0,Spark)
(1,Hello)
(1,Scala)
(1,Word)
(1,Count)
  1. 將數據進行扁平化映射只打印所在第一分區的數據
def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello Spark", "Hello Scala", "Word Count"),2)
   val mapRDD = rdd.flatMap(_.split(" "))
   val mpwiRdd = mapRDD.mapPartitionsWithIndex(
     (index, datas) => {
       if (index==0){
         datas.map(
           num => {
             (index, num)
           }
         )
       }else{
       Nil.iterator
       }
     }
   )
   mpwiRdd.collect().foreach(println)

運行結果:

(0,Hello)
(0,Spark)

flatMap

函數說明:

將數據進行扁平化之后在做映射處理,所以算子也稱為扁平化映射

函數簽名:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

案例演示:

將每個單詞進行扁平化映射

def main(args: Array[String]): Unit = {
 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
 val sc = new SparkContext(sparkConf)
 //算子 -map
 val rdd = sc.makeRDD(List("Hello Scala","Hello Spark"), 2)
 val FltRdd = rdd.flatMap(
   _.split(" ")
 )
 FltRdd.foreach(println)
 sc.stop()
}

運行結果:

Hello
Scala
Hello
Spark

glom

函數說明:

glom的作用就是將一個分區的數據合并到一個array中。

函數簽名:

def glom(): RDD[Array[T]]

案例演示:

  1. 將不同分區rdd的元素合并到一個分區
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9),2)
   val glomRdd = rdd.glom()
   glomRdd.collect().foreach(data=>println(data.mkString(",")))
   sc.stop()
 }

運行結果:

1,2,3,4
5,6,7,8,9

groupBy

函數說明:

將數據根據指定的規則進行分組,分區默認不變,單數數據會被打亂,我們成這樣的操作為shuffer,

函數簽名:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

案例演示:

  1. 按照奇偶數進行groupby分區
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,10),2)
   val groupByRDD = rdd.groupBy(_ % 2 == 0)
   groupByRDD.collect().foreach(println)
   sc.stop()
 }

運行結果:

(false,CompactBuffer(1, 3, 5, 7))
(true,CompactBuffer(2, 4, 6, 8, 10))
  1. 按照單詞的首字母進行分組
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello","Tom","Timi","Scala","Spark"))
   val groupByRDD = rdd.groupBy(_.charAt(0))
   groupByRDD.collect().foreach(println)
   sc.stop()
 }

運行結果:

(T,CompactBuffer(Tom, Timi))
(H,CompactBuffer(Hello))
(S,CompactBuffer(Scala, Spark))

filter

函數說明:

filter即過濾器的意思,所以filter算子的作用就是過濾的作用。filter將根據指定的規則進行篩選過濾,符合條件的數據保留,不符合的數據丟棄,當數據進行篩選過濾之后,分區不變,但分區內的數據可能不均衡,生產環境下,可能會出現數據傾斜。

函數簽名:

def filter(f: T => Boolean): RDD[T]

案例演示:

  1. 篩選出能被二整除的數字
 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List(46,235,246,2346,3276,235,234,6234,6245,246,24,6246,235,26,265))
   val filterRDD = rdd.filter(_ % 2 == 0)
   filterRDD.collect().foreach(println)
   sc.stop()
 }

運行結果:

46
246
2346
3276
234
6234
246
24
6246
26

2.篩選單詞中包含H的

 def main(args: Array[String]): Unit = {
   val conf = new SparkConf().setMaster("local[*]").setAppName("rdd")
   val sc = new SparkContext(conf)
   val rdd = sc.makeRDD(List("Hello","Horber","Hbeer","ersfgH","Scala","Hadoop","Zookeeper"))
   val filterRDD = rdd.filter(_.contains("H"))
   filterRDD.collect().foreach(println)
   sc.stop()
 }

運行結果:

Hello
Horber
Hbeer
ersfgH
Hadoop

以上是“Spark RDD常用算子是什么類型的”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

万全县| 京山县| 正安县| 盱眙县| 崇仁县| 沁源县| 乌鲁木齐市| 天峻县| 宣化县| 镇坪县| 望江县| 山丹县| 桐乡市| 临潭县| 鄂托克前旗| 齐河县| 浠水县| 睢宁县| 四川省| 阿尔山市| 西昌市| 南和县| 扶沟县| 吴旗县| 南召县| 宜宾县| 阜阳市| 镶黄旗| 太仓市| 邯郸县| 北票市| 当阳市| 泰来县| 敖汉旗| 新河县| 朝阳区| 于田县| 绵阳市| 邳州市| 叶城县| 滨海县|