亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark作業怎么實現

發布時間:2022-01-14 17:12:20 來源:億速云 閱讀:132 作者:iii 欄目:大數據

這篇“spark作業怎么實現”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“spark作業怎么實現”文章吧。

將sample.log的數據發送到Kafka中,經過Spark Streaming處理,將數據格式變為以下形式:

commandid | houseid | gathertime | srcip | destip |srcport| destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid



在發送到kafka的另一個隊列中

要求:

1、sample.log => 讀文件,將數據發送到kafka隊列中

2、從kafka隊列中獲取數據(0.10 接口不管理offset),變更數據格式

3、處理后的數據在發送到kafka另一個隊列中

分析
1 使用課程中的redis工具類管理offset
2 讀取日志數據發送數據到topic1
3 消費主題,將數據的分割方式修改為豎線分割,再次發送到topic2

1.OffsetsWithRedisUtils

package home.one

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // 定義Redis參數
  private val redisHost = "linux123"
  private val redisPort = 6379

  // 獲取Redis的連接
  private val config = new JedisPoolConfig
  // 最大空閑數
  config.setMaxIdle(5)
  // 最大連接數
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key:kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // 根據 key 獲取offsets
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection

    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._

      // 將獲取到的redis數據由Java的map轉換為scala的map,數據格式為{key:[{partition,offset}]}
      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }

    // 歸還資源
    jedis.close()
    offsets.flatten.toMap
  }

  // 將offsets保存到Redis中
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // 獲取連接
    val jedis: Jedis = getRedisConnection

    // 組織數據
    offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))}
      .groupBy(_._1)
      .foreach{case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        // 同樣將scala的map轉換為Java的map存入redis中
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // 保存數據
        jedis.hmset(key, maps)
      }

    jedis.close()
  }
}
  1. KafkaProducer

package home.one

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KafkaProducer {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 讀取sample.log文件數據
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // 定義 kafka producer參數
    val prop = new Properties()
    // kafka的訪問地址
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    // key和value的序列化方式
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // 將讀取到的數據發送到mytopic1
    lines.foreachPartition{iter =>
      // 初始化KafkaProducer
      val producer = new KafkaProducer[String, String](prop)
      iter.foreach{line =>
        // 封裝數據
        val record = new ProducerRecord[String, String]("mytopic1", line)
        // 發送數據
        producer.send(record)
      }
      producer.close()
    }
  }
}

3.HomeOne

package home.one


import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HomeOne {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // 需要消費的topic
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"
    // 定義kafka相關參數
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)
    // 從Redis獲取offset
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // 創建DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      // 從kafka中讀取數據
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // 轉換后的數據發送到另一個topic
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // 獲取消費偏移量
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // 處理數據發送到topic2
        rdd.foreachPartition(process)
        // 將offset保存到Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // 啟動作業
    ssc.start()
    // 持續執行
    ssc.awaitTermination()
  }

  // 將處理后的數據發送到topic2
  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  // 調用kafka生產者發送消息
  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }

  // 修改數據格式,將逗號分隔變成豎線分割
  def parse(text: String): String = {
    try {
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("解析數據出錯!", e)
        ""
    }
  }

  // 定義kafka消費者的配置信息
  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
    )
  }

  // 定義生產者的kafka配置
  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }

}

2

/*


假設機場的數據如下:

1, "SFO"

2, "ORD"

3, "DFW"

機場兩兩之間的航線及距離如下:

1, 2,1800

2, 3, 800

3, 1, 1400

用 GraphX 完成以下需求:

求所有的頂點
求所有的邊
求所有的triplets
求頂點數
求邊數
求機場距離大于1000的有幾個,有哪些
按所有機場之間的距離排序(降序),輸出結果
 */

代碼:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object TwoHome {

  def main(args: Array[String]): Unit = {
    // 初始化
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    //初始化數據
    val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(1L, 2L, 1800),
      Edge(2L, 3L, 800),
      Edge(3L, 1L, 1400)
    )

    //構造vertexRDD和edgeRDD
    val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    //構造圖
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

    //所有的頂點
    println("所有頂點:")
    graph.vertices.foreach(println)

    //所有的邊
    println("所有邊:")
    graph.edges.foreach(println)

    //所有的triplets
    println("所有三元組信息:")
    graph.triplets.foreach(println)

    //求頂點數
    val vertexCnt = graph.vertices.count()
    println(s"總頂點數:$vertexCnt")

    //求邊數
    val edgeCnt = graph.edges.count()
    println(s"總邊數:$edgeCnt")

    //機場距離大于1000的
    println("機場距離大于1000的邊信息:")
    graph.edges.filter(_.attr > 1000).foreach(println)

    //按所有機場之間的距離排序(降序)
    println("降序排列所有機場之間距離")
    graph.edges.sortBy(-_.attr).collect().foreach(println)
  }
}

運行結果 spark作業怎么實現

以上就是關于“spark作業怎么實現”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

元朗区| 珠海市| 双城市| 驻马店市| 武平县| 淮北市| 肇州县| 盖州市| 玛多县| 陕西省| 乐山市| 徐汇区| 类乌齐县| 大庆市| 无棣县| 尼玛县| 阿城市| 东至县| 五莲县| 小金县| 曲靖市| 洛阳市| 淮安市| 宝应县| 玉龙| 清涧县| 盐边县| 马尔康县| 荣昌县| 商河县| 玉田县| 萨嘎县| 凤台县| 万源市| 溧阳市| 綦江县| 唐海县| 德惠市| 惠来县| 元江| 法库县|