亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark中如何使用groupByKey進行分組排序

發布時間:2023-03-09 15:24:31 來源:億速云 閱讀:133 作者:iii 欄目:開發技術

今天小編給大家分享一下spark中如何使用groupByKey進行分組排序的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

任務需求:已知RDD[(query:String, item_id:String, imp:Int, clk:Int)],要求找到每個query對應的點擊最多的前2個item_id,即:按照query分組,并按照clk降序排序,每組取前兩個。

例如:

(連衣裙,1234,  22,  13)

(牛仔褲,2768,  34,  7)

(連衣裙,1673,45,  9)

(襯衣,3468, 67,  12)

(牛仔褲,2754, 68, 20)

(連衣裙,1976,93,  29)

希望得到:

(連衣裙,1976,93,  29)

(連衣裙,1234,  22,  13)

(牛仔褲,2754, 68, 20)

(牛仔褲,2768,  34,  7)

(襯衣,3468, 67,  12)

先看一個錯誤的版本:

val list = List(("連衣裙",1234, 22, 13),("牛仔褲",2768, 34, 7),("連衣裙",1673,45, 9)
    ,("襯衣",3468,67, 12),("牛仔褲",2754, 68, 20),("連衣裙",1976,93, 29))
val rdd = ss.sparkContext.parallelize(list)
 
val topItem_set= rdd.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey()
  .map(line => {
        val topItem = line._2.toArray.sortBy(_._3)(Ordering[Int].reverse).take(2)
        topItem.mkString(",")
        topItem.map(x => {(line._1, x._1, x._2, x._3)})
  })
topItem_set.foreach(println)
println()
topItem_set.map(_.mkString).foreach(println)

我們把query作為key,其余放到一起,groupByKey后(map之前),類型為:RDD[(String, Iterable[(String, Int, Int)])],根據query分組再map,line._2.toArray把Iterable轉為Array,sortBy(_._3)是按最后一個Int即clk排序,(Ordering[Int].reverse)表示從大到小(sortBy默認從小到大,注意這里的sortBy是Array的成員函數而不是rdd的sortBy,用法比較不同),take(2)是取前2個,然后返回(query,  item_id)。跑一下上面的過程。

返回:

[Lscala.Tuple4;@2b672e4
[Lscala.Tuple4;@52e50126
[Lscala.Tuple4;@1362b124
 
(連衣裙,1976,93,29)(連衣裙,1234,22,13)
(襯衣,3468,67,12)
(牛仔褲,2754,68,20)(牛仔褲,2768,34,7)

上面3行是直接打印跟預期稍有差別,同一個key下的top兩個元素是作為一個整體,但已經很接近目標,如果希望拆分,需要使用flatMap:

val topItem_set= rdd.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey()
  .flatMap(line => {
        val topItem = line._2.toArray.sortBy(_._3)(Ordering[Int].reverse).take(2)
        topItem.mkString(",")
        topItem.map(x => {(line._1, x._1, x._2, x._3)})
  })

為什么呢?GroupByKey后,類型為RDD[(String, Iterable[(String, Int, Int)])],如果用map,那每一個key對應的一個Iterable變量,相當于一條數據,map后的結果自然還是一條。但flatMap,相當于map+flat操作,這才是我們真正的需要的形式。

任務進階:要求找到每個query對應的點擊最多的前2個item_id,當點擊一樣時,選曝光最少的,即:按照query分組,并優先按照clk降序排序,其次按照imp升序排序,每組取前兩個。

例如:

(連衣裙,1234,  22,  13)

(牛仔褲,2768,  34,  7)

(連衣裙,1673,45,  9)

(襯衣,3468, 67,  12)

(牛仔褲,2754, 68, 20)

(連衣裙,1976,93,  29)

(牛仔褲,1232,  20, 7)

希望得到:

(連衣裙,1976,93,  29)

(連衣裙,1234,  22,  13)

(牛仔褲,2754, 68, 20)

(牛仔褲,1232,  20,  7)

(襯衣,2768,  34,  7)

注意,上面樣本中牛仔褲有兩個樣本的點擊都是7,但標紅的樣本曝光數是更小,所以應該入選top2,直接上代碼吧:

val list2 = List(("連衣裙",1234, 22, 13),("牛仔褲",2768, 34, 7),("連衣裙",1673,45, 9)
    ,("襯衣",3468,67, 12),("牛仔褲",2754, 68, 20),("連衣裙",1976,93, 29),("牛仔褲",1232, 20, 7))
    val rdd2 = ss.sparkContext.parallelize(list2)
    rdd2.foreach(println)
    val topItem_set= rdd2.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey()
      .flatMap(line => {
        val topItem = line._2.toArray.sortBy(x => (x._3, x._2))(Ordering.Tuple2(Ordering[Int].reverse, Ordering[Int])).take(2)
        topItem.map(x => {(line._1, x._1, x._2, x._3)})
      })
    topItem_set.foreach(println)

sortBy可以根據需要增加排序維度,參數按優先級排列,這個在日常使用較多。

以上就是“spark中如何使用groupByKey進行分組排序”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

锦屏县| 昭通市| 含山县| 兴宁市| 广水市| 平舆县| 苍南县| 八宿县| 莫力| 高阳县| 巴林左旗| 永新县| 当涂县| 桓台县| 依兰县| 平泉县| 巩留县| 光山县| 伊春市| 富源县| 修水县| 景东| 恩施市| 新巴尔虎右旗| 聊城市| 黔西| 哈尔滨市| 咸丰县| 岑巩县| 山丹县| 安徽省| 涪陵区| 攀枝花市| 朔州市| 尤溪县| 镇康县| 高雄市| 山西省| 绥化市| 调兵山市| 青州市|