您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何將數據按指定格式存入zookeeper”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何將數據按指定格式存入zookeeper”吧!
環境:
scala版本:2.11.8
zookeeper版本:3.4.5-cdh6.7.0
package com.ruozedata.zk import java.util.concurrent.TimeUnit import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.recipes.locks.InterProcessMutex import org.apache.curator.retry.ExponentialBackoffRetry import org.slf4j.LoggerFactory import scala.collection.JavaConversions._ import scala.collection.mutable /** * Created by ganwei on 2018/08/21 * 要求: * 1 通過storeOffsets方法把數據存入zookeeper中。 * 存儲格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * 2 通過obtainOffsets方法把存入的數據讀取出來 * 輸出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */ object ZkConnectApp{ val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass) val client = { val client = CuratorFrameworkFactory .builder .connectString("172.16.100.31:2181") .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace("consumers") .build() client.start() client } def lock(path: String)(body: => Unit) { val lock = new InterProcessMutex(client, path) lock.acquire() try { body } finally { lock.release() } } def tryDo(path: String)(body: => Unit): Boolean = { val lock = new InterProcessMutex(client, path) if (!lock.acquire(10, TimeUnit.SECONDS)) { LOG.info(s"不能獲得鎖 {$path},已經有任務在運行,本次任務退出") return false } try { LOG.info("獲準運行") body true } finally { lock.release() LOG.info(s"釋放鎖 {$path}") } } //zookeeper創建路徑 def ensurePathExists(path: String): Unit = { if (client.checkExists().forPath(path) == null) { client.create().creatingParentsIfNeeded().forPath(path) } } /** * OffsetRange類定義(偏移量對象) * 用于存儲偏移量 */ case class OffsetRange( val topic:String, // 主題 val partition:Int, // 分區 val fromOffset:Long, // 起始偏移量 val utilOffset:Long // 終止偏移量 ) /** * zookeeper存儲offset的方法 * 寫入格式: * /consumers/G322/offsets/ruoze_offset_topic/partition/0 * /consumers/G322/offsets/ruoze_offset_topic/partition/1 * /consumers/G322/offsets/ruoze_offset_topic/partition/2 * @param OffsetsRanges * @param groupName */ def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={ val offsetRootPath = s"/"+groupName if (client.checkExists().forPath(offsetRootPath) == null) { client.create().creatingParentsIfNeeded().forPath(offsetRootPath) } for(els <- OffsetsRanges ){ val data = String.valueOf(els.utilOffset).getBytes val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}" // 創建路徑 ensurePathExists(path) // 寫入數據 client.setData().forPath(path, data) } } /** * TopicAndPartition類定義(偏移量key對象) * 用于提取偏移量 */ case class TopicAndPartition( topic:String, // 主題 partition:Int // 分區 ) /** * zookeeper提取offset的方法 * @param topic * @param groupName * @return */ def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={ // 定義一個空的HashMap val maps = mutable.HashMap[TopicAndPartition,Long]() // offset的路徑 val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition" // 判斷路徑是否存在 val stat = client.checkExists().forPath(s"$offsetRootPath") if (stat == null ){ println(stat) // 路徑不存在 就將路徑打印在控制臺,檢查路徑 }else{ // 獲取 offsetRootPath路徑下一級的所有子目錄 // 我們這里是獲取的所有分區 val children = client.getChildren.forPath(s"$offsetRootPath") // 遍歷所有的分區 for ( lines <- children ){ // 獲取分區的數據 val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong // 將 topic partition 和數據賦值給 maps maps(TopicAndPartition(topic,lines.toInt)) = data } } // 按partition排序后 返回map對象 maps.toList.sortBy(_._1.partition).toMap } def main(args: Array[String]) { //定義初始化數據 val off1 = OffsetRange("ruoze_offset_topic",0,0,7) val off2 = OffsetRange("ruoze_offset_topic",1,0,3) val off3 = OffsetRange("ruoze_offset_topic",2,0,5) val arr = Array(off1,off2,off3) //獲取到namespace // println(client.getNamespace) // 創建路徑 // val offsetRootPath = "/G322" // if (client.checkExists().forPath(offsetRootPath) == null) { // client.create().creatingParentsIfNeeded().forPath(offsetRootPath) // } //存儲值 storeOffsets(arr,"G322") //獲取值 /** * 輸出格式: * topic:ruoze_offset_topic partition:0 offset:7 * topic:ruoze_offset_topic partition:1 offset:3 * topic:ruoze_offset_topic partition:2 offset:5 */ val result = obtainOffsets("ruoze_offset_topic","G322") for (map <- result){ println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2) } } }
感謝各位的閱讀,以上就是“如何將數據按指定格式存入zookeeper”的內容了,經過本文的學習后,相信大家對如何將數據按指定格式存入zookeeper這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。