您好,登錄后才能下訂單哦!
本篇內容介紹了“Hive怎么避免數據傾斜”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
對于每一個表(table)或者分區, Hive可以進一步組織成桶,也就是說桶是更為細粒度的數據范圍劃分。Hive也是 針對某一列進行桶的組織。Hive采用對列值哈希,然后除以桶的個數求余的方式決定該條記錄存放在哪個桶當中。
把表(或者分區)組織成桶(Bucket)有兩個理由:
桶為表加上了額外的結構,Hive 在處理有些查詢時能利用這個結構。具體而言,連接兩個在(包含連接列的)相同列上劃分了桶的表,可以使用 Map 端連接 (Map-side join)高效的實現。比如JOIN操作。對于JOIN操作兩個表有一個相同的列,如果對這兩個表都進行了桶操作。那么將保存相同列值的桶進行JOIN操作就可以,可以大大較少JOIN的數據量。
在處理大規模數據集時,在開發和修改查詢的階段,如果能在數據集的一小部分數據上試運行查詢,會帶來很多方便。
create table bucketed_user(id int,name string) clustered by (id) sorted by(name) into 4 buckets row format delimited fields terminated by '\t' stored as textfile;
首先,我們來看如何告訴Hive—個表應該被劃分成桶。我們使用CLUSTERED BY 子句來指定劃分桶所用的列和要劃分的桶的個數:
CREATE TABLE bucketed_user (id INT) name STRING)
CLUSTERED BY (id) INTO 4 BUCKETS;
在這里,我們使用用戶ID來確定如何劃分桶(Hive使用對值進行哈希并將結果除 以桶的個數取余數。這樣,任何一桶里都會有一個隨機的用戶集合(PS:其實也能說是隨機,不是嗎?)。
對于map端連接的情況,兩個表以相同方式劃分桶。處理左邊表內某個桶的 mapper知道右邊表內相匹配的行在對應的桶內。因此,mapper只需要獲取那個桶 (這只是右邊表內存儲數據的一小部分)即可進行連接。這一優化方法并不一定要求 兩個表必須桶的個數相同,兩個表的桶個數是倍數關系也可以。用HiveQL對兩個劃分了桶的表進行連接,可參見“map連接”部分(P400)。
桶中的數據可以根據一個或多個列另外進行排序。由于這樣對每個桶的連接變成了高效的歸并排序(merge-sort), 因此可以進一步提升map端連接的效率。以下語法聲明一個表使其使用排序桶:
CREATE TABLE bucketed_users (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 4 BUCKETS;
我們如何保證表中的數據都劃分成桶了呢?把在Hive外生成的數據加載到劃分成 桶的表中,當然是可以的。其實讓Hive來劃分桶更容易。這一操作通常針對已有的表。
Hive并不檢查數據文件中的桶是否和表定義中的桶一致(無論是對于桶 的數量或用于劃分桶的列)。如果兩者不匹配,在査詢時可能會碰到錯 誤或未定義的結果。因此,建議讓Hive來進行劃分桶的操作。
select /*+ MAPJOIN(time_dim) */ count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
set hive.auto.convert.join=true;
select count(*) from
store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
select /*+ MAPJOIN(time_dim, date_dim) */ count(*) from
store_sales
join time_dim on (ss_sold_time_sk = t_time_sk)
join date_dim on (ss_sold_date_sk = d_date_sk)
where t_hour = 8 and d_year = 2002
設置下面兩個屬性hive將會進行自動執行上述過程,第一個屬性默認為true,第二個屬性是設置map端join適合讀取內存文件的大小。
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 10000000;
我們只需要設置一下幾個參數即可:
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
使用下面屬性:
set hive.auto.convert.sortmerge.join.bigtable.selection.policy= org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ;
幾種策略設置
org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ (default)
org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ
org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ
詳細請參考一下連接:hive中進行連接方案詳解(https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins)
Hive的執行是分階段的,map處理數據量的差異取決于上一個stage的reduce輸出,所以如何將數據均勻的分配到各個reduce中,就是解決數據傾斜的根本所在。
1,數據在節點上分布不均
2,key分布不均(key中存在個別值數據量比較大,比如NULL,那么join時就會容易發生數據傾斜)
3,count(disctinct key),在數據兩比較大的時候容易發生數據傾斜,因為count(distinct)是按照group by字段進行分組的
4,group by的使用容易造成數據傾斜
5,業務數據本身的特性
6,建表時考慮不周
7,某些SQL語句本身就有數據傾斜
任務進度長時間維持在99%左右,查看任務監控頁面發現只有少量reduce任務未完成。因為其處理的數據量和其他reduce差異過大。單一reduce的記錄數與平均記錄數差異過大,通常可能達到3倍甚至更多。最長時長遠大于平均時長。
set hive.map.aggr=true;
Map 端部分聚合,相當于Combiner
set hive.groupby.skewindata=true;
有數據傾斜的時候進行負載均衡,當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
set hive.map.aggr ; --> 是否在Map端進行數據聚合,默認設置為true;
set hive.groupby.mapaggr.checkinterval ; --> 在Map端進行聚合操作的條目數。
set hive.groupby.skewindata ;
默認值是false,需要設置成true ;
當設置為true時,會變成兩個MapReduce ;
第一個MR JOb中,map的輸出結果會隨機分布到Reduce中,每個Reduce做部分聚合操作,并輸出結果,這樣出來的結果相同的Group By Key有可能被分發到不同的Reduce中,從而達到輔助均衡目的。
第二個MR JOb,會根據預處理數據結果按照key分布到Reduce中,最終完成聚合操作。
場景:如日志中,常會有信息丟失的問題,比如日志中的 user_id,如果取其中的 user_id 和 用戶表中的user_id 關聯,會碰到數據傾斜的問題。
select * from log a
join users b
on a.user_id is not null
and a.user_id = b.user_id
union all
select * from log a
where a.user_id is null;
select *
from log a
left outer join users b
on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
結論:方法2比方法1效率更好,不但io少了,而且作業數也少了。解決方法1中 log讀取兩次,jobs是2。解決方法2 job數是1 。這個優化適合無效 id (比如 -99 , ’’, null 等) 產生的傾斜問題。把空值的 key 變成一個字符串加上隨機數,就能把傾斜的數據分到不同的reduce上 ,解決數據傾斜問題。
場景:用戶表中user_id字段為int,log表中user_id字段既有string類型也有int類型。當按照user_id進行兩個表的Join操作時,默認的Hash操作會按int型的id來進行分配,這樣會導致所有string類型id的記錄都分配到一個Reducer中。
select * from users a
left outer join logs b
on a.usr_id = cast(b.user_id as string)
使用 map join解決小表(記錄數少)關聯大表的數據傾斜問題,這個方法使用的頻率非常高,但如果小表很大,大到map join會出現bug或異常,這時就需要特別的處理。以下例子:
select * from log a
left outer join users b
on a.user_id = b.user_id;
users 表有 600w+ 的記錄,把 users 分發到所有的 map 上也是個不小的開銷,而且 map join 不支持這么大的小表。如果用普通的 join,又會碰到數據傾斜的問題。
select /*+mapjoin(x)*/* from log a
left outer join (
select /*+mapjoin(c)*/d.*
from ( select distinct user_id from log ) c
join users d
on c.user_id = d.user_id
) x
on a.user_id = b.user_id;
假如,log里user_id有上百萬個,這就又回到原來map join問題。所幸,每日的會員uv不會太多,有交易的會員不會太多,有點擊的會員不會太多,有傭金的會員不會太多等等。所以這個方法能解決很多場景下的數據傾斜問題。
使map的輸出數據更均勻的分布到reduce中去,是我們的最終目標。由于Hash算法的局限性,按key Hash會或多或少的造成數據傾斜。大量經驗表明數據傾斜的原因是人為的建表疏忽或業務邏輯可以規避的。在此給出較為通用的步驟:
1、采樣log表,哪些user_id比較傾斜,得到一個結果表tmp1。由于對計算框架來說,所有的數據過來,他都是不知道數據分布情況的,所以采樣是并不可少的。
2、數據的分布符合社會學統計規則,貧富不均。傾斜的key不會太多,就像一個社會的富人不多,奇特的人不多一樣。所以tmp1記錄數會很少。把tmp1和users做map join生成tmp2,把tmp2讀到distribute file cache。這是一個map過程。
3、map讀入users和log,假如記錄來自log,則檢查user_id是否在tmp2里,如果是,輸出到本地文件a,否則生成的key,value對,假如記錄來自member,生成的key,value對,進入reduce階段。
4、最終把a文件,把Stage3 reduce階段輸出的文件合并起寫到hdfs。
如果確認業務需要這樣傾斜的邏輯,考慮以下的優化方案:
1、對于join,在判斷小表不大于1G的情況下,使用map join
2、對于group by或distinct,設定
hive.groupby.skewindata=true
“Hive怎么避免數據傾斜”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。