您好,登錄后才能下訂單哦!
作者:Nico Kruber
翻譯:曹英杰
Flink 的網絡協議棧是組成 flink-runtime請添加鏈接描述 模塊的核心組件之一,是每個 Flink 作業的核心。它連接所有 TaskManager 的各個子任務(Subtask),因此,對于 Flink 作業的性能包括吞吐與延遲都至關重要。與 TaskManager 和 JobManager 之間通過基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之間的網絡協議棧依賴于更加底層的 Netty API。
本文將首先介紹 Flink 暴露給流算子(Stream operator)的高層抽象,然后詳細介紹 Flink 網絡協議棧的物理實現和各種優化、優化的效果以及 Flink 在吞吐量和延遲之間的權衡。
Flink 的網絡協議棧為彼此通信的子任務提供以下邏輯視圖,例如在 A 通過 keyBy() 操作進行數據 Shuffle :
cdn.xitu.io/2019/6/25/16b8d5ce9bfaa319?w=1080&h=859&f=png&s=393657">
這一過程建立在以下三種基本概念的基礎上:
▼ 子任務輸出類型(ResultPartitionType):
Pipelined(有限的或無限的):一旦產生數據就可以持續向下游發送有限數據流或無限數據流。
Blocking:僅在生成完整結果后向下游發送數據。
▼ 調度策略:
同時調度所有任務(Eager):同時部署作業的所有子任務(用于流作業)。
上游產生第一條記錄部署下游(Lazy):一旦任何生產者生成任何輸出,就立即部署下游任務。
上游產生完整數據部署下游:當任何或所有生產者生成完整數據后,部署下游任務。
▼ 數據傳輸:
高吞吐:Flink 不是一個一個地發送每條記錄,而是將若干記錄緩沖到其網絡緩沖區中并一次性發送它們。這降低了每條記錄的發送成本因此提高了吞吐量。
低延遲:當網絡緩沖區超過一定的時間未被填滿時會觸發超時發送,通過減小超時時間,可以通過犧牲一定的吞吐來獲取更低的延遲。
我們將在下面深入 Flink 網絡協議棧的物理實現時看到關于吞吐延遲的優化。對于這一部分,讓我們詳細說明輸出類型與調度策略。首先,需要知道的是子任務的輸出類型和調度策略是緊密關聯的,只有兩者的一些特定組合才是有效的。
Pipelined 結果是流式輸出,需要目標 Subtask 正在運行以便接收數據。因此需要在上游 Task 產生數據之前或者產生第一條數據的時候調度下游目標 Task 運行。批處理作業生成有界結果數據,而流式處理作業產生無限結果數據。
批處理作業也可能以阻塞方式產生結果,具體取決于所使用的算子和連接模式。在這種情況下,必須等待上游 Task 先生成完整的結果,然后才能調度下游的接收 Task 運行。這能夠提高批處理作業的效率并且占用更少的資源。
下表總結了 Task 輸出類型以及調度策略的有效組合:
注釋:
[1]目前 Flink 未使用
[2]批處理 / 流計算統一完成后,可能適用于流式作業
此外,對于具有多個輸入的子任務,調度以兩種方式啟動:當所有或者任何上游任務產生第一條數據或者產生完整數據時調度任務運行。要調整批處理作業中的輸出類型和調度策略,可以參考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。
為了理解物理數據連接,請回想一下,在 Flink 中,不同的任務可以通過 Slotsharing group 共享相同 Slot。TaskManager 還可以提供多個 Slot,以允許將同一任務的多個子任務調度到同一個 TaskManager 上。
對于下圖所示的示例,我們假設 2 個并發為 4 的任務部署在 2 個 TaskManager 上,每個 TaskManager 有兩個 Slot。TaskManager 1 執行子任務 A.1,A.2,B.1 和 B.2,TaskManager 2 執行子任務 A.3,A.4,B.3 和 B.4。在 A 和 B 之間是 Shuffle 連接類型,比如來自于 A 的 keyBy() 操作,在每個 TaskManager 上會有 2x4 個邏輯連接,其中一些是本地的,另一些是遠程的:
不同任務(遠程)之間的每個網絡連接將在 Flink 的網絡堆棧中獲得自己的 TCP 通道。但是,如果同一任務的不同子任務被調度到同一個 TaskManager,則它們與同一個 TaskManager 的網絡連接將多路復用并共享同一個 TCP 信道以減少資源使用。在我們的例子中,這適用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下圖所示:
每個子任務的輸出結果稱為 ResultPartition,每個 ResultPartition 被分成多個單獨的 ResultSubpartition- 每個邏輯通道一個。Flink 的網絡協議棧在這一點的處理上,不再處理單個記錄,而是將一組序列化的記錄填充到網絡緩沖區中進行處理。每個子任務本地緩沖區中最多可用 Buffer 數目為(每個發送方和接收方各一個):
#channels * buffers-per-channel + floating-buffers-per-gate
單個 TaskManager 上的網絡層 Buffer 總數通常不需要配置。有關如何在需要時進行配置的詳細信息,請參閱配置網絡緩沖區的文檔。
▼ 造成反壓(1)
每當子任務的數據發送緩沖區耗盡時——數據駐留在 Subpartition 的緩沖區隊列中或位于更底層的基于 Netty 的網絡堆棧內,生產者就會被阻塞,無法繼續發送數據,而受到反壓。接收端以類似的方式工作:Netty 收到任何數據都需要通過網絡 Buffer 傳遞給 Flink。如果相應子任務的網絡緩沖區中沒有足夠可用的網絡 Buffer,Flink 將停止從該通道讀取,直到 Buffer 可用。這將反壓該多路復用上的所有發送子任務,因此也限制了其他接收子任務。下圖說明了過載的子任務 B.4,它會導致多路復用的反壓,也會導致子任務 B.3 無法接受和處理數據,即使是 B.3 還有足夠的處理能力。
為了防止這種情況發生,Flink 1.5 引入了自己的流量控制機制。
Credit-based 流量控制可確保發送端已經發送的任何數據,接收端都具有足夠的能力(Buffer)來接收。新的流量控制機制基于網絡緩沖區的可用性,作為 Flink 之前機制的自然延伸。每個遠程輸入通道(RemoteInputChannel)現在都有自己的一組獨占緩沖區(Exclusive buffer),而不是只有一個共享的本地緩沖池(LocalBufferPool)。與之前不同,本地緩沖池中的緩沖區稱為流動緩沖區(Floating buffer),因為它們會在輸出通道間流動并且可用于每個輸入通道。
數據接收方會將自身的可用 Buffer 作為 Credit 告知數據發送方(1 buffer = 1 credit)。每個 Subpartition 會跟蹤下游接收端的 Credit(也就是可用于接收數據的 Buffer 數目)。只有在相應的通道(Channel)有 Credit 的時候 Flink 才會向更底層的網絡協議棧發送數據(以 Buffer 為粒度),并且每發送一個 Buffer 的數據,相應的通道上的 Credit 會減 1。除了發送數據本身外,數據發送端還會發送相應 Subpartition 中有多少正在排隊發送的 Buffer 數(稱之為 Backlog)給下游。數據接收端會利用這一信息(Backlog)去申請合適數量的 Floating buffer 用于接收發送端的數據,這可以加快發送端堆積數據的處理。接收端會首先申請和 Backlog 數量相等的 Buffer,但可能無法申請到全部,甚至一個都申請不到,這時接收端會利用已經申請到的 Buffer 進行數據接收,并監聽是否有新的 Buffer 可用。
Credit-based 的流控使用 Buffers-per-channel 來指定每個 Channel 有多少獨占的 Buffer,使用 Floating-buffers-per-gate 來指定共享的本地緩沖池(Local buffer pool)大小(可選3),通過共享本地緩沖池,Credit-based 流控可以使用的 Buffer 數目可以達到與原來非 Credit-based 流控同樣的大小。這兩個參數的默認值是被精心選取的,以保證新的 Credit-based 流控在網絡健康延遲正常的情況下至少可以達到與原策略相同的吞吐。可以根據實際的網絡 RRT (round-trip-time)和帶寬對這兩個參數進行調整。
注釋3:如果沒有足夠的 Buffer 可用,則每個緩沖池將獲得全局可用 Buffer 的相同份額(±1)。
▼ 造成反壓(2)
與沒有流量控制的接收端反壓機制不同,Credit 提供了更直接的控制:如果接收端的處理速度跟不上,最終它的 Credit 會減少成 0,此時發送端就不會在向網絡中發送數據(數據會被序列化到 Buffer 中并緩存在發送端)。由于反壓只發生在邏輯鏈路上,因此沒必要阻斷從多路復用的 TCP 連接中讀取數據,也就不會影響其他的接收者接收和處理數據。
▼ Credit-based 的優勢與問題
由于通過 Credit-based 流控機制,多路復用中的一個信道不會由于反壓阻塞其他邏輯信道,因此整體資源利用率會增加。此外,通過完全控制正在發送的數據量,我們還能夠加快 Checkpoint alignment:如果沒有流量控制,通道需要一段時間才能填滿網絡協議棧的內部緩沖區并表明接收端不再讀取數據了。在這段時間里,大量的 Buffer 不會被處理。任何 Checkpoint barrier(觸發 Checkpoint 的消息)都必須在這些數據 Buffer 后排隊,因此必須等到所有這些數據都被處理后才能夠觸發 Checkpoint(“Barrier 不會在數據之前被處理!”)。
但是,來自接收方的附加通告消息(向發送端通知 Credit)可能會產生一些額外的開銷,尤其是在使用 SSL 加密信道的場景中。此外,單個輸入通道( Input channel)不能使用緩沖池中的所有 Buffer,因為存在無法共享的 Exclusive buffer。新的流控協議也有可能無法做到立即發送盡可能多的數據(如果生成數據的速度快于接收端反饋 Credit 的速度),這時則可能增長發送數據的時間。雖然這可能會影響作業的性能,但由于其所有優點,通常新的流量控制會表現得更好。可能會通過增加單個通道的獨占 Buffer 數量,這會增大內存開銷。然而,與先前實現相比,總體內存使用可能仍然會降低,因為底層的網絡協議棧不再需要緩存大量數據,因為我們總是可以立即將其傳輸到 Flink(一定會有相應的 Buffer 接收數據)。
在使用新的 Credit-based 流量控制時,可能還會注意到另一件事:由于我們在發送方和接收方之間緩沖較少的數據,反壓可能會更早的到來。然而,這是我們所期望的,因為緩存更多數據并沒有真正獲得任何好處。如果要緩存更多的數據并且保留 Credit-based 流量控制,可以考慮通過增加單個輸入共享 Buffer 的數量。
注意:如果需要關閉 Credit-based 流量控制,可以將這個配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。但是,此參數已過時,最終將與非 Credit-based 流控制代碼一起刪除。
下圖從上面的擴展了更高級別的視圖,其中包含網絡協議棧及其周圍組件的更多詳細信息,從發送算子發送記錄(Record)到接收算子獲取它:
在生成 Record 并將其傳遞出去之后,例如通過 Collector#collect(),它被傳遞給 RecordWriter,RecordWriter 會將 Java 對象序列化為字節序列,最終存儲在 Buffer 中按照上面所描述的在網絡協議棧中進行處理。RecordWriter 首先使用 SpanningRecordSerializer 將 Record 序列化為靈活的堆上字節數組。然后,它嘗試將這些字節寫入目標網絡 Channel 的 Buffer 中。我們將在下面的章節回到這一部分。
在接收方,底層網絡協議棧(Netty)將接收到的 Buffer 寫入相應的輸入通道(Channel)。流任務的線程最終從這些隊列中讀取并嘗試在 RecordReader 的幫助下通過 SpillingAdaptiveSpanningRecordDeserializer 將累積的字節反序列化為 Java 對象。與序列化器類似,這個反序列化器還必須處理特殊情況,例如跨越多個網絡 Buffer 的 Record,或者因為記錄本身比網絡緩沖區大(默認情況下為32KB,通過 taskmanager.memory.segment-size 設置)或者因為序列化 Record 時,目標 Buffer 中已經沒有足夠的剩余空間保存序列化后的字節數據,在這種情況下,Flink 將使用這些字節空間并繼續將其余字節寫入新的網絡 Buffer 中。
在上圖中,Credit-based 流控制機制實際上位于“Netty Server”(和“Netty Client”)組件內部,RecordWriter 寫入的 Buffer 始終以空狀態(無數據)添加到 Subpartition 中,然后逐漸向其中填寫序列化后的記錄。但是 Netty 在什么時候真正的獲取并發送這些 Buffer 呢?顯然,不能是 Buffer 中只要有數據就發送,因為跨線程(寫線程與發送線程)的數據交換與同步會造成大量的額外開銷,并且會造成緩存本身失去意義(如果是這樣的話,不如直接將將序列化后的字節發到網絡上而不必引入中間的 Buffer)。
在 Flink 中,有三種情況可以使 Netty 服務端使用(發送)網絡 Buffer:
▼ 在 Buffer 滿后發送
RecordWriter 將 Record 序列化到本地的序列化緩沖區中,并將這些序列化后的字節逐漸寫入位于相應 Result subpartition 隊列中的一個或多個網絡 Buffer中。雖然單個 RecordWriter 可以處理多個 Subpartition,但每個 Subpartition 只會有一個 RecordWriter 向其寫入數據。另一方面,Netty 服務端線程會從多個 Result subpartition 中讀取并像上面所說的那樣將數據寫入適當的多路復用信道。這是一個典型的生產者 - 消費者模式,網絡緩沖區位于生產者與消費者之間,如下圖所示。在(1)序列化和(2)將數據寫入 Buffer 之后,RecordWriter 會相應地更新緩沖區的寫入索引。一旦 Buffer 完全填滿,RecordWriter 會(3)為當前 Record 剩余的字節或者下一個 Record 從其本地緩沖池中獲取新的 Buffer,并將新的 Buffer 添加到相應 Subpartition 的隊列中。這將(4)通知 Netty服務端線程有新的數據可發送(如果 Netty 還不知道有可用的數據的話4)。每當 Netty 有能力處理這些通知時,它將(5)從隊列中獲取可用 Buffer 并通過適當的 TCP 通道發送它。
注釋4:如果隊列中有更多已完成的 Buffer,我們可以假設 Netty 已經收到通知。
▼ 在 Buffer 超時后發送
為了支持低延遲應用,我們不能只等到 Buffer 滿了才向下游發送數據。因為可能存在這種情況,某種通信信道沒有太多數據,等到 Buffer 滿了在發送會不必要地增加這些少量 Record 的處理延遲。因此,Flink 提供了一個定期 Flush 線程(the output flusher)每隔一段時間會將任何緩存的數據全部寫出。可以通過 StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的間隔,并作為延遲5的上限(對于低吞吐量通道)。下圖顯示了它與其他組件的交互方式:RecordWriter 如前所述序列化數據并寫入網絡 Buffer,但同時,如果 Netty 還不知道有數據可以發送,Output flusher 會(3,4)通知 Netty 服務端線程數據可讀(類似與上面的“buffer已滿”的場景)。當 Netty 處理此通知(5)時,它將消費(獲取并發送)Buffer 中的可用數據并更新 Buffer 的讀取索引。Buffer 會保留在隊列中——從 Netty 服務端對此 Buffer 的任何進一步操作將在下次從讀取索引繼續讀取。
注釋5:嚴格來說,Output flusher 不提供任何保證——它只向 Netty 發送通知,而 Netty 線程會按照能力與意愿進行處理。這也意味著如果存在反壓,則 Output flusher 是無效的。
▼ 特殊消息后發送
一些特殊的消息如果通過 RecordWriter 發送,也會觸發立即 Flush 緩存的數據。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,這些事件應該盡快被發送,而不應該等待 Buffer 被填滿或者 Output flusher 的下一次 Flush。
▼ 進一步的討論
與小于 1.5 版本的 Flink 不同,請注意(a)網絡 Buffer 現在會被直接放在 Subpartition 的隊列中,(b)網絡 Buffer 不會在 Flush 之后被關閉。這給我們帶來了一些好處:
但是,在低負載情況下,可能會出現 CPU 使用率和 TCP 數據包速率的增加。這是因為,Flink 將使用任何可用的 CPU 計算能力來嘗試維持所需的延遲。一旦負載增加,Flink 將通過填充更多的 Buffer 進行自我調整。由于同步開銷減少,高負載場景不會受到影響,甚至可以實現更高的吞吐。
更深入地了解 Flink 中是如何實現生產者 - 消費者機制,需要仔細查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 類。雖然讀取是以 Buffer 為粒度,但寫入它是按 Record 進行的,因此是 Flink 中所有網絡通信的核心路徑。因此,我們需要在任務線程(Task thread)和 Netty 線程之間實現輕量級連接,這意味著盡量小的同步開銷。你可以通過查看源代碼獲取更加詳細的信息。
引入網絡 Buffer 的目是獲得更高的資源利用率和更高的吞吐,代價是讓 Record 在 Buffer 中等待一段時間。雖然可以通過 Buffer 超時給出此等待時間的上限,但可能很想知道有關這兩個維度(延遲和吞吐)之間權衡的更多信息,顯然,無法兩者同時兼得。下圖顯示了不同的 Buffer 超時時間下的吞吐,超時時間從 0 開始(每個 Record 直接 Flush)到 100 毫秒(默認值),測試在具有 100 個節點每個節點 8 個 Slot 的群集上運行,每個節點運行沒有業務邏輯的 Task 因此只用于測試網絡協議棧的能力。為了進行比較,我們還測試了低延遲改進(如上所述)之前的 Flink 1.4 版本。
如圖,使用 Flink 1.5+,即使是非常低的 Buffer 超時(例如1ms)(對于低延遲場景)也提供高達超時默認參數(100ms)75% 的最大吞吐,但會緩存更少的數據。
了解 Result partition,批處理和流式計算的不同網絡連接以及調度類型,Credit-Based 流量控制以及 Flink 網絡協議棧內部的工作機理,有助于更好的理解網絡協議棧相關的參數以及作業的行為。后續我們會推出更多 Flink 網絡棧的相關內容,并深入更多細節,包括運維相關的監控指標(Metrics),進一步的網絡調優策略以及需要避免的常見錯誤等。
via:
https://flink.apache.org/2019/06/05/flink-network-stack.html
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。