亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用Spark Streaming SQL對PV和UV進行統計

發布時間:2021-09-04 09:24:38 來源:億速云 閱讀:198 作者:chen 欄目:大數據

本篇內容介紹了“如何使用Spark Streaming SQL對PV和UV進行統計”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1.背景介紹

PV/UV統計是流式分析一個常見的場景。通過PV可以對訪問的網站做流量或熱點分析,例如廣告主可以通過PV值預估投放廣告網頁所帶來的流量以及廣告收入。另外一些場景需要對訪問的用戶作分析,比如分析用戶的網頁點擊行為,此時就需要對UV做統計。

使用Spark Streaming SQL,并結合Redis可以很方便進行PV/UV的統計。本文將介紹通過Streaming  SQL消費Loghub中存儲的用戶訪問信息,對過去1分鐘內的數據進行PV/UV統計,將結果存入Redis中。

2.準備工作

  • 創建E-MapReduce 3.23.0以上版本的Hadoop集群。

  • 下載并編譯E-MapReduce-SDK包

git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git cd aliyun-emapreduce-sdk git checkout -b master-2.x origin/master-2.x mvn clean package -DskipTests

編譯完后,  assembly/target目錄下會生成emr-datasources_shaded_${version}.jar,其中${version}為sdk的版本。

數據源

本文采用Loghub作為數據源,有關日志采集、日志解析請參考日志服務。

3.統計PV/UV

一般場景下需要將統計出的PV/UV以及相應的統計時間存入Redis。其他一些業務場景中,也會只保存最新結果,用新的結果不斷覆蓋更新舊的數據。以下首先介紹第一種情況的操作流程。

3.1啟動客戶端

命令行啟動streaming-sql客戶端

streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar

也可以創建SQL語句文件,通過streaming-sql -f的方式運行。

3.1定義數據表

數據源表定義如下

CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)  USING loghub  OPTIONS( sls.project=${sls.project}, sls.store=${sls.store}, access.key.id=${access.key.id}, access.key.secret=${access.key.secret}, endpoint=${endpoint});

其中,數據源表包含user_ip和__time__兩個字段,分別代表用戶的IP地址和loghub上的時間列。OPTIONS中配置項的值根據實際配置。

結果表定義如下

CREATE TABLE redis_sink  USING redis  OPTIONS( table='statistic_info', host=${redis_host}, key.column='interval');

其中,statistic_info為Redis存儲結果的表名,interval對應統計結果中的interval字段;配置項${redis_host}的值根據實際配置。

3.2創建流作業

CREATE SCAN loghub_scan ON loghub_source USING STREAM OPTIONS( watermark.column='__time__', watermark.delayThreshold='10 second');  CREATE STREAM job OPTIONS( checkpointLocation=${checkpoint_location}) INSERT INTO redis_sink SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval FROM loghub_scan GROUP BY TUMBLING(__time__, interval 1 minute), window;

4.3查看統計結果

最終的統計結果如下圖所示

可以看到,每隔一分鐘都會生成一條數據,key的形式為表名:interval,value為pv和uv的值。

3.4實現覆蓋更新

將結果表的配置項key.column修改為一個固定的值,例如定義如下

CREATE TABLE redis_sink USING redis  OPTIONS( table='statistic_info', host=${redis_host}, key.column='statistic_type');

創建流作業的SQL改為

CREATE STREAM job OPTIONS( checkpointLocation='/tmp/spark-test/checkpoint') INSERT INTO redis_sink SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval FROM loghub_scan GROUP BY TUMBLING(__time__, interval 1 minute), window;

最終的統計結果如下圖所示

如何使用Spark Streaming SQL對PV和UV進行統計

可以看到,Redis中值保留了一個值,這個值每分鐘都被更新,value包含pv、uv和interval的值。

“如何使用Spark Streaming SQL對PV和UV進行統計”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

藁城市| 高密市| 沁水县| 大冶市| 三台县| 大方县| 固始县| 安溪县| 右玉县| 栾城县| 兰考县| 睢宁县| 仁布县| 万年县| 武山县| 郧西县| 文山县| 平山县| 莲花县| 巫山县| 肃宁县| 洞头县| 菏泽市| 临湘市| 江津市| 嘉黎县| 大厂| 武隆县| 昭平县| 富顺县| 双流县| 崇州市| 冷水江市| 鹰潭市| 鸡泽县| 库车县| 中山市| 永吉县| 米脂县| 瑞金市| 威信县|