亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

KAFKA中rebalance是什么

發布時間:2021-11-22 09:48:14 來源:億速云 閱讀:348 作者:iii 欄目:云計算

本篇內容介紹了“KAFKA中rebalance是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!


一、寫在前面

讓我們從頭到尾梳理一下rebalance。


二、什么是rebalance?

中文直譯,就是重平衡。
是什么去重平衡呢?消費組內的消費者成員去重平衡。(消費組的概念如果不清楚各位先自行百度,后續我寫到消費模塊的時候才會提到這些概念)
為什么需要重平衡呢?因為消費組內成員的故障轉移和動態分區分配。

翻譯一下:
消費組內成員的故障轉移:當一個消費組內有三個消費者A,B,C,分別消費分區:a,b,c

A -> a
B -> b
C -> c

此時如果A消費者出了點問題,那么就意味著a分區沒有消費者進行消費了,那這肯定不行,那么就通過rebalance去將a分區分配給其他還存活著的消費者客戶端,rebalance后可能得到的消費策略:

A -> a (GG)
B -> b,a
C -> c

這就是消費組內成員的故障轉移,就是某個消費者客戶端出問題之后把它原本消費的分區通過REBALNACE分配給其他存活的消費者客戶端。

動態分區分配:當某個topic的分區數變化,對于消費組而言可消費的分區數變化了,因此就需要rebalance去重新進行動態分區分配,舉個栗子,原本某topic只有3個分區,我現在擴成了10個分區,那么不就意味著多了7個分區沒有消費者消費嗎?這顯然是不行的,因此就需要rebalance過程去進行分區分配,讓現有的消費者去把這10個分區全部消費到。


三、rebalance是怎么觸發的?

這個其實在上面一小節已經提到的差不多了,在這個小節再做一點補充和總結。
觸發條件:

  1. 消費組內成員變化:下線/上線/故障被踢出。

  2. 消費的分區數變化:topic被刪了,topic分區數增加了。

  3. coordinator節點出問題了:因為消費組的元數據信息都是在coordinator節點的,因此coordinator節點出問題也會觸發rebalance去找一個新的coordinator節點。怎么找呢?顯然就是走一遍FIND_COORDINATOR請求嘛,然后找到負載最低的那個節點問一下,我的新的coordinator在哪兒呀?然后得到答案之后讓消費者客戶端去連新的coordinator節點。


四、rebalance的宏觀過程

整個rebalance的過程,是一個狀態機流轉的過程,整體過程示意圖如下:圖源:https://www.cnblogs.com/huxi2b/p/6815797.html KAFKA中rebalance是什么
其實上面這個狀態機流轉過程在明白原理的情況下,已經非常清晰了,但是如果沒看過源碼的,依舊不知道為什么是這么流轉的,什么情況下狀態是Empty呢,什么狀態下是Stable呢?什么時候Empty狀態會轉換為PreparingRebalance狀態呢?
下面我就根據請求順序來看下整個狀態的流轉過程: KAFKA中rebalance是什么
讓我們來回答上個小節后面提出的幾個比較細節的問題:

這些請求都帶有哪些關鍵數據?
在FIND_COORDINATOR請求的時候,會帶上自己的group.id值,這個值是用來計算它的coordinator到底在哪兒的,對應的計算方法就是:coordinatorId=groupId.hash % 50 這個算出來是個數字,代表著具體的分區,哪個topic的分區呢?顯然是__consumer_offsets了。
在JOIN_GROUP請求的時候,是沒帶什么關鍵參數的,但是在響應的時候會挑選一個客戶端作為leader,然后在響應中告訴它被選為了leader并且把消費組元數據信息發給它,然后讓該客戶端去進行分區分配。
在SYNC_GROUP請求的時候,leader就會帶上它根據具體的策略已經分配好的分區分配方案,服務端收到后就更新到元數據里面去,然后其余的consumer客戶端只要一發送SYNC請求過來就告訴它要消費哪些分區,然后讓它自己去消費就ok了。

到底是哪個階段導致rebalance過程會劣化到幾分鐘?
我圖中特意將JOIN階段標位紅色,就是讓這個階段顯得顯眼一些,沒錯就是這個階段會導致rebalance整個過程耗時劣化到幾分鐘。
具體的原因就是JOIN階段會等待原先組內存活的成員發送JOIN_GROUP請求過來,如果原先組內的成員因為業務處理一直沒有發送JOIN_GROUP請求過來,服務端就會一直等待,直到超時。這個超時時間就是max.poll.interval.ms的值,默認是5分鐘,因此這種情況下rebalance的耗時就會劣化到5分鐘,導致所有消費者都無法進行正常消費,影響非常大。

為什么要分為這么多階段?
這個主要是設計上的考慮,整個過程設計的還是非常優雅的,第一次連上的情況下需要三次請求,正常運行的consumer去進行rebalance只需要兩次請求,因為它原先就知道自己的coordinator在哪兒,因此就不需要FIND_COORDINATOR請求了,除非是它的coordinator宕機了。

回答完這些問題,是不是對整個rebalance過程理解加深一些了呢?其實還有很多細節沒有涉及到,例如consumer客戶端什么時候會進入rebalance狀態?服務端是如何等待原先消費組內的成員發送JOIN_GROUP請求的呢?這些問題就只能一步步看源碼了。

FIND_COORDINATOR請求的源碼我就不打寫了,很簡單大家可以自己翻一下,就是帶了個group.id上去,上面都提到了。


六、JOIN階段源碼分析

從這段函數我們知道,如果加入一個新的消費組,服務端收到第一個JOIN請求的時候會創建group,這個group的初始狀態為Empty

      // 如果group都還不存在,就有了memberId,則認為是非法請求,直接拒絕。
      groupManager.getGroup(groupId) match {
        case None =>
          // 這里group都還不存在的情況下,memberId自然是空的
          if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
          } else {
            // 初始狀態是EMPTY
            val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
            // 執行具體的加組操作
            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }

        case Some(group) =>
          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
      }

讓我們進入doJoinGroup函數,看下里面的核心邏輯:

          case Empty | Stable =>
            // 初始狀態是EMPTY,添加member并且執行rebalance
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // if the member id is unknown, register the member to the group
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
			// ...
              } else {
			  //...
              }
  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                    sessionTimeoutMs: Int,
                                    clientId: String,
                                    clientHost: String,
                                    protocolType: String,
                                    protocols: List[(String, Array[Byte])],
                                    group: GroupMetadata,
                                    callback: JoinCallback) = {
    // 根據clientID初始化memberID
    val memberId = clientId + "-">
  def add(member: MemberMetadata) {
    if (members.isEmpty)
      this.protocolType = Some(member.protocolType)

    assert(groupId == member.groupId)
    assert(this.protocolType.orNull == member.protocolType)
    assert(supportsProtocols(member.protocols))
    // coordinator選舉leader很簡單,就第一個發送join_group請求的那個member
    if (leaderId.isEmpty)
      leaderId = Some(member.memberId)
    members.put(member.memberId, member)
  }

上面的代碼翻譯一下很簡單,就是新來了一個member,封裝一下,添加到這個group中,需要說一下的就是當組狀態是Empty的情況下,誰先連上誰就是leader。緊接著就準備rebalance:

  private def maybePrepareRebalance(group: GroupMetadata) {
    group.inLock {
      if (group.canRebalance)
        prepareRebalance(group)
    }
  }
  // 這里是傳入PreparingRebalance狀態,然后獲取到一個SET
  // 翻譯一下:就是只有這個SET(Stable, CompletingRebalance, Empty)里面的狀態,才能開啟rebalance
  def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)

  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
    Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
      CompletingRebalance -> Set(PreparingRebalance),
      Stable -> Set(CompletingRebalance),
      PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
      Empty -> Set(PreparingRebalance))
  private def prepareRebalance(group: GroupMetadata) {
    // if any members are awaiting sync, cancel their request and have them rejoin
    if (group.is(CompletingRebalance))
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

    val delayedRebalance = if (group.is(Empty))
      new InitialDelayedJoin(this,
        joinPurgatory,
        group,
        groupConfig.groupInitialRebalanceDelayMs,// 默認3000ms,即3s
        groupConfig.groupInitialRebalanceDelayMs,
        max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
    else
      new DelayedJoin(this, group, group.rebalanceTimeoutMs)// 這里這個超時時間是客戶端的poll間隔,默認5分鐘
    // 狀態機轉換:EMPTY -> PreparingRebalance
    group.transitionTo(PreparingRebalance)
    // rebalance開始標志日志
    info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
      s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
    // 加入時間輪
    val groupKey = GroupKey(group.groupId)
    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }

上面這段代碼有兩個關鍵點,一個是判斷當前能否進入rebalance過程,可以看到只有(Stable, CompletingRebalance, Empty)里面的狀態,才能開啟rebalance,而最開始來到第一個member的時候,組的狀態是Empty顯然是能進來的,但是近來之后就給轉換為了PreparingRebalance狀態,那么后續的member發送JOIN請求過來之后就進不來了,就只能設置個回調后一直等。
那么要等到什么時候呢?第二段代碼寫的很清楚就是等待延時任務超時,這個延時任務創建是根據當前狀態來判斷的,如果是Empty就創建一個InitialDelayedJoin延時任務,超時時間是3s;如果不是Empty就創建一個DelayedJoin,超時時間默認是5min。看,源碼出真知,這就是JOIN階段等待member的代碼實現。
這里需要補充一下,為什么Empty的狀態下要等待3s呢?這其實是一個優化,主要就是優化多消費者同時連入的情況。舉個栗子,10個消費者都能在3s內啟動然后練上,如果你等著3s時間那么一次rebalance過程就搞定了,如果你不等,那么就意味著來一個就又要開啟一次rebalance,一共要進行10次rebalance,這個耗時就比較長了。具體的細節可以查看:https://www.cnblogs.com/huxi2b/p/6815797.html
另外就是,為什么狀態不是Empty的時候就延時5分鐘呢?這個其實上面就回答了,要等待原來消費組內在線的消費者發送JOIN請求,這個也是rebalance過程耗時劣化的主要原因。

接下來我們看看這兩個延時任務,在超時的時候分別都會做些啥,首先是InitialDelayedJoin:

/**
  * Delayed rebalance operation that is added to the purgatory when a group is transitioning from
  * Empty to PreparingRebalance
  *
  * When onComplete is triggered we check if any new members have been added and if there is still time remaining
  * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
  * rebalance.
  */
private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
                                        group: GroupMetadata,
                                        configuredRebalanceDelay: Int,
                                        delayMs: Int,
                                        remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {

  // 這里寫死是false,是為了在tryComplete的時候不被完成
  override def tryComplete(): Boolean = false

  override def onComplete(): Unit = {
    // 延時任務處理
    group.inLock  {
      // newMemberAdded是后面有新的member加進來就會是true
      // remainingMs第一次創建該延時任務的時候就是3s。
      // 所以這個條件在第一次的時候都是成立的
      if (group.newMemberAdded && remainingMs != 0) {
        group.newMemberAdded = false
        val delay = min(configuredRebalanceDelay, remainingMs)
        // 最新計算的remaining恒等于0,其實本質上就是3-3=0,
        // 所以哪怕這里是新創建了一個InitialDelayedJoin,這個任務的超時時間就是下一刻
        // 這么寫的目的其實就是相當于去完成這個延時任務
        val remaining = max(remainingMs - delayMs, 0)
        purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
          purgatory,
          group,
          configuredRebalanceDelay,
          delay,
          remaining
        ), Seq(GroupKey(group.groupId)))
      } else
        // 如果沒有新的member加入,直接調用父類的函數
        // 完成JOIN階段
        super.onComplete()
    }
  }
}

大意我都寫在注釋里面了,其實就是等待3s,然后完了之后調用父類的函數完成整個JOIN階段,不過不聯系上下文去看,還是挺費勁的,對了看這個需要對時間輪源碼有了解,正好我前面有寫,大家如果有什么不清楚的可以去看下。
接著看下DelayedJoin超時后會干嘛:

/**
 * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
 *
 * Whenever a join-group request is received, check if all known group members have requested
 * to re-join the group; if yes, complete this operation to proceed rebalance.
 *
 * When the operation has expired, any known members that have not requested to re-join
 * the group are marked as failed, and complete this operation to proceed rebalance with
 * the rest of the group.
 */
private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                 group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}

  // 超時之后啥也沒干,哈哈,因為確實不用做啥,置空就好了
  // 核心是onComplete函數和tryComplete函數
  def onExpireJoin() {
    // TODO: add metrics for restabilize timeouts
  }
  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
    group.inLock {
      if (group.notYetRejoinedMembers.isEmpty)
        forceComplete()
      else false
    }
  }
  def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
  
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }
  def onCompleteJoin(group: GroupMetadata) {
    group.inLock {
      // remove any members who haven't joined the group yet
      // 如果組內成員依舊沒能連上,那么就刪除它,接收當前JOIN階段
      group.notYetRejoinedMembers.foreach { failedMember =>
        group.remove(failedMember.memberId)
        // TODO: cut the socket connection to the client
      }

      if (!group.is(Dead)) {
        // 狀態機流轉 : preparingRebalancing -> CompletingRebalance
        group.initNextGeneration()
        if (group.is(Empty)) {
          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          groupManager.storeGroup(group, Map.empty, error => {
            if (error != Errors.NONE) {
              // we failed to write the empty group metadata. If the broker fails before another rebalance,
              // the previous generation written to the log will become active again (and most likely timeout).
              // This should be safe since there are no active members in an empty generation, so we just warn.
              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
            }
          })
        } else {
          // JOIN階段標志結束日志
          info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          // trigger the awaiting join group response callback for all the members after rebalancing
          for (member <- group.allMemberMetadata) {
            assert(member.awaitingJoinCallback != null)
            val joinResult = JoinGroupResult(
              // 如果是leader 就返回member列表及其元數據信息
              members = if (group.isLeader(member.memberId)) {
                group.currentMemberMetadata
              } else {
                Map.empty
              },
              memberId = member.memberId,
              generationId = group.generationId,
              subProtocol = group.protocolOrNull,
              leaderId = group.leaderOrNull,
              error = Errors.NONE)

            member.awaitingJoinCallback(joinResult)
            member.awaitingJoinCallback = null
            completeAndScheduleNextHeartbeatExpiration(group, member)
          }
        }
      }
    }
  }

上面這一串代碼有幾個要點,首先,這個任務超時的時候是啥也不干的,為什么呢?這里要了解時間輪的機制,代碼也在上面,當一個任務超時的時候,時間輪強制執行對應任務的onComplete函數,然后執行onExpiration函數,其實onExpiration函數對于這個延時任務來說是沒有意義的,并不需要做什么,打日志都懶得打。
第二點就是這個任務onComplete什么時候會被調用呢?難道就只能等待5分鐘超時才能被調用嗎?那不是每一次rebalance都必須要等待5分鐘?當然不可能啦,這里就需要先看下tryComplete函數的內容,發現這個內容會去檢查還沒連上的member,如果發現到期了,就強制完成。那么我們看下這tryComplete是在哪兒被調用的?這里需要插入一點之前沒貼全的代碼,在doJoinGroup函數中的而最后一段:

if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))

這段代碼非常關鍵,當當前狀態是PreparingRebalance的時候,會嘗試去完成當前的延時任務,最終調用的代碼:

  private[server] def maybeTryComplete(): Boolean = {
    var retry = false
    var done = false
    do {
      if (lock.tryLock()) {
        try {
          tryCompletePending.set(false)
          done = tryComplete()
        } finally {
          lock.unlock()
        }
        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
        // `tryCompletePending`. In this case we should retry.
        retry = tryCompletePending.get()
      } else {
        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
        // released the lock and returned by the time the flag is set.
        retry = !tryCompletePending.getAndSet(true)
      }
    } while (!isCompleted && retry)
    done
  }

就是上面的tryComplete函數,最終會調用到DelayedJoin中的tryComplete函數,什么意思呢?已經很明顯了,每來一個JOIN請求的時候,如果處于PreparingRebalance階段,都會去檢查一下group中原來的成員是否已經到齊了,到齊了就立刻結束JOIN階段往后走。看到這兒,回頭看下InitialDelayedJoin這個延時任務的tryComplete為什么就默認實現了個false呢?也明白了,就是初始化延時任務的時候不讓你嘗試完成,我就等3s,不需要你們來觸發我提前完成。

以上,我們就看完了整個服務端的JOIN請求處理過程,其實主要核心就是這兩個延時任務,如果不聯系上下文,不了解時間輪機制,看起來確實費勁。接下來就看下SYNC階段是如何處理的。


七、SYNC階段源碼分析

直接看下面的核心源碼邏輯:

  private def doSyncGroup(group: GroupMetadata,
                          generationId: Int,
                          memberId: String,
                          groupAssignment: Map[String, Array[Byte]],
                          responseCallback: SyncCallback) {
    group.inLock {
      if (!group.has(memberId)) {
        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
      } else if (generationId != group.generationId) {
        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
      } else {
        group.currentState match {
          case Empty | Dead =>
            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)

          case PreparingRebalance =>
            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
          // 只有group處于compeletingRebalance狀態下才會被處理
          // 其余狀態都是錯誤的狀態
          case CompletingRebalance =>
            // 給當前member設置回調,之后就啥也不干,也不返回
            // 等到leader的分區方案就緒后,才會被返回。
            group.get(memberId).awaitingSyncCallback = responseCallback

            // if this is the leader, then we can attempt to persist state and transition to stable
            // 只有收到leader的SYNC才會被處理,并進行狀態機流轉
            if (group.isLeader(memberId)) {
              info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

              // fill any missing members with an empty assignment
              val missing = group.allMembers -- groupAssignment.keySet
              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

              groupManager.storeGroup(group, assignment, (error: Errors) => {
                group.inLock {
                  // another member may have joined the group while we were awaiting this callback,
                  // so we must ensure we are still in the CompletingRebalance state and the same generation
                  // when it gets invoked. if we have transitioned to another state, then do nothing
                  if (group.is(CompletingRebalance) && generationId == group.generationId) {
                    if (error != Errors.NONE) {
                      resetAndPropagateAssignmentError(group, error)
                      maybePrepareRebalance(group)
                    } else {
                      setAndPropagateAssignment(group, assignment)
                      // 狀態機流轉:CompletingRebalance -> Stable
                      group.transitionTo(Stable)
                    }
                  }
                }
              })
            }
          // 如果已經處于stable狀態,說明leader已經把分區分配方案傳上來了
          // 那么直接從group的元數據里面返回對應的方案就好了
          case Stable =>
            // if the group is stable, we just return the current assignment
            val memberMetadata = group.get(memberId)
            responseCallback(memberMetadata.assignment, Errors.NONE)
            // 開啟心跳檢測
            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
        }
      }
    }
  }

我們可能對上面的代碼案處理會有一個疑問,為什么只有leader的SYNC請求才會被處理呢?要是其他consumer比leader早上來了難道就卡這兒不管了?不像JOIN階段那樣加入個時間輪設置個最大超時時間?這要是leader一直不發送SNYC請求,那不就所有成員都這兒干等著,無限等待了?
我們一個個來回答,首先,我們看上面的代碼,每個請求過來第一件事是先設置回調,然后才去卡住等著,直到leader把分區分配方案通過SYNC請求帶上來。
第二個問題,如果其他consumer比leader早到了就這么干等著嗎?是的,沒錯,代碼就是這么寫的。
第三個問題,為什么不設置個最大超時時間啥的?我們可以看下客戶端的代碼,一旦開啟rebalance之后,就只會進行相關請求的收發,意味著leader在收到JOIN階段的返回后,中間不會有任何業務代碼的影響,直接就是分配完分區然后發送SYNC請求;這就意味著leader的JOIN響應和SYNC請求之間理論上是不存在阻塞的,因此就可以不用設置超時,就不用加入時間輪了。
第四個問題,leader一直不發送SYNC請求就干等著?是的,代碼就是這么寫的。不過你想想,哪些情況能讓leader一直不發送SYNC請求?我能想到的就是GC/leader宕機了,無論是哪種情況都會因為心跳線程出了問題被服務端檢測到,因此在對應的心跳任務超時后重新開啟下一輪的rebalance。哪怕是GC很長時間之后恢復了繼續發SYNC請求過來,也會因為generation不匹配而得到錯誤返回開啟下一輪rebalance。
最后再看下leader到了之后會具體做啥:

  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
    assert(group.is(CompletingRebalance))
    // 給每個member的分配方案賦值
    group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
    // 在整個group中傳播這個分配方案
    propagateAssignment(group, Errors.NONE)
  }
  
  private def propagateAssignment(group: GroupMetadata, error: Errors) {
    // 遍歷
    // 如果是follower比leader先到SYNC請求
    // 那么就只會設置個callback,就啥都不干了,也不會返回
    // 直到leader帶著分配方案來了以后,把狀態更改為stable之后,才會遍歷
    // 看看有哪些member已經發送了請求過來,設置了callback,然后一次性給他們返回回去對應的分區方案
    // 所以這個名稱叫做【傳播分配方案】
    // 真是絕妙
    for (member <- group.allMemberMetadata) {
      if (member.awaitingSyncCallback != null) {
        // 通過回調告訴member對應的分配方案
        member.awaitingSyncCallback(member.assignment, error)
        member.awaitingSyncCallback = null

        // reset the session timeout for members after propagating the member's assignment.
        // This is because if any member's session expired while we were still awaiting either
        // the leader sync group or the storage callback, its expiration will be ignored and no
        // future heartbeat expectations will not be scheduled.
        completeAndScheduleNextHeartbeatExpiration(group, member)
      }
    }
  }

看,最開始設置的回調,在收到leader請求時候,起了作用;會被挨個遍歷后響應具體的分區分配方案,另外就是kafka里面的命名都很準確。

SYNC階段簡單說起來就是等待leader把分區分配方案傳上來,如果member先到就設置個回調先等著,如果leader先到,就直接把分區分配方案存到group的元數據中,然后狀態修改為Stable,后續其他member來的SYNC請求就直接從group的元數據取分區分配方案,然后自己消費去了。


八、線上如何排查rebalance問題?
看完理論,讓我們來看下線上問題怎么排查rebalance問題。 rebalance有哪些問題呢?我們來整理一下:

  • 為什么會rebalance呢?是什么引起的?能定位到是哪個客戶端嘛?

  • rebalance耗時了多久?為什么會劣化? 常見的就上面兩個問題,我們按個來回答。

首先,為什么會rebalance,這個就三種情況,分區信息變化、客戶端變化、coordinator變化
一般線上常見的就是客戶端變化,那么客戶端有哪些可能的變化呢?——新增成員,減少成員

新增成員怎么看呢?很簡單嘛,找到coordinator,然后去kafka-request.log里面搜:cat kafka-request.log |grep -i find | grep -i ${group.id} 不過一般FIND_COORDINATOR請求的處理時間都小于10ms,所以只能打開debug日志才能看到。一般這種讓客戶自己看,對應的時間點是不是有啟動kafka-consumer就行了,其實也不常見,這種情況。畢竟很少有人頻繁開啟關閉消費者,就算是有也是不好的業務使用方式。

減少成員呢?又分為兩種:心跳超時,poll間隔超過配置 心跳超時的標識日志:

  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group.inLock {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
        // 標識日志
        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        removeMemberAndUpdateGroup(group, member)
      }
    }
  }

很遺憾poll間隔超時,在1.1.0版本的info級別下并沒有可查找的日志,檢測poll時間間隔超時的是對應客戶端的心跳線程,在檢測到超過配置后就會主動leaveGroup從而觸發rebalance,而這個請求在服務端依舊沒有info級別的請求,因此,要判斷是poll間隔超時引起的rebalance,就只能看下有沒有上面心跳超時的日志,如果沒有可能就是因為這個原因造成的。目前大多數的rebalance都是因為這個原因造成的,而且這個原因引發的rebalance同時還可能伴隨著很長的rebalance耗時。
來看下服務端是如何做poll間隔超時的呢?

} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
    maybeLeaveGroup();
}

public boolean sessionTimeoutExpired(long now) {
    return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}

    public synchronized void maybeLeaveGroup() {
        if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
            // this is a minimal effort attempt to leave the group. we do not
            // attempt any resending if the request fails or times out.
            log.debug("Sending LeaveGroup request to coordinator {}", coordinator);
            LeaveGroupRequest.Builder request =
                    new LeaveGroupRequest.Builder(groupId, generation.memberId);
            client.send(coordinator, request)
                    .compose(new LeaveGroupResponseHandler());
            client.pollNoWakeup();
        }

        resetGeneration();
    }

總結一下,怎么定位rebalance的問題,就是找標志日志,然后排除法,實在不行了就打開debug日志

接著看第二個問題,rebalance一次的時間耗費了多久?為什么會劣化到幾分鐘? 因為整個rebalance過程是線性的過程,就是狀態按照請求順序流轉,因此呢找到對應的標志日志就好啦。 開啟的標志日志:

// rebalance開始標志日志
info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
 s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

結束的兩種標識日志:這兩種結束日志都行,因為都差不多代表著rebalance過程完成,原因在上面已經講的很清楚了。

 // JOIN階段標志結束日志
 info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
 s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

// SYNC階段結束日志
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

那么如何統計整個rebalance過程的時間呢? 顯而易見,結束時間 - 開始時間呀。

知道是怎么什么原因開啟了rebalance之后,該怎么定位業務問題呢? 心跳超時:因為心跳線程是守護線程,一般都是因為客戶端的機器負載太高導致心跳現場無法獲取到CPU導致的。
poll間隔超過配置:顯然嘛,就是poll出來數據之后,進行業務處理的時候太慢了,建議根據業務優化消費邏輯,改成多線程消費或者異步消費。


九、消費者如何感知到rebalance的呢?

這個很簡單,我們想一下,與這個group有關的元數據全部都在coordinator那里,哪些請求會與coordinator交互呢?HEARTBEAT/OFFSET_COMMIT嘛,就這倆,那么其實正常的member都是靠這兩個請求來感知到自己要去進行rebalance的,我們分別來看下。

首先是HEARTBEAT請求,每次都會帶上當前消費組的generation值,也就是紀元值,要是服務端rebalance已經完成了,紀元值+1,那么此時就會發現自己沒匹配上,然后緊接著就去設置自己的RejoinNeeded的標識,在下一輪poll 的時候就會去開啟rebalance。
如果說是rebalance還沒完成,那就更簡單了,發現group的狀態不是stable,直接就返回對應的錯誤,然后設置標識,加入到rebalance過程中。
服務端源碼:

        case Some(group) =>
          group.inLock {
            group.currentState match {
              case Dead =>
                // if the group is marked as dead, it means some other thread has just removed the group
                // from the coordinator metadata; this is likely that the group has migrated to some other
                // coordinator OR the group is in a transient unstable phase. Let the member retry
                // joining without the specified member id,
                responseCallback(Errors.UNKNOWN_MEMBER_ID)

              case Empty =>
                responseCallback(Errors.UNKNOWN_MEMBER_ID)

              case CompletingRebalance =>
                if (!group.has(memberId))
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                else
                  responseCallback(Errors.REBALANCE_IN_PROGRESS)

              case PreparingRebalance =>
                if (!group.has(memberId)) {
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                } else if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  val member = group.get(memberId)
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
                }

              case Stable =>
                if (!group.has(memberId)) {
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                  // 紀元切換
                } else if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  val member = group.get(memberId)
                  // 完成上次的延時,新建新的延時任務
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  // 回調響應
                  responseCallback(Errors.NONE)
                }

客戶端源碼:

    private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            sensors.heartbeatLatency.record(response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
                        coordinator());
                markCoordinatorUnknown();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heartbeat failed since group is rebalancing");
                requestRejoin();
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                resetGeneration();
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                resetGeneration();
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }
	
    protected synchronized void requestRejoin() {
        this.rejoinNeeded = true;
    }

所以我們客戶端看到這種異常,就知道怎么回事了,就是我在rebalance的過程中,或者已經完成了,客戶端的紀元不對。

    REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new RebalanceInProgressException(message);
            }
        }),

    ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new IllegalGenerationException(message);
            }
        }),

我們再看OFFSET_COMMIT請求,其實和HEARTBEAT請求是基本一致的。
服務端:

    group.inLock {
      if (group.is(Dead)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
      } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
        // The group is only using Kafka to store offsets.
        // Also, for transactional offset commits we don't need to validate group membership and the generation.
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
      } else if (group.is(CompletingRebalance)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
      } else if (!group.has(memberId)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
      } else if (generationId != group.generationId) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
      } else {
        val member = group.get(memberId)
        completeAndScheduleNextHeartbeatExpiration(group, member)
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
      }
    }
  }

客戶端:

else if (error == Errors.UNKNOWN_MEMBER_ID
                            || error == Errors.ILLEGAL_GENERATION
                            || error == Errors.REBALANCE_IN_PROGRESS) {
                        // need to re-join group
                        resetGeneration();
                        future.raise(new CommitFailedException());
                        return;

    /**
     * Reset the generation and memberId because we have fallen out of the group.
     */
    protected synchronized void resetGeneration() {
        this.generation = Generation.NO_GENERATION;
        this.rejoinNeeded = true;
        this.state = MemberState.UNJOINED;
    }

從源碼我們可以看到,客戶端在感知rebalance主要通過兩個機制,一個是狀態,一個是紀元;狀態生效于rebalance過程中,紀元生效于rebalance的JOIN階段結束后。
與coordinator交互的這兩個請求都會帶上自己的紀元信息,在服務端處理前都會校驗一下狀態已經紀元信息,一旦不對,就告訴客戶端你需要rebalance了。


十、線上如何減小rebalance的影響?

首先明確下,rebalance會有什么影響?引用JVM的術語來說,就是STOP THE WORLD
一旦開啟rebalance過程,在消費者進入JOIN階段后就無法再繼續消費,就是整個group的成員全部STW,所以對業務的影響還是很大的。

“KAFKA中rebalance是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宽城| 海口市| 苏尼特左旗| 蛟河市| 安阳县| 永福县| 剑川县| 陆良县| 扶沟县| 建平县| 临泽县| 宁乡县| 武清区| 夏邑县| 大姚县| 青浦区| 鹿邑县| 永宁县| 巴彦淖尔市| 昭觉县| 边坝县| 西和县| 滦平县| 邹平县| 山东省| 潢川县| 齐河县| 平远县| 班戈县| 高雄市| 安庆市| 辉南县| 大连市| 灵寿县| 皋兰县| 昌黎县| 新兴县| 安陆市| 澎湖县| 衡阳县| 楚雄市|