亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark如何通過classloader實現對于hive metastore的兼容性

發布時間:2021-12-17 10:36:02 來源:億速云 閱讀:387 作者:柒染 欄目:大數據

今天就跟大家聊聊有關spark如何通過classloader實現對于hive metastore的兼容性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

背景

我們只是簡單的提了一下按照官網的配置就能夠兼容不同的hive元數據,這次我們從代碼級別來分析一下spark是怎么做到實現不同版本的元數據的訪問。 注意:正如官網所說的,該部分只是用于hive元數據的訪問,spark sql內部編譯的其他版本的hive用于來進行其他執行,如序列化和反序列化,UDF和UDAF等等
這里提到這一點是為了釋疑一下在源碼中看到一些低版本不存在的類,因為這部分spark sql內置了其他版本的hive用于除了hive元數據之外的其他交互,如:hive/hiveShim.scala中的SerializationUtilities 這個類在hive 1.2.1是不存在的,但是hive高版本2.3.7是存在的
我們以spark 3.1.1進行分析

分析

我們知道spark跟外部元數據的交互是類ExternalCatalog來進行響應的,對應到hive元數據就是HiveExternalCatalog,轉到client代碼:

/**
   * A Hive client used to interact with the metastore.
   */
lazy val client: HiveClient = {
    HiveUtils.newClientForMetadata(conf, hadoopConf)
  }

該client在就是進行元數據交互的最終執行者,且這里直接調用了HiveUtils的newClientForMetadata方法,直接跳到最終調用的方法:

 protected[hive] def newClientForMetadata(
      conf: SparkConf,
      hadoopConf: Configuration,
      configurations: Map[String, String]): HiveClient = {
    val sqlConf = new SQLConf
    sqlConf.setConf(SQLContext.getSQLProperties(conf))
    val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf)
    val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf)
    val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf)
    val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf)
    val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion)

...
} else if (hiveMetastoreJars == "path") {
      // Convert to files and expand any directories.
      val jars =
        HiveUtils.hiveMetastoreJarsPath(sqlConf)
          .flatMap {
            case path if path.contains("\\") && Utils.isWindows =>
              addLocalHiveJars(new File(path))
            case path =>
              DataSource.checkAndGlobPathIfNecessary(
                pathStrings = Seq(path),
                hadoopConf = hadoopConf,
                checkEmptyGlobPath = true,
                checkFilesExist = false,
                enableGlobbing = true
              ).map(_.toUri.toURL)
          }

      logInfo(
        s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion " +
          s"using path: ${jars.mkString(";")}")
      new IsolatedClientLoader(
        version = metaVersion,
        sparkConf = conf,
        hadoopConf = hadoopConf,
        execJars = jars.toSeq,
        config = configurations,
        isolationOn = true,
        barrierPrefixes = hiveMetastoreBarrierPrefixes,
        sharedPrefixes = hiveMetastoreSharedPrefixes)
...

val hiveMetastoreVersion = HiveUtils.hiveMetastoreVersion(sqlConf) 這里直接獲取配置的元數據的版本,也就是spark.sql.hive.metastore.version配置項
val hiveMetastoreJars = HiveUtils.hiveMetastoreJars(sqlConf) 這里配置hive元數據jar包的獲取方式,默認是builtin內置,推薦使用path方式,因為一般線上環境是無網絡環境 val hiveMetastoreSharedPrefixes = HiveUtils.hiveMetastoreSharedPrefixes(sqlConf) val hiveMetastoreBarrierPrefixes = HiveUtils.hiveMetastoreBarrierPrefixes(sqlConf) 這兩個跟classloader有關,也就是說什么類用哪種classloader加載,用來隔離class
val metaVersion = IsolatedClientLoader.hiveVersion(hiveMetastoreVersion) 映射成spark內部的hive版本表示,用于進行元數據class的精細化操作
這里會根據配置的獲取元數據jar包的方式而采用不同的初始化IsolatedClientLoader的方式。最終會調用isolatedLoader的createClient方法:

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = synchronized {
  val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
  if (!isolationOn) {
    return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
      baseClassLoader, this)
  }
  // Pre-reflective instantiation setup.
  logDebug("Initializing the logger to avoid disaster...")
  val origLoader = Thread.currentThread().getContextClassLoader
  Thread.currentThread.setContextClassLoader(classLoader)
  try {
    classLoader
      .loadClass(classOf[HiveClientImpl].getName)
      .getConstructors.head
      .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
      .asInstanceOf[HiveClient]
  } catch {
    case e: InvocationTargetException =>
      if (e.getCause().isInstanceOf[NoClassDefFoundError]) {
        val cnf = e.getCause().asInstanceOf[NoClassDefFoundError]
        throw new ClassNotFoundException(
          s"$cnf when creating Hive client using classpath: ${execJars.mkString(", ")}\n" +
          "Please make sure that jars for your version of hive and hadoop are included in the " +
          s"paths passed to ${HiveUtils.HIVE_METASTORE_JARS.key}.", e)
      } else {
        throw e
      }
  } finally {
    Thread.currentThread.setContextClassLoader(origLoader)
  }
}

如果未開啟隔離性,則直接返回HiveClientImpl,該client所有終端用戶共享。如果開啟了(默認值),則設置當前的contextClassLoader為classLoader: 該classLoader是自定義的:

...
new URLClassLoader(allJars, rootClassLoader) {
            override def loadClass(name: String, resolve: Boolean): Class[_] = {
              val loaded = findLoadedClass(name)
              if (loaded == null) doLoadClass(name, resolve) else loaded
            }
            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
              val classFileName = name.replaceAll("\\.", "/") + ".class"
              if (isBarrierClass(name)) {
                // For barrier classes, we construct a new copy of the class.
                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
                defineClass(name, bytes, 0, bytes.length)
              } else if (!isSharedClass(name)) {
                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
                super.loadClass(name, resolve)
              } else {
                // For shared classes, we delegate to baseClassLoader, but fall back in case the
                // class is not found.
                logDebug(s"shared class: $name")
                try {
                  baseClassLoader.loadClass(name)
                } catch {
                  case _: ClassNotFoundException =>
                    super.loadClass(name, resolve)
                }
              }
            }
          }
        }
...

直接重點,對于開啟了隔離(默認值),則直接返回該classLoader,關于classloader的知識,可以參考這里,要是還有真不明白的,可以參考classLoader類的源碼。
這里我們重點觀察一下該自定義classloader的loadClass方法,該方法是實現類隔離的關鍵,

  • 如果是BarrierClass,比如HiveClientImpl/Shim/ShimLoader,或者包含了自定義的前綴.則從當前的ContextClassLoader中復制一份class類,且生成對應的class

  • 如果不是共享類,也不是BarrierClass,則使用URLClassLoader的loadClass方法加載class

  • 否則不是barrierClass,是共享類,則用當前contextclassloader來加載當前class

通過該classLoader加載的方式,對于跟hive元數據相關的class就是通過該自定義的classLoader加載的(注意子classloader能夠看見父加載器加載的類)
之后通過該classloader加載對應的HiveClientImpl類,進行反射實例化HiveClientImpl對象,從而實現了在運行的時候,根據傳入的元數據jar包進行動態加載.
重置當前線程的contextClassLoader。

重點:hive元數據的jar包的動態記載是通過自定義classloader實現的

至于真正的和hive元數據進行交互就是HiveClientImpl,該類引入了shim的機制,也就是說,通過該shim機制,對于hive元數據版本的升級都是通過該shim來進行控制,比如增加方法,就會在shim中增加對應的方法,從而達到hive元數據的向后兼容性。 其實從shim這個英文單詞中我們也能看出一二,shim(墊片)是為了切合版本的升級而做的墊片。

看完上述內容,你們對spark如何通過classloader實現對于hive metastore的兼容性有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

枣强县| 广德县| 平湖市| 昭觉县| 长白| 富蕴县| 青海省| 聂荣县| 资源县| 修武县| 宁城县| 隆林| 永城市| 朝阳县| 靖边县| 桐城市| 昔阳县| 河池市| 寿光市| 含山县| 阿图什市| 东乡县| 鄯善县| 龙山县| 佛坪县| 泊头市| 扶余县| 宁南县| 屯昌县| 共和县| 中卫市| 桂平市| 浠水县| 华安县| 天津市| 泸西县| 西昌市| 兰溪市| 屏东县| 安仁县| 岳西县|