您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關spark如何通過classloader實現對于hive metastore的兼容性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
我們只是簡單的提了一下按照官網的配置就能夠兼容不同的hive元數據,這次我們從代碼級別來分析一下spark是怎么做到實現不同版本的元數據的訪問。 注意:正如官網所說的,該部分只是用于hive元數據的訪問,spark sql內部編譯的其他版本的hive用于來進行其他執行,如序列化和反序列化,UDF和UDAF等等
這里提到這一點是為了釋疑一下在源碼中看到一些低版本不存在的類,因為這部分spark sql內置了其他版本的hive用于除了hive元數據之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 這個類在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的
我們以spark 3.1.1進行分析
我們知道spark跟外部元數據的交互是類ExternalCatalog來進行響應的,對應到hive元數據就是HiveExternalCatalog,轉到client代碼:
/** * A Hive client used to interact with the metastore. */ lazy val client: HiveClient = { HiveUtils.newClientForMetadata(conf, hadoopConf) }
該client在就是進行元數據交互的最終執行者,且這里直接調用了HiveUtils的newClientForMetadata方法,直接跳到最終調用的方法:
protected[hive] def newClientForMetadata( conf: SparkConf, hadoopConf: Configuration, configurations: Map[String, String]): HiveClient = { val sqlConf = new SQLConf sqlConf.setConf(SQLContext.getSQLProperties(conf)) val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) ... } else if (hiveMetastoreJars == "path") { // Convert to files and expand any directories. val jars = HiveUtils.hiveMetastoreJarsPath(sqlConf) .flatMap { case path if path.contains("\\") && Utils.isWindows => addLocalHiveJars(new File(path)) case path => DataSource.checkAndGlobPathIfNecessary( pathStrings = Seq(path), hadoopConf = hadoopConf, checkEmptyGlobPath = true, checkFilesExist = false, enableGlobbing = true ).map(_.toUri.toURL) } logInfo( s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " + s"using path: ${jars.mkString(";")}") new IsolatedClientLoader( version = metaVersion, sparkConf = conf, hadoopConf = hadoopConf, execJars = jars.toSeq, config = configurations, isolationOn = true, barrierPrefixes = hiveMetastoreBarrierPrefixes, sharedPrefixes = hiveMetastoreSharedPrefixes) ...
val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 這里直接獲取配置的元數據的版本,也就是spark.sql.hive.metastore.version配置項
val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 這里配置hive元數據jar包的獲取方式,默認是builtin內置,推薦使用path方式,因為一般線上環境是無網絡環境 val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 這兩個跟classloader有關,也就是說什么類用哪種classloader加載,用來隔離class
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark內部的hive版本表示,用于進行元數據class的精細化操作
這里會根據配置的獲取元數據jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最終會調用isolatedLoader的createClient方法:
/** The isolated client interface to Hive. */ private[hive] def createClient(): HiveClient = synchronized { val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)) if (!isolationOn) { return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") val origLoader = Thread.currentThread().getContextClassLoader Thread.currentThread.setContextClassLoader(classLoader) try { classLoader .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this) .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException => if (e.getCause().isInstanceOf[NoClassDefFoundError]) { val cnf = e.getCause().asInstanceOf[NoClassDefFoundError] throw new ClassNotFoundException( s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" + "Please make sure that jars for your version of hive and hadoop are included in the " + s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e) } else { throw e } } finally { Thread.currentThread.setContextClassLoader(origLoader) } }
如果未開啟隔離性,則直接返回HiveClientImpl,該client所有終端用戶共享。如果開啟了(默認值),則設置當前的contextClassLoader為classLoader: 該classLoader是自定義的:
... new URLClassLoader(allJars, rootClassLoader) { override def loadClass(name: String, resolve: Boolean): Class[_] = { val loaded = findLoadedClass(name) if (loaded == null) doLoadClass(name, resolve) else loaded } def doLoadClass(name: String, resolve: Boolean): Class[_] = { val classFileName = name.replaceAll("\\.", "/") + ".class" if (isBarrierClass(name)) { // For barrier classes, we construct a new copy of the class. val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName)) logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}") defineClass(name, bytes, 0, bytes.length) } else if (!isSharedClass(name)) { logDebug(s"hive class: $name - ${getResource(classToPath(name))}") super.loadClass(name, resolve) } else { // For shared classes, we delegate to baseClassLoader, but fall back in case the // class is not found. logDebug(s"shared class: $name") try { baseClassLoader.loadClass(name) } catch { case _: ClassNotFoundException => super.loadClass(name, resolve) } } } } } ...
直接重點,對于開啟了隔離(默認值),則直接返回該classLoader,關于classloader的知識,可以參考這里,要是還有真不明白的,可以參考classLoader類的源碼。
這里我們重點觀察一下該自定義classloader的loadClass方法,該方法是實現類隔離的關鍵,
如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定義的前綴.則從當前的ContextClassLoader中復制一份class類,且生成對應的class
如果不是共享類,也不是BarrierClass,則使用URLClassLoader的loadClass方法加載class
否則不是barrierClass,是共享類,則用當前contextclassloader來加載當前class
通過該classLoader加載的方式,對于跟hive元數據相關的class就是通過該自定義的classLoader加載的(注意子classloader能夠看見父加載器加載的類)
之后通過該classloader加載對應的HiveClientImpl類,進行反射實例化HiveClientImpl對象,從而實現了在運行的時候,根據傳入的元數據jar包進行動態加載.
重置當前線程的contextClassLoader。
重點:hive元數據的jar包的動態記載是通過自定義classloader實現的
至于真正的和hive元數據進行交互就是HiveClientImpl,該類引入了shim的機制,也就是說,通過該shim機制,對于hive元數據版本的升級都是通過該shim來進行控制,比如增加方法,就會在shim中增加對應的方法,從而達到hive元數據的向后兼容性。 其實從shim這個英文單詞中我們也能看出一二,shim(墊片)是為了切合版本的升級而做的墊片。
看完上述內容,你們對spark如何通過classloader實現對于hive metastore的兼容性有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。