亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

簡述大數據實時處理框架

發布時間:2020-08-11 10:30:02 來源:網絡 閱讀:862 作者:懂天馬 欄目:大數據

歡迎來到BigData的世界

現如今,我們來到了數據時代,數據信息化與我們的生活與工作息息相關。此篇文章簡述利用大數據框架,實時處理數據的流程與相關框架的介紹,主要包括:

  • 數據實時處理的概念和意義

  • 數據實時處理能做什么

  • 數據實時處理架構簡介

  • 數據實時處理代碼演示

數據實時處理的概念和意義

什么是數據實時處理呢?我個人對數據實時處理的理解為:數據從生成->實時采集->實時緩存存儲->(準)實時計算->實時落地->實時展示->實時分析。這一個流程線下來,處理數據的速度在秒級甚至毫秒級。

數據實時處理有什么意義呢?我們得到數據可以進行數據分析,利用數據統計方法,從錯綜復雜的數據關系中梳理出事物的聯系,比如發展趨勢、影響因素、因果關系等。甚至建立一些BI,對一些數據的有用信息進行可視化呈現,并形成數據故事。

數據實時處理能做什么

數據的實時計算

何為數據的實時計算?我們從數據源端拿到數據,可能不盡如人意,我們想對得到的數據進行 ETL 操作、或者進行關聯等等,那么我們就會用到數據的實時計算。目前主流的實時計算框架有 spark,storm,flink 等。

數據的實時落地

數據的實時落地,意思是將我們的源數據或者計算好的數據進行實時的存儲。在大數據領域,推薦使用 HDFS,ES 等進行存儲。

數據的實時展示與分析

我們拿到了數據,要會用數據的價值。數據的價值體現在數據中相互關聯關系,或與歷史關聯,或能預測未來。我們實時得到數據,不僅能夠利用前端框架進行實時展示,還可以對其中的一些數據進行算法訓練,預測未來走勢等。

example:

淘寶雙 11 大屏,每年的雙 11 是淘寶粉絲瘋狂的日子。馬云會在雙 11 的當天在阿里總部豎起一面大的電子屏幕,展示淘寶這一天的成績。例如成交額,訪問人數,訂單量,下單量,成交量等等。這個電子大屏的背后,就是用到的我們所說的數據的實時處理。首先,阿里的服務器遍布全國各地,這些服務器收集PC端、手機端等日志,上報到服務器,在服務上部署數據采集工具。接下來,由于數據量龐大,需要做數據的緩存緩沖處理。下一步,對原始日志進行實時的計算,比如篩選出上面所述的各個指標。最后,通過接口或者其他形式,進行前端屏幕的實時展示。

數據實時處理架構簡介

接下來是我們介紹的重點,先放一張數據流程圖:


簡述大數據實時處理框架cdn.xitu.io/2018/9/3/1659d6798453f811?imageView2/0/w/1280/h/960/format/webp/ignore-error/1">


  • 數據采集端,選用目前采集數據的主流控件 flume。

  • 數據緩沖緩存,選用分布式消息隊列 kafka。

  • 數據實時計算,選用 spark 計算引擎。

  • 數據存儲位置,選用分布式數據存儲 ES。

  • 其他,指從 ES 中拿到數據后進行可視化展示,數據分析等。

下面將分別簡單的介紹下各個組件:

flume

flume 是一個分布式的數據收集系統,具有高可靠、高可用、事務管理、失敗重啟、聚合和傳輸等功能。數據處理速度快,完全可以用于生產環境。

flume 的核心概念有:event,agent,source,channel,sink

event

flume 的數據流由事件 (event) 貫穿始終。event 是 flume 的基本數據單位,它攜帶日志數據并且攜帶數據的頭信息,這些 event 由 agent 外部的 source 生成,當 source 捕獲事件后會進行特定的格式化,然后 source 會把事件推入 channel 中。可以把 channel 看作是一個緩沖區,它將保存事件直到 sink 處理完該事件。sink 負責持久化日志或者把事件推向另一個 source。

agent

flume 的核心是 agent。agent 是一個 java 進程,運行在日志收集端,通過 agent 接收日志,然后暫存起來,再發送到目的地。 每臺機器運行一個 agent。 agent 里面可以包含多個 source,channel,sink。

source

source 是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到 event 里,然后將事件推入 channel 中。flume 提供了很多內置的 source,支持 avro,log4j,syslog 等等。如果內置的 source 無法滿足環境的需求,flume 還支持自定義 source。

channel

channel 是連接 source 和 sink 的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直到 sink 處理完該事件。兩個較為常用的 channel,MemoryChannel 和 FileChannel。

sink

sink 從 channel 中取出事件,然后將數據發到別處,可以向文件系統、數據庫、hadoop、kafka,也可以是其他 agent 的 source。

flume 的可靠性與可恢復性

  • flume 的可靠性:當節點出現故障時,日志能夠被傳送到其他節點上而不會丟失。Flume 提供了可靠性保障,收到數據首先寫到磁盤上,當數據傳送成功后,再刪除;如果數據發送失敗,可以重新發送。

  • flume 的可恢復性:可恢復性是靠 channel。

口述抽象,上兩張官網貼圖:

單個 agent 收集數據流程圖


簡述大數據實時處理框架多個 agent 協作處理數據流程圖



簡述大數據實時處理框架


kafka

Kafka 是一個高吞吐量的分布式發布-訂閱消息系統。企業中一般使用 kafka 做消息中間件,做緩沖緩存處理。需要 zookeeper 分布式協調組件管理。

kafka 的設計目標:

  1. 提供優秀的消息持久化能力,對 TB 級以上數據也能保證常數時間的訪問性能。

  2. 高吞吐率。即使在非常廉價的機器上也能做到每臺機每秒 100000 條消息的傳輸。

  3. 支持 kafka server 間的消息分區,及分布式消費,同時保證每個 partition 內的消息順序傳輸。

  4. 同時支持離線數據處理和實時數據處理。

kafka 核心概念

  • broker:消息中間件處理結點,一個 kafka 節點就是一個 broker,多個 broker 可以組成一個 kafka 集群。

  • topic:主題,kafka 集群能夠同時負責多個 topic 的分發。

  • partition:topic 物理上的分組,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列。

  • offset:每個 partition 都由一系列有序的、不可變的消息組成,這些消息被連續的追加到 partition 中。partition 中的每個消息都有一個連續的序列號叫做 offset,用于 partition 唯一標識一條消息。

  • producer:負責發布消息到 kafka broker。

  • consumer:消息消費者,向 kafka broker讀取消息的客戶端。

  • consumer group:每個 consumer 屬于一個特定的 consumer group。

貼兩張官網圖

prodecer-broker-consumer


簡述大數據實時處理框架分區圖


spark

spark 是一個分布式的計算框架,是我目前認為最火的計算框架。

spark,是一種"one stack to rulethem all"的大數據計算框架,期望使用一個技術棧就完美地解決大數據領域的各種計算任務。apache 官方,對 spark 的定義是:通用的大數據快速處理引擎(一“棧”式)。

spark組成
  1. spark core 用于離線計算

  2. spark sql 用于交互式查詢

  3. spark streaming,structed streaming 用于實時流式計算

  4. spark MLlib 用于機器學習

  5. spark GraphX 用于圖計算

spark 特點
  1. 速度快:spar k基于內存進行計算(當然也有部分計算基于磁盤,比如 shuffle)。

  2. 容易上手開發:spark 的基于 rdd 的計算模型,比 hadoop 的基于 map-reduce 的計算模型要更加易于理解,更加易于上手開發,實現各種復雜功能。

  3. 通用性:spark 提供的技術組件,可以一站式地完成大數據領域的離線批處理、交互式查詢、流式計算、機器學習、圖計算等常見的任務。

  4. 與其他技術的完美集成:例如 hadoop,hdfs、hive、hbase 負責存儲,yarn 負責資源調度,spark 負責大數據計算。

  5. 極高的活躍度:spark 目前是 apache 的頂級項目,全世界有大量的優秀工程師是 spark 的 committer,并且世界上很多頂級的 IT 公司都在大規模地使用 spark。

貼個spark架構圖


簡述大數據實時處理框架


數據實時處理代碼演示

搭建好各個集群環境

需要搭建 flume 集群,kafka 集群,es 集群,zookeeper 集群,由于本例 spark 是在本地模式運行,所以無需搭建 spark 集群。

配置好組件之間整合的配置文件

搭建好集群后,根據集群組件直接的整合關系,配置好配置文件。其中主要的配置為 flume 的配置,如下圖:


簡述大數據實時處理框架可以看到,我們的 agent 的 source 為 r1,channel 為 c1,sink 為 k1,source 為我本地 nc 服務,收集日志時,只需要打開 9999 端口就可以把日志收集。channel 選擇為 memory 內存模式。sink 為 kafka 的 topic8 主題。


開啟各個集群進程

  1. 開啟 zookeeper 服務。其中 QuorumPeerMain 為 zookeeper 進程。

  2. 開啟 kafka 服務。

  3. 開啟 es 服務。

  4. 開啟 flume 服務。其中 Application 為 flume 進程。


簡述大數據實時處理框架


創建好 es 對應 table

創建好 es 對應的表,表有三個字段,對應代碼里面的 case class(代碼隨后貼上)。


簡述大數據實時處理框架



簡述大數據實時處理框架



簡述大數據實時處理框架


代碼如下:

package?run?import?org.apache.kafka.common.serialization.StringDeserializer?import?org.apache.log4j.Logger?import?org.apache.spark.{SparkConf,?SparkContext}?import?org.apache.spark.sql.SparkSession?import?org.apache.spark.streaming.dstream.DStream?import?org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe?import?org.apache.spark.streaming.kafka010.KafkaUtils?import?org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent?import?org.apache.spark.streaming.{Seconds,?StreamingContext}?import?org.elasticsearch.spark.rdd.EsSpark?/**???*?@author?wangjx???*?測試kafka數據進行統計??kafka自身維護offset(建議使用自定義維護方式維護偏移量)???*/?object?SparkStreamingAutoOffsetKafka?{???//定義樣例類?與es表對應???case?class?people(name:String,country:String,age:Int)???def?main(args:?Array[String]):?Unit?=?{?????val?logger?=?Logger.getLogger(this.getClass);?????//spark?配置?????val?conf?=?new?SparkConf().setAppName("SparkStreamingAutoOffsetKafka").setMaster("local[2]")?????conf.set("es.index.auto.create","true")?????conf.set("es.nodes","127.0.0.1")?????conf.set("es.port","9200")?????//spark?streaming實時計算初始化?定義每10秒一個批次?準實時處理?企業一般都是準實時?比如每隔10秒統計近1分鐘的數據等等?????val?ssc?=?new?StreamingContext(conf,?Seconds(10))?????val?spark?=?SparkSession.builder()???????.config(conf)???????.getOrCreate()?????spark.sparkContext.setLogLevel("WARN");?????//設置kafka參數?????val?kafkaParams?=?Map[String,?Object](???????"bootstrap.servers"?->?"x:9092",???????"key.deserializer"?->?classOf[StringDeserializer],???????"value.deserializer"?->?classOf[StringDeserializer],???????"group.id"?->?"exactly-once",???????"auto.offset.reset"?->?"latest",???????"enable.auto.commit"?->?(false:?java.lang.Boolean)?????)?????//kafka主題?????val?topic?=?Set("kafka8")?????//從kafka獲取數據?????val?stream?=?KafkaUtils.createDirectStream[String,?String](???????ssc,???????PreferConsistent,???????Subscribe[String,?String](topic,?kafkaParams)?????)?????//具體的業務邏輯?????val?kafkaValue:?DStream[String]?=?stream.flatMap(line=>Some(line.value()))?????val?peopleStream?=?kafkaValue???????.map(_.split(":"))???????//形成people樣例對象???????.map(m=>people(m(0),m(1),m(2).toInt))?????//存入ES?????peopleStream.foreachRDD(rdd?=>{???????EsSpark.saveToEs(rdd,?"people/man")?????})?????//啟動程序入口?????ssc.start()?????ssc.awaitTermination()???}?}?復制代碼


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

灵丘县| 甘孜县| 长春市| 彝良县| 双柏县| 钦州市| 扎囊县| 甘肃省| 慈利县| 阿勒泰市| 高青县| 洮南市| 靖西县| 遂川县| 新干县| 仲巴县| 惠安县| 新河县| 德格县| 灵璧县| 渭源县| 景德镇市| 清水县| 宁城县| 淄博市| 自治县| 蓬莱市| 嘉黎县| 水富县| 桃园市| 波密县| 杂多县| 嵩明县| 永登县| 玛曲县| 弥渡县| 延津县| 宣威市| 汕头市| 芜湖县| 静乐县|