亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

五、spark--spark streaming原理和使用

發布時間:2020-07-31 09:18:08 來源:網絡 閱讀:455 作者:隔壁小白 欄目:大數據

一、spark-streaming概述

1.1 常用的實時計算引擎

實時計算引擎也叫流式計算引擎,常用的目前有3個:
1、Apache Storm:真正的流式計算
2、Spark Streaming:嚴格上來說,不是真正的流式計算(實時計算)
? 把連續的流式數據,當成不連續的RDD來處理
? 本質:是一個離散計算(不連續的數據)
? 面試中問到時:先說它的本質,
? 然后說自己的理解
? 常用的方法
? 和其他同類型技術的對比
3、Apache Flink:真正的流式計算。和Spark Streaming相反。
? 本質:一個流式計算,雖然可以用于離線計算,但是本質上是將離散數據模擬成流式數據來給flink做流式計算

?

1.2 spark-streaming是什么

? Spark Streaming是核心Spark API的擴展,可實現可擴展、高吞吐量、可容錯的實時數據流處理。數據可以從諸如Kafka,Flume,Kinesis或TCP套接字等眾多來源獲取,并且可以使用由高級函數(如map,reduce,join和window)開發的復雜算法進行流數據處理。最后,處理后的數據可以被推送到文件系統,數據庫和實時儀表板。而且,您還可以在數據流上應用Spark提供的機器學習和圖處理算法。

特點:
1、易用:集成在Spark中
2、容錯性:底層RDD,RDD本身就具備容錯機制。
3、支持多種編程語言:Java Scala Python

?

1.3 spark-streaming架構

spark-streaming用來接收實時數據,然后處理程序通過類似于定時采樣的方式分批取得數據,每一批數據就是一個RDD,最終輸入給處理程序的是一個RDD隊列流,這個流其實就是discretizedstream或DStream。在內部,DStream 由一個RDD序列表示。DStream對象就是可以用來調用各種算子進行處理
五、spark--spark streaming原理和使用

? 圖1.1 DStream原理

1.4 案例演示--NetworkWordCount

首先啟動netcat服務器,并監聽在端口1234上

nc -l 1234

沒有這個命令就 yum -y install netcat 安裝一下

接著啟動spark-streaming樣例程序,從本地的1234端口獲取數據,并進行wordcount操作

到spark的安裝目錄下,執行bin目錄下的命令:
bin/run-example streaming.NetworkWordCount localhost 1234

然后在netcat端輸入各種字符串:

[root@bigdata121 hive-1.2.1-bin]# nc -l 1234
king king hello

在另外一個窗口查看統計信息:

-------------------------------------------
Time: 1567005584000 ms
-------------------------------------------
(hello,1)
(king,2)

這邊就立馬統計出來了

1.5 自行編寫NetworkWordCount

首先maven中pom.xml記得再加上streaming的依賴(為了方便最好spark所有組件的依賴都加上)
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>king</groupId>
    <artifactId>sparkTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.8.0</version>
        </dependency>

    </dependencies>

    <!--下面這是maven打包scala的插件,一定要,否則直接忽略scala代碼-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

代碼:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * wordcount流式計算程序
  *
  * 1、創建streamingContext對象
  *    創建DStream流(離散流)
  *    本質是離散計算
  *
  *    離散:將連續數據變成離散數據,并實時立刻處理
  *    離線:并非是實時處理的
  *
  * 2、DStream表現形式就是RDD
  *    和操作RDD一樣
  *
  * 3、使用DStream將連續的數據庫切割成離散的RDD
  */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    //Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    /**
    這是 StreamingContext 對象的標準創建方式
    無法通過 sparkSession對象來創建
    */
    //創建streamingContext對象,指定master為local[2],意思是使用至少兩個核心,即兩個線程,一個用于發送數據,一個處理數據
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //這里指定conf對象,還有批處理的時間間隔為3秒,每3秒切一個rdd,然后處理.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //創建接收數據源,這里創建socketstream,接收數據,內部會自動切割成一個個rdd。
    //指定監聽的主機端口
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //wordcount流程
    val rdd1 = streamText.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    //打印結構
    rdd1.print()

    //啟動streamingContext,開始計算
    streamingContext.start()

    //等待任務結束
    streamingContext.awaitTermination()
  }

}

在bigdata121虛擬機上啟動netcat服務:

nc -l 1234

idea中運行上面的程序,并在netcat中輸入字符,結構和實例的一樣

二、streaming基本原理和使用

2.1 StreamingContext對象的概念

1、StreamingContext會內在的創建一個SparkContext的實例(所有Spark功能的起始點),你可以通過ssc.sparkContext訪問到這個實例。

2、一旦一個StreamingContext開始運作,就不能設置或添加新的流計算。

3、一旦一個上下文被停止,它將無法重新啟動。

4、同一時刻,一個JVM中只能有一個StreamingContext處于活動狀態。

5、StreamingContext上的stop()方法也會停止SparkContext。 要僅停止StreamingContext(保持SparkContext活躍),請將stop() 方法的可選參數stopSparkContext設置為false。

6、只要前一個StreamingContext在下一個StreamingContext被創建之前停止(不停止SparkContext),SparkContext就可以被重用來創建多個StreamingContext。

?

2.2 離散流(DStreams):Discretized Streams

? DStream對象可以說整個spark-streaming程序的一個數據的出口,處理的數據都從這里來。前面也說了,這個對象里面其實一個個的RDD,這是DStream的本質。而且經過算子的轉換之后,DStream仍舊是DStream對象,里面也還是RDD。所以算子轉換的過程和普通RDD的概率類似。總的來說streaming程序中,就是DStream之間的轉換,本質上就是DStream中的RDD的轉換

?

2.3 DStream的算子

算子列表:
五、spark--spark streaming原理和使用

? 圖2.1 DStream算子

和普通rdd很類似,有兩個比較特殊的算子,transform和updateStateByKey

2.3.1 transform

transform(RDD[T]=>RDD[U])
是一個用于將dstream中的rdd轉換成新的rdd的算子。所以要注意,這個算子中的處理函數是接收rdd作為參數,不像其他算子是接收rdd中的數據作為參數的。

例子:
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //這里指定conf對象,還有批處理的時間間隔為4秒,每4秒切一個rdd,然后處理.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //創建socketstream,接收數據,內部會自動切割成一個個rdd
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //接收的函數參數中就是rdd,然后在里面對rdd進行處理,最后返回新的rdd
    streamText.transform(rdd=>{
      rdd.flatMap(_.split(" "))
    })

2.3.2 updateStateByKey

? 默認情況下,Spark Streaming 不記錄之前的狀態,每次發一條數據,都從0開始。比如說進行單詞統計時,之前統計的單詞數量并不會累加到下一次的統計中,下一次是從0開始計數的。如果想進行累加操作,使用這個算子來實現這個功能

updateStateByKey((Seq[T],Option[S])=>Option[S])
這個算子接收的函數的參數要求有兩個:
Seq[T]:當前對key進行分組后,同一個key的value的一個集合,比如("age",[1,2,1,1])中的[1,2,1,1]
Option[S]:同一個key,在此之前的value總和,也就是這個key之前的計數狀態
返回值是之前的計數+現在的計數的一個返回值

例子:
下面將之前的wordcount改變一些,實現單詞的持續計數,不會每次都重新從0開始計數

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 測試updateStateByKey 進行狀態的累加
  */
object TestUpdateState {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    //創建streamingContext對象,指定master為local[2],意思是使用至少兩個核心,即兩個線程,一個用于發送數據,一個處理數據
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //這里指定conf對象,還有批處理的時間間隔為4秒,每4秒切一個rdd,然后處理.
    val streamingContext = new StreamingContext(conf, Seconds(3))

    //設置檢查點,保存之前狀態,需要保證目錄不存在
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming")

    //創建socketstream,接收數據,內部會自動切割成一個個rdd
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //切割數據,并添加計數對
    val wordPair = streamText.flatMap(_.split(" ")).map((_,1))

    //累加處理函數
    val addFunc = (currentValues:Seq[Int], previousValue:Option[Int]) => {
      //當前值累加
      val currentSum = currentValues.sum

      //取出之前的值.如果值不存在就返回0
      val pre = previousValue.getOrElse(0)

      //之前和現在的值相加
      Option(pre + currentSum)
    }

    //更新,將舊計數更新為新計數狀態
    wordPair.updateStateByKey(addFunc).print()

    //啟動streamingContext,開始計算
    streamingContext.start()

    //等待任務結束
    streamingContext.awaitTermination()
  }
}

運行這個demo的過程出現的報錯:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.io.Charsets

說是沒有org.apache.commons.io.Charsets 這個類,進去org.apache.commons.io看了下,果然沒有,估計是包版本太舊了,沒有這個類,百度了一下,2.5版本的有這個類,所以就在pom.xml添加上新的依賴

<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.5</version>
</dependency>

接著運行,OK了

2.3.3 foreachRDD

這個算子類似forech,但是操作的對象是整個rdd,不是rdd中的某些數據。

foreachRDD(RDD[T]=>Unit)
一般用于將rdd的結果寫入其他存儲中,比如hdfs,mysql等

下面有一個關于 foreachRDD和sql 的例子。

2.4 窗口操作

應用場景:
一般用于統計最近N小時的數據,這樣的應用的場景,這時候就需要窗口

2.4.1 原理

原理圖:
五、spark--spark streaming原理和使用

? 圖2.2 spark-streaming窗口操作

? 窗口其實就是DStream的基礎上,再加上一個時間范圍。如圖所示,每當窗口滑過originalDStream時,落在窗口內的源RDD被組合并被執行操作以產生windowed DStream的RDD。在上面的例子中,操作應用于最近3個時間單位的數據,并以2個時間單位滑動。所以窗口操作比起普通的DStream操作,普通的DStream是一個個RDD處理,而窗口則是一個時間范圍內的RDD一起處理。而且窗口是DStream再上一層的一個封裝。
? 使用窗口的時候,有兩個關鍵參數:
窗口長度(windowlength):窗口的時間長度(上圖的示例中為:3)
滑動間隔(slidinginterval): 兩次相鄰的窗口操作的間隔(即每次滑動的時間長度)(上圖示例中為:2)
而且要注意的一點是:這兩個參數必須是源DStream的采樣間隔的倍數(上圖示例中為:1)。因為如果不是整數倍,就會導致窗口邊緣會將一個rdd分隔成兩份,這樣是不行的,spark沒辦法處理半個rdd,rdd是不可分的。

2.4.2 窗口操作的相關算子

window(windowLength, slideInterval)
->基于源DStream產生的窗口化的批數據計算一個新的DStream

countByWindow(windowLength, slideInterval)
->返回流中元素的一個滑動窗口數

reduceByWindow(func, windowLength, slideInterval)
->返回一個單元素流。利用函數func聚集滑動時間間隔的流的元素創建這個單元素流。函數必須是相關聯的以使計算能夠正確的并行計算。

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
->應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每一個key的值均由給定的reduce函數聚集起來。注意:在默認情況下,這個算子利用了Spark默認的并發任務數去分組。你可以用numTasks參數設置不同的任務數

reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
->上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce計算結果遞增地計算每個窗口的reduce值。這是通過對進入滑動窗口的新數據進行reduce操作,以及“逆減(inverse reducing)”離開窗口的舊數據來完成的。一個例子是當窗口滑動時對鍵對應的值進行“一加一減”操作。但是,它僅適用于“可逆減函數(invertible reduce functions)”,即具有相應“反減”功能的減函數(作為參數invFunc)。 像reduceByKeyAndWindow一樣,通過可選參數可以配置reduce任務的數量。 請注意,使用此操作必須啟用檢查點。

countByValueAndWindow(windowLength, slideInterval, [numTasks])
->應用到一個(K,V)對組成的DStream上,返回一個由(K,V)對組成的新的DStream。每個key的值都是它們在滑動窗口中出現的頻率。

比較常用的是reduceByKeyAndWindow這個,常用于統計固定最近一段時間內的數據,比如統計最近1小時訂單銷售量。下面把這個算子應用到wordcount例子中。

2.4.3 例子

窗口大小為30s,每10s滑動一次窗口,并且對單詞的計數是累加的

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 測試updateStateByKey 進行狀態的累加
  */
object TestUpdateState {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    //創建streamingContext對象,指定master為local[2],意思是使用至少兩個核心,即兩個線程,一個用于發送數據,一個處理數據
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //這里指定conf對象,還有批處理的時間間隔為4秒,每4秒切一個rdd,然后處理.
    val streamingContext = new StreamingContext(conf, Seconds(1))

    //設置檢查點,保存之前狀態,需要保證目錄不存在
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")

    //創建socketstream,接收數據,內部會自動切割成一個個rdd
    val streamText = streamingContext.socketTextStream("bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    //切割數據,并添加計數對
    val wordPair = streamText.flatMap(_.split(" ")).map((_,1))

    //在這里添加一個窗口操作
    val windowValue = wordPair.reduceByKeyAndWindow((x:Int,y:Int)=>x+y, Seconds(30), Seconds(10))

    //累加處理函數
    val addFunc = (currentValues:Seq[Int], previousValue:Option[Int]) => {
      //當前值累加
      val currentSum = currentValues.sum

      //取出之前的值.如果值不存在就返回0
      val pre = previousValue.getOrElse(0)

      //之前和現在的值相加
      Option(pre + currentSum)
    }

    //更新,將舊計數更新為新計數狀態
    //wordPair.updateStateByKey(addFunc).print()
    windowValue.updateStateByKey(addFunc).print()

    //啟動streamingContext,開始計算
    streamingContext.start()

    //等待任務結束
    streamingContext.awaitTermination()
  }
}

2.5 sql操作

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 將streaming的DStream轉為可以使用sql操作
  */
object StreamingAndSql {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("streaming and sql").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val lines = ssc.socketTextStream("bigdata121",1234, StorageLevel.MEMORY_ONLY)

    val words = lines.flatMap(_.split(" "))

    //需要將rdd轉為df對象,才能用于spark sql操作
    words.foreachRDD(rdd => {
      //從rdd中獲取conf配置,保證配置和rdd的配置一樣
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.sqlContext.implicits._
      //rdd轉為df,并指定列名
      val df = rdd.toDF("word")
      //創建視圖并執行sql
      df.createOrReplaceTempView("tmp1")
      val resultDF = spark.sql("select word,count(1) from tmp1 group by word")
      resultDF.show()
    })

    ssc.start()
    ssc.awaitTermination()

  }

}

?

2.6 checkpoint檢查點

這個和rdd中類似,只不過streaming中是通過 StreamingContext對象進行checkpoint:

//創建streamingContext對象,指定master為local[2],意思是使用至少兩個核心,即兩個線程,一個用于發送數據,一個處理數據
    val conf = new SparkConf().setAppName("streaming wordcount").setMaster("local[2]")
    //這里指定conf對象,還有批處理的時間間隔為4秒,每4秒切一個rdd,然后處理.
    val streamingContext = new StreamingContext(conf, Seconds(1))

    //設置檢查點,保存之前狀態,需要保證目錄不存在
    streamingContext.checkpoint("hdfs://bigdata121:9000/sparkCheckpoint/spark-streaming3")

?

三、streaming的數據源

3.1 基本數據源

文件流:textFileStream
套接字流:socketTextStream/sockeStream,前面已經講過例子,這里不重復
RDD隊列流:queueStream

1、textFileStream
通過監控文件系統的變化,若有新文件添加,則將它讀入并作為數據流
需要注意的是:
這些文件具有相同的格式
這些文件通過原子移動或重命名文件的方式在dataDirectory創建
如果在文件中追加內容,這些追加的新數據也不會被讀取。

例子:
package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFromFile {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark window operation").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(4))

    val fileStream = ssc.textFileStream("G:\\test\\teststreaming")

    fileStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

//==========================================================
2、queueStream
RDD隊列流是從一個隊列中讀取RDD
例子:
package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

object StreamingFromRDDQueue {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark streaming rdd queue").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(4))

    //創建隊列
    val rddQueue = new mutable.Queue[RDD[Int]]()

    //隊列中添加rdd
    for (x<- 1 to 3) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    }

    //從隊列讀取rdd
    val queueRdd = ssc.queueStream(rddQueue).map(_*2)
    queueRdd.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

3.2 高級數據源

高級數據源一般在生產中比較常用,很少使用spark直接監控數據的。常用的高級數據源有Kafka,Flume,Kinesis,Twitter等等。下面主要講解flume

3.2.1flume

1、flume推送數據到計算節點
(1)首先配置flume的agent配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

# 監控目錄
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

# 我這里是在ide中直接運行spark程序,所以flume數據直接推導windows主機上
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=192.168.50.1
a1.sinks.k1.port=1234

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

(2)spark代碼
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkDemo</groupId>
    <artifactId>SparkDemoTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.1.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.1.0</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
            <scope>runtime</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>1.2.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-configuration</artifactId>
            <version>1.8.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->

        <!--這里是spark從flume讀取數據的依賴,不要忘了-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume-sink -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume-sink_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>

    </dependencies>

    <!--下面這是maven打包scala的插件,一定要,否則直接忽略scala代碼-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

依賴這里,方便起見,直接添加flume和spark的全部依賴,自己到maven的官方庫上搜索,然后添加就可以。接著最重要的是 spark使用flume的依賴的spark-streaming-flume 這個包,不要漏了。如果在集群中運行,記得將這個包放到spark的jars目錄下

代碼:

package SparkStreamExer

import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingFromFlume {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("spark streaming from flume").setMaster("local[2]")
    conf.registerKryoClasses(Array())

    val ssc = new StreamingContext(conf, Seconds(4))

    //創建flumeevent,接收從flume push來的數據
    val flumeDStream = FlumeUtils.createStream(ssc, "192.168.50.1", 1234, StorageLevel.MEMORY_ONLY)

    val eventDStream = flumeDStream.map(event => {
      (event.event.getHeaders.toString,new String(event.event.getBody.array()))
    })

    eventDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }

}

(3)啟動:

先啟動spark程序,直接在ide中運行。
接著啟動flume:flume-ng agent --conf conf --name a1 --conf-file conf/flume-spark.properties  -Dflume.root.logger=INFO,console

然后自己在監控目錄下修改文件,或者添加文件。
接著查看ide中輸出的數據

2、spark從flume拉取數據
這種方式比起第一種方式要更加靈活,可擴展性高。
(1)flume配置文件

a1.sources=r1
a1.channels=c1
a1.sinks=k1

a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
a1.sources.r1.filegroups.f1=/opt/modules/apache-flume-1.8.0-bin/logs/.*
a1.sources.r1.fileHeader=true

a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100

# 這里使用spark自己實現的一個sink
a1.sinks.k1.type=org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.50.121
a1.sinks.k1.port=1234

a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

另外,需要將spark-streaming-flume-sink_2.11-2.1.0.jar 這個jar包添加到flume的lib目錄下,這是上面使用的SparkSink所在的jar包。可以自己在idea中添加這個依賴,然后下載,接著到本地倉庫目錄復制到flume的lib下。
(2)代碼
pom.xml

和上面類似,只是多了
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>2.1.0</version>
</dependency>
這個依賴        

代碼:

package SparkStreamExer

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FromFlumePull {
  def main(args: Array[String]): Unit = {
    //設置日志級別為ERROR,默認是INFO
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("flume through spark sink").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(4))

    //創建 poll streaming,從flume拉取數據到本地處理
    val flumePollingStream = FlumeUtils.createPollingStream(ssc, "bigdata121", 1234, StorageLevel.MEMORY_ONLY)

    /**
      * 這里要注意:
      * event.event.getBody.array() 不要直接 toString,解析處理的字符串只是[class name]@[hashCode]的形式
      * 應該用 New string(event.event.getBody.array()) 這樣會根據默認編解碼規則給bytes字符串解碼
      * 因為傳輸過來的是bytes數據
      */
    flumePollingStream.map(event=>{
      (event.event.getHeaders.toString, new String(event.event.getBody.array()))
    }).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

(3)啟動
啟動方式和上面類似,這里不重復。

(4)遇到的問題
問題1:
已經將spark-streaming-flume-sink_2.11.jar包放到flume的lib目錄下,flume的agent啟動時報錯:

29 Aug 2019 17:59:31,838 WARN  [Spark Sink Processor Thread - 10] (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  - Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
        at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

 重點在:java.lang.IllegalStateException: begin() called when transaction is OPEN!
有可能是flume的一些jar包的問題,具體還不清楚。

屢次報這個錯,最后看了看flume的lib下的scala包

scala-library-2.10.5.jar

是這個版本,我放進去的sparksink包是基于 scala 2.11.8的,所以我在想是不是scala library包版本不對,所以從spark的jar目錄下拷貝scala-library-2.11.8.jar 這個包過去flume下,將原來的重命名,不讓flume使用舊的。
接著重新啟動flume agent,正常運行。

所以這個問題是因為依賴包版本不對應的問題發生的。

問題2:
讀取body的時候,直接toString和new String有區別,前者亂碼,后者還原原本字符串

toString()與new String ()用法區別

str.toString是調用了b這個object對象的類的toString方法。一般是返回這么一個String:[class name]@[hashCode]。
new String(str)是根據parameter是一個字節數組,使用Java虛擬機默認的編碼格式,將這個字節數組decode為對應的字符。若虛擬機默認的編碼格式是ISO-8859-1,按照ascii編碼表即可得到字節對應的字符。

什么時候用什么方法呢?
new String()一般使用字符轉碼的時候,byte[]數組的時候
toString()將對象打印的時候使用 
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

淳安县| 盐津县| 建阳市| 辽宁省| 瑞丽市| 措美县| 开远市| 边坝县| 本溪| 湟中县| 会东县| 明溪县| 桐城市| 盘山县| 太白县| 苏尼特左旗| 元谋县| 嘉峪关市| 峡江县| 安国市| 都昌县| 大姚县| 新绛县| 黎川县| 枞阳县| 安庆市| 连平县| 西畴县| 双柏县| 盐池县| 剑河县| 夹江县| 邵东县| 翁源县| 凤台县| 沈阳市| 武安市| 阿图什市| 莎车县| 乃东县| 祁阳县|