您好,登錄后才能下訂單哦!
這篇文章主要介紹“Kafka怎么用”,在日常操作中,相信很多人在Kafka怎么用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Kafka怎么用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
當Kafka集群流量達到 萬億級記錄/天或者十萬億級記錄/天 甚至更高后,我們需要具備哪些能力才能保障集群高可用、高可靠、高性能、高吞吐、安全的運行。
這里總結內容主要針對Kafka2.1.1版本,包括集群版本升級、數據遷移、流量限制、監控告警、負載均衡、集群擴/縮容、資源隔離、集群容災、集群安全、性能優化、平臺化、開源版本缺陷、社區動態等方面。本文主要是介紹核心脈絡,不做過多細節講解。下面我們先來看看Kafka作為數據中樞的一些核心應用場景。
下圖展示了一些主流的數據處理流程,Kafka起到一個數據中樞的作用。
接下來看看我們Kafka平臺整體架構;
官網地址:http://kafka.apache.org
1.1.1.2 源碼改造如何升級與回退
由于在升級過程中,必然出現新舊代碼邏輯交替的情況。集群內部部分節點是開源版本,另外一部分節點是改造后的版本。所以,需要考慮在升級過程中,新舊代碼混合的情況,如何兼容以及出現故障時如何回退。
由于Kafka集群的架構特點,這必然導致集群內流量負載不均衡的情況,所以我們需要做一些數據遷移來實現集群不同節點間的流量均衡。Kafka開源版本為數據遷移提供了一個腳本工具“bin/kafka-reassign-partitions.sh”,如果自己沒有實現自動負載均衡,可以使用此腳本。
開源版本提供的這個腳本生成遷移計劃完全是人工干預的,當集群規模非常大時,遷移效率變得非常低下,一般以天為單位進行計算。當然,我們可以實現一套自動化的均衡程序,當負載均衡實現自動化以后,基本使用調用內部提供的API,由程序去幫我們生成遷移計劃及執行遷移任務。需要注意的是,遷移計劃有指定數據目錄和不指定數據目錄兩種,指定數據目錄的需要配置ACL安全認證。
官網地址:http://kafka.apache.org
不指定數據目錄
//未指定遷移目錄的遷移計劃 { "version":1, "partitions":[ {"topic":"yyj4","partition":0,"replicas":[1000003,1000004]}, {"topic":"yyj4","partition":1,"replicas":[1000003,1000004]}, {"topic":"yyj4","partition":2,"replicas":[1000003,1000004]} ] }
指定數據目錄
//指定遷移目錄的遷移計劃 { "version":1, "partitions":[ {"topic":"yyj1","partition":0,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}, {"topic":"yyj1","partition":1,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]}, {"topic":"yyj1","partition":2,"replicas":[1000006,1000005],"log_dirs":["/data1/bigdata/mydata1","/data1/bigdata/mydata3"]} ] }
生產環境的服務器一般都是掛載多塊硬盤,比如4塊/12塊等;那么可能出現在Kafka集群內部,各broker間流量比較均衡,但是在broker內部,各磁盤間流量不均衡,導致部分磁盤過載,從而影響集群性能和穩定,也沒有較好的利用硬件資源。在這種情況下,我們就需要對broker內部多塊磁盤的流量做負載均衡,讓流量更均勻的分布到各磁盤上。
當前Kafka開源版本(2.1.1版本)提供的副本遷移工具“bin/kafka-reassign-partitions.sh”在同一個集群內只能實現遷移任務的串行。對于集群內已經實現多個資源組物理隔離的情況,由于各資源組不會相互影響,但是卻不能友好的進行并行的提交遷移任務,遷移效率有點低下,這種不足直到2.6.0版本才得以解決。如果需要實現并發數據遷移,可以選擇升級Kafka版本或者修改Kafka源碼。
當前Kafka開源版本(2.1.1版本)提供的副本遷移工具“bin/kafka-reassign-partitions.sh”在啟動遷移任務后,無法終止遷移。當遷移任務對集群的穩定性或者性能有影響時,將變得束手無策,只能等待遷移任務執行完畢(成功或者失敗),這種不足直到2.6.0版本才得以解決。如果需要實現終止數據遷移,可以選擇升級Kafka版本或者修改Kafka源碼。
經常會出現一些突發的,不可預測的異常生產或者消費流量會對集群的IO等資源產生巨大壓力,最終影響整個集群的穩定與性能。那么我們可以對用戶的生產、消費、副本間數據同步進行流量限制,這個限流機制并不是為了限制用戶,而是避免突發的流量影響集群的穩定和性能,給用戶可以更好的服務。
如下圖所示,節點入流量由140MB/s左右突增到250MB/s,而出流量則從400MB/s左右突增至800MB/s。如果沒有限流機制,那么集群的多個節點將有被這些異常流量打掛的風險,甚至造成集群雪崩。
圖片生產/消費流量限制官網地址:點擊鏈接
對于生產者和消費者的流量限制,官網提供了以下幾種維度組合進行限制(當然,下面限流機制存在一定缺陷,后面在“Kafka開源版本功能缺陷”我們將提到):
/config/users/<user>/clients/<client-id> //根據用戶和客戶端ID組合限流 /config/users/<user>/clients/<default> /config/users/<user>//根據用戶限流 這種限流方式是我們最常用的方式 /config/users/<default>/clients/<client-id> /config/users/<default>/clients/<default> /config/users/<default> /config/clients/<client-id> /config/clients/<default>
在啟動Kafka的broker服務時需要開啟JMX參數配置,方便通過其他應用程序采集Kafka的各項JMX指標進行服務監控。當用戶需要調整限流閾值時,根據單個broker所能承受的流量進行智能評估,無需人工干預判斷是否可以調整;對于用戶流量限制,主要需要參考的指標包括以下兩個:
(1)消費流量指標:ObjectName:kafka.server:type=Fetch,user=acl認證用戶名稱 屬性:byte-rate(用戶在當前broker的出流量)、throttle-time(用戶在當前broker的出流量被限制時間) (2)生產流量指標:ObjectName:kafka.server:type=Produce,user=acl認證用戶名稱 屬性:byte-rate(用戶在當前broker的入流量)、throttle-time(用戶在當前broker的入流量被限制時間)
副本遷移/數據同步流量限制官網地址:鏈接
涉及參數如下:
//副本同步限流配置共涉及以下4個參數 leader.replication.throttled.rate follower.replication.throttled.rate leader.replication.throttled.replicas follower.replication.throttled.replicas
輔助指標如下:
(1)副本同步出流量指標:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec (2)副本同步入流量指標:ObjectName:kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
關于Kafka的監控有一些開源的工具可用使用,比如下面這幾種:
Kafka Manager;
Kafka Eagle;
Kafka Monitor;
KafkaOffsetMonitor;
我們已經把Kafka Manager作為我們查看一些基本指標的工具嵌入平臺,然而這些開源工具不能很好的融入到我們自己的業務系統或者平臺上。所以,我們需要自己去實現一套粒度更細、監控更智能、告警更精準的系統。其監控覆蓋范圍應該包括基礎硬件、操作系統(操作系統偶爾出現系統進程hang住情況,導致broker假死,無法正常提供服務)、Kafka的broker服務、Kafka客戶端應用程序、zookeeper集群、上下游全鏈路監控。
網絡監控:
核心指標包括網絡入流量、網絡出流量、網絡丟包、網絡重傳、處于TIME.WAIT的TCP連接數、交換機、機房帶寬、DNS服務器監控(如果DNS服務器異常,可能出現流量黑洞,引起大面積業務故障)等。
磁盤監控:
核心指標包括監控磁盤write、磁盤read(如果消費時沒有延時,或者只有少量延時,一般都沒有磁盤read操作)、磁盤ioutil、磁盤iowait(這個指標如果過高說明磁盤負載較大)、磁盤存儲空間、磁盤壞盤、磁盤壞塊/壞道(壞道或者壞塊將導致broker處于半死不活狀態,由于有crc校驗,消費者將被卡住)等。
CPU監控:
監控CPU空閑率/負載,主板故障等,通常CPU使用率比較低不是Kafka的瓶頸。
內存/交換區監控:
內存使用率,內存故障。一般情況下,服務器上除了啟動Kafka的broker時分配的堆內存以外,其他內存基本全部被用來做PageCache。
緩存命中率監控:
由于是否讀磁盤對Kafka的性能影響很大,所以我們需要監控Linux的PageCache緩存命中率,如果緩存命中率高,則說明消費者基本命中緩存。
詳細內容請閱讀文章:《Linux Page Cache調優在Kafka中的應用》。
系統日志:
我們需要對操作系統的錯誤日志進行監控告警,及時發現一些硬件故障。
broker服務的監控,主要是通過在broker服務啟動時指定JMX端口,然后通過實現一套指標采集程序去采集JMX指標。(服務端指標官網地址)
**broker級監控:**broker進程、broker入流量字節大小/記錄數、broker出流量字節大小/記錄數、副本同步入流量、副本同步出流量、broker間流量偏差、broker連接數、broker請求隊列數、broker網絡空閑率、broker生產延時、broker消費延時、broker生產請求數、broker消費請求數、broker上分布leader個數、broker上分布副本個數、broker上各磁盤流量、broker GC等。
**topic級監控:**topic入流量字節大小/記錄數、topic出流量字節大小/記錄數、無流量topic、topic流量突變(突增/突降)、topic消費延時。
**partition級監控:**分區入流量字節大小/記錄數、分區出流量字節大小/記錄數、topic分區副本缺失、分區消費延遲記錄、分區leader切換、分區數據傾斜(生產消息時,如果指定了消息的key容易造成數據傾斜,這嚴重影響Kafka的服務性能)、分區存儲大小(可以治理單分區過大的topic)。
**用戶級監控:**用戶出/入流量字節大小、用戶出/入流量被限制時間、用戶流量突變(突增/突降)。
**broker服務日志監控:**對server端打印的錯誤日志進行監控告警,及時發現服務異常。
客戶端監控主要是自己實現一套指標上報程序,這個程序需要實現
org.apache.kafka.common.metrics.MetricsReporter 接口。然后在生產者或者消費者的配置中加入配置項 metric.reporters,如下所示:
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ""); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //ClientMetricsReporter類實現org.apache.kafka.common.metrics.MetricsReporter接口 props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName()); ...
客戶端指標官網地址:
http://kafka.apache.org/21/documentation.html#selector_monitoring
http://kafka.apache.org/21/documentation.html#common_node_monitoring
http://kafka.apache.org/21/documentation.html#producer_monitoring
http://kafka.apache.org/21/documentation.html#producer_sender_monitoring
http://kafka.apache.org/21/documentation.html#consumer_monitoring
http://kafka.apache.org/21/documentation.html#consumer_fetch_monitoring
客戶端監控流程架構如下圖所示:
1.4.3.1 生產者客戶端監控
**維度:**用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、brokerIP;
**指標:**連接數、IO等待時間、生產流量大小、生產記錄數、請求次數、請求延時、發送錯誤/重試次數等。
1.4.3.2 消費者客戶端監控
**維度:**用戶名稱、客戶端ID、客戶端IP、topic名稱、集群名稱、消費組、brokerIP、topic分區;
**指標:**連接數、io等待時間、消費流量大小、消費記錄數、消費延時、topic分區消費延遲記錄等。
Zookeeper進程監控;
Zookeeper的leader切換監控;
Zookeeper服務的錯誤日志監控;
當數據鏈路非常長的時候(比如:業務應用->埋點SDk->數據采集->Kafka->實時計算->業務應用),我們定位問題通常需要經過多個團隊反復溝通與排查才能發現問題到底出現在哪個環節,這樣排查問題效率比較低下。在這種情況下,我們就需要與上下游一起梳理整個鏈路的監控。出現問題時,第一時間定位問題出現在哪個環節,縮短問題定位與故障恢復時間。
我們對所有集群中不同對業務進行資源組物理隔離,避免各業務之間相互影響。在這里,我們假設集群有4個broker節點(Broker1/Broker2/Broker3/Broker4),2個業務(業務A/業務B),他們分別擁有topic分區分布如下圖所示,兩個業務topic都分散在集群的各個broker上,并且在磁盤層面也存在交叉。
試想一下,如果我們其中一個業務異常,比如流量突增,導致broker節點異常或者被打掛。那么這時候另外一個業務也將受到影響,這樣將大大的影響了我們服務的可用性,造成故障,擴大了故障影響范圍。
針對這些痛點,我們可以對集群中的業務進行物理資源隔離,各業務獨享資源,進行資源組劃分(這里把4各broker劃分為Group1和Group2兩個資源組)如下圖所示,不同業務的topic分布在自己的資源組內,當其中一個業務異常時,不會波及另外一個業務,這樣就可以有效的縮小我們的故障范圍,提高服務可用性。
我們把集群根據業務特點進行拆分為日志集群、監控集群、計費集群、搜索集群、離線集群、在線集群等,不同場景業務放在不同集群,避免不同業務相互影響。
隨著topic數據量增長,我們最初創建的topic指定的分區個數可能已經無法滿足數量流量要求,所以我們需要對topic的分區進行擴展。擴容分區時需要考慮一下幾點:
必須保證topic分區leader與follower輪詢的分布在資源組內所有broker上,讓流量分布更加均衡,同時需要考慮相同分區不同副本跨機架分布以提高容災能力;
當topic分區leader個數除以資源組節點個數有余數時,需要把余數分區leader優先考慮放入流量較低的broker。
隨著業務量增多,數據量不斷增大,我們的集群也需要進行broker節點擴容。關于擴容,我們需要實現以下幾點:
擴容智能評估:根據集群負載,把是否需要擴容評估程序化、智能化;
智能擴容:當評估需要擴容后,把擴容流程以及流量均衡平臺化。
某些場景下,我們需要下線我們的broker,主要包括以下幾個場景:
一些老化的服務器需要下線,實現節點下線平臺化;
服務器故障,broker故障無法恢復,我們需要下線故障服務器,實現節點下線平臺化;
有更優配置的服務器替換已有broker節點,實現下線節點平臺化。
我們為什么需要負載均衡呢?首先,我們來看第一張圖,下圖是我們集群某個資源組剛擴容后的流量分布情況,流量無法自動的分攤到我們新擴容后的節點上。那么這個時候需要我們手動去觸發數據遷移,把部分副本遷移至新節點上才能實現流量均衡。
下面,我們來看一下第二張圖。這張圖我們可以看出流量分布非常不均衡,最低和最高流量偏差數倍以上。這和Kafka的架構特點有關,當集群規模與數據量達到一定量后,必然出現當問題。這種情況下,我們也需要進行負載均衡。
我們再來看看第三張圖。這里我們可以看出出流量只有部分節點突增,這就是topic分區在集群內部不夠分散,集中分布到了某幾個broker導致,這種情況我們也需要進行擴容分區和均衡。
我們比較理想的流量分布應該如下圖所示,各節點間流量偏差非常小,這種情況下,既可以增強集群扛住流量異常突增的能力又可以提升集群整體資源利用率和服務穩定性,降低成本。
負載均衡我們需要實現以下效果:
1)生成副本遷移計劃以及執行遷移任務平臺化、自動化、智能化;
2)執行均衡后broker間流量比較均勻,且單個topic分區均勻分布在所有broker節點上;
3)執行均衡后broker內部多塊磁盤間流量比較均衡;
要實現這個效果,我們需要開發一套自己的負載均衡工具,如對開源的 cruise control進行二次開發;此工具的核心主要在生成遷移計劃的策略,遷移計劃的生成方案直接影響到最后集群負載均衡的效果。參考內容:
1. linkedIn/cruise-control
2. Introduction to Kafka Cruise Control
3. Cloudera Cruise Control REST API Reference
cruise control架構圖如下:
在生成遷移計劃時,我們需要考慮以下幾點:
1)選擇核心指標作為生成遷移計劃的依據,比如出流量、入流量、機架、單topic分區分散性等;
2)優化用來生成遷移計劃的指標樣本,比如過濾流量突增/突降/掉零等異常樣本;
3)各資源組的遷移計劃需要使用的樣本全部為資源組內部樣本,不涉及其他資源組,無交叉;
4)治理單分區過大topic,讓topic分區分布更分散,流量不集中在部分broker,讓topic單分區數據量更小,這樣可以減少遷移的數據量,提升遷移速度;
5)已經均勻分散在資源組內的topic,加入遷移黑名單,不做遷移,這樣可以減少遷移的數據量,提升遷移速度;
6)做topic治理,排除長期無流量topic對均衡的干擾;
7)新建topic或者topic分區擴容時,應讓所有分區輪詢分布在所有broker節點,輪詢后余數分區優先分布流量較低的broker;
8)擴容broker節點后開啟負載均衡時,優先把同一broker分配了同一大流量(流量大而不是存儲空間大,這里可以認為是每秒的吞吐量)topic多個分區leader的,遷移一部分到新broker節點;
9)提交遷移任務時,同一批遷移計劃中的分區數據大小偏差應該盡可能小,這樣可以避免遷移任務中小分區遷移完成后長時間等待大分區的遷移,造成任務傾斜;
是不是我們的集群所有人都可以隨意訪問呢?當然不是,為了集群的安全,我們需要進行權限認證,屏蔽非法操作。主要包括以下幾個方面需要做安全認證:
(1)生產者權限認證;
(2)消費者權限認證;
(3)指定數據目錄遷移安全認證;
官網地址:http://kafka.apache.org
跨機架容災:
官網地址:http://kafka.apache.org
**跨集群/機房容災:**如果有異地雙活等業務場景時,可以參考Kafka2.7版本的MirrorMaker 2.0。
GitHub地址:https://github.com
精確KIP地址 :https://cwiki.apache.org
**ZooKeeper集群上Kafka元數據恢復:**我們會定期對ZooKeeper上的權限信息數據做備份處理,當集群元數據異常時用于恢復。
**broker服務參數優化:**這里我只列舉部分影響性能的核心參數。
num.network.threads #創建Processor處理網絡請求線程個數,建議設置為broker當CPU核心數*2,這個值太低經常出現網絡空閑太低而缺失副本。 num.io.threads #創建KafkaRequestHandler處理具體請求線程個數,建議設置為broker磁盤個數*2 num.replica.fetchers #建議設置為CPU核心數/4,適當提高可以提升CPU利用率及follower同步leader數據當并行度。 compression.type #建議采用lz4壓縮類型,壓縮可以提升CPU利用率同時可以減少網絡傳輸數據量。 queued.max.requests #如果是生產環境,建議配置最少500以上,默認為500。 log.flush.scheduler.interval.ms log.flush.interval.ms log.flush.interval.messages #這幾個參數表示日志數據刷新到磁盤的策略,應該保持默認配置,刷盤策略讓操作系統去完成,由操作系統來決定什么時候把數據刷盤; #如果設置來這個參數,可能對吞吐量影響非常大; auto.leader.rebalance.enable #表示是否開啟leader自動負載均衡,默認true;我們應該把這個參數設置為false,因為自動負載均衡不可控,可能影響集群性能和穩定;
**生產優化:**這里我只列舉部分影響性能的核心參數。
linger.ms #客戶端生產消息等待多久時間才發送到服務端,單位:毫秒。和batch.size參數配合使用;適當調大可以提升吞吐量,但是如果客戶端如果down機有丟失數據風險; batch.size #客戶端發送到服務端消息批次大小,和linger.ms參數配合使用;適當調大可以提升吞吐量,但是如果客戶端如果down機有丟失數據風險; compression.type #建議采用lz4壓縮類型,具備較高的壓縮比及吞吐量;由于Kafka對CPU的要求并不高,所以,可以通過壓縮,充分利用CPU資源以提升網絡吞吐量; buffer.memory #客戶端緩沖區大小,如果topic比較大,且內存比較充足,可以適當調高這個參數,默認只為33554432(32MB) retries #生產失敗后的重試次數,默認0,可以適當增加。當重試超過一定次數后,如果業務要求數據準確性較高,建議做容錯處理。 retry.backoff.ms #生產失敗后,重試時間間隔,默認100ms,建議不要設置太大或者太小。
除了一些核心參數優化外,我們還需要考慮比如topic的分區個數和topic保留時間;如果分區個數太少,保留時間太長,但是寫入數據量非常大的話,可能造成以下問題:
1)topic分區集中落在某幾個broker節點上,導致流量副本失衡;
2)導致broker節點內部某幾塊磁盤讀寫超負載,存儲被寫爆;
消費最大的問題,并且經常出現的問題就是消費延時,拉歷史數據。當大量拉取歷史數據時將出現大量讀盤操作,污染pagecache,這個將加重磁盤的負載,影響集群性能和穩定;
可以怎樣減少或者避免大量消費延時呢?
1)當topic數據量非常大時,建議一個分區開啟一個線程去消費;
2)對topic消費延時添加監控告警,及時發現處理;
3)當topic數據可以丟棄時,遇到超大延時,比如單個分區延遲記錄超過千萬甚至數億,那么可以重置topic的消費點位進行緊急處理;【此方案一般在極端場景才使用】
4)避免重置topic的分區offset到很早的位置,這可能造成拉取大量歷史數據;
我們需要對Linux的文件句柄、pagecache等參數進行優化。可參考《Linux Page Cache調優在Kafka中的應用》。
磁盤優化
在條件允許的情況下,可以采用SSD固態硬盤替換HDD機械硬盤,解決機械盤IO性能較低的問題;如果沒有SSD固態硬盤,則可以對服務器上的多塊硬盤做硬RAID(一般采用RAID10),讓broker節點的IO負載更加均衡。如果是HDD機械硬盤,一個broker可以掛載多塊硬盤,比如 12塊*4TB。
內存
由于Kafka屬于高頻讀寫型服務,而Linux的讀寫請求基本走的都是Page Cache,所以單節點內存大一些對性能會有比較明顯的提升。一般選擇256GB或者更高。
網絡
提升網絡帶寬:在條件允許的情況下,網絡帶寬越大越好。因為這樣網絡帶寬才不會成為性能瓶頸,最少也要達到萬兆網絡( 10Gb,網卡為全雙工)才能具備相對較高的吞吐量。如果是單通道,網絡出流量與入流量之和的上限理論值是1.25GB/s;如果是雙工雙通道,網絡出入流量理論值都可以達到1.25GB/s。
網絡隔離打標:由于一個機房可能既部署有離線集群(比如HBase、Spark、Hadoop等)又部署有實時集群(如Kafka)。那么實時集群和離線集群掛載到同一個交換機下的服務器將出現競爭網絡帶寬的問題,離線集群可能對實時集群造成影響。所以我們需要進行交換機層面的隔離,讓離線機器和實時集群不要掛載到相同的交換機下。即使有掛載到相同交換機下面的,我們也將進行網絡通行優先級(金、銀、銅、鐵)標記,當網絡帶寬緊張的時候,讓實時業務優先通行。
CPU
Kafka的瓶頸不在CPU,單節點一般有32核的CPU都足夠使用。
現在問題來了,前面我們提到很多監控、優化等手段;難道我們管理員或者業務用戶對集群所有的操作都需要登錄集群服務器嗎?答案當然是否定的,我們需要豐富的平臺化功能來支持。一方面是為了提升我們操作的效率,另外一方面也是為了提升集群的穩定和降低出錯的可能。
配置管理
黑屏操作,每次修改broker的server.properties配置文件都沒有變更記錄可追溯,有時可能因為有人修改了集群配置導致一些故障,卻找不到相關記錄。如果我們把配置管理做到平臺上,每次變更都有跡可循,同時降低了變更出錯的風險。
滾動重啟
當我們需要做線上變更時,有時候需要對集群對多個節點做滾動重啟,如果到命令行去操作,那效率將變得很低,而且需要人工去干預,浪費人力。這個時候我們就需要把這種重復性的工作進行平臺化,提升我們的操作效率。
集群管理
集群管理主要是把原來在命令行的一系列操作做到平臺上,用戶和管理員不再需要黑屏操作Kafka集群;這樣做主要有以下優點:
提升操作效率;
操作出錯概率更小,集群更安全;
所有操作有跡可循,可以追溯;
集群管理主要包括:broker管理、topic管理、生產/消費權限管理、用戶管理等
在平臺上為用戶的topic提供生產樣例數據與消費抽樣的功能,用戶可以不用自己寫代碼也可以測試topic是否可以使用,權限是否正常;
在平臺上為用戶的topic提供生產/消費權限驗證功能,讓用戶可以明確自己的賬號對某個topic有沒有讀寫權限;
把用戶讀/寫權限管理等相關操作進行平臺化。
把broker節點上下線做到平臺上,所有的上線和下線節點不再需要操作命令行。
1)無流量topic的治理,對集群中無流量topic進行清理,減少過多無用元數據對集群造成的壓力;
2)topic分區數據大小治理,把topic分區數據量過大的topic(如單分區數據量超過100GB/天)進行梳理,看看是否需要擴容,避免數據集中在集群部分節點上;
3)topic分區數據傾斜治理,避免客戶端在生產消息的時候,指定消息的key,但是key過于集中,消息只集中分布在部分分區,導致數據傾斜;
4)topic分區分散性治理,讓topic分區分布在集群盡可能多的broker上,這樣可以避免因topic流量突增,流量只集中到少數節點上的風險,也可以避免某個broker異常對topic影響非常大;
5)topic分區消費延時治理;一般有延時消費較多的時候有兩種情況,一種是集群性能下降,另外一種是業務方的消費并發度不夠,如果是消費者并發不夠的化應該與業務聯系增加消費并發。
1)把所有指標采集做成平臺可配置,提供統一的指標采集和指標展示及告警平臺,實現一體化監控;
2)把上下游業務進行關聯,做成全鏈路監控;
3)用戶可以配置topic或者分區流量延時、突變等監控告警;
業務大屏主要指標:集群個數、節點個數、日入流量大小、日入流量記錄、日出流量大小、日出流量記錄、每秒入流量大小、每秒入流量記錄、每秒出流量大小、每秒出流量記錄、用戶個數、生產延時、消費延時、數據可靠性、服務可用性、數據存儲大小、資源組個數、topic個數、分區個數、副本個數、消費組個數等指標。
把用戶流量現在做到平臺,在平臺進行智能限流處理。
把自動負載均衡功能做到平臺,通過平臺進行調度和管理。
當集群達到一定規模,流量不斷增長,那么集群擴容機器從哪里來呢?業務的資源預算,讓集群里面的多個業務根據自己在集群中當流量去分攤整個集群的硬件成本;當然,獨立集群與獨立隔離的資源組,預算方式可以單獨計算。
我們做單broker性能評估的目的包括以下幾方面:
1)為我們進行資源申請評估提供依據;
2)讓我們更了解集群的讀寫能力及瓶頸在哪里,針對瓶頸進行優化;
3)為我們限流閾值設置提供依據;
4)為我們評估什么時候應該擴容提供依據;
1)為我們創建topic時,評估應該指定多少分區合理提供依據;
2)為我們topic的分區擴容評估提供依據;
1)為我們了解磁盤的真正讀寫能力,為我們選擇更合適Kafka的磁盤類型提供依據;
2)為我們做磁盤流量告警閾值設置提供依據;
1)我們需要了解單個集群規模的上限或者是元數據規模的上限,探索相關信息對集群性能和穩定性的影響;
2)根據摸底情況,評估集群節點規模的合理范圍,及時預測風險,進行超大集群的拆分等工作;
當我們的集群節點達到一定規模,比如單集群數百個broker節點,那么此時我們生產消費客戶端指定bootstrap.servers配置時,如果指定呢?是隨便選擇其中幾個broker配置還是全部都配上呢?
其實以上做法都不合適,如果只配置幾個IP,當我們配置當幾個broker節點下線后,我們當應用將無法連接到Kafka集群;如果配置所有IP,那更不現實啦,幾百個IP,那么我們應該怎么做呢?
**方案:**采用DNS+LVS網絡架構,最終生產者和消費者客戶端只需要配置域名就可以啦。需要注意的是,有新節點加入集群時,需要添加映射;有節點下線時,需要從映射中踢掉,否則這批機器如果拿到其他的地方去使用,如果端口和Kafka的一樣的話,原來集群部分請求將發送到這個已經下線的服務器上來,造成生產環境重點故障。
RTMP協議主要的特點有:多路復用,分包和應用層協議。以下將對這些特點進行詳細的描述。
無法實現增量遷移;【我們已經基于2.1.1版本源碼改造,實現了增量遷移】
無法實現并發遷移;【開源版本直到2.6.0才實現了并發遷移】
無法實現終止遷移;【我們已經基于2.1.1版本源碼改造,實現了終止副本遷移】【開源版本直到2.6.0才實現了暫停遷移,和終止遷移有些不一樣,不會回滾元數據】
當指定遷移數據目錄時,遷移過程中,如果把topic保留時間改短,topic保留時間針對正在遷移topic分區不生效,topic分區過期數據無法刪除;【開源版本bug,目前還沒有修復】
當指定遷移數據目錄時,當遷移計劃為以下場景時,整個遷移任務無法完成遷移,一直處于卡死狀態;【開源版本bug,目前還沒有修復】
遷移過程中,如果有重啟broker節點,那個broker節點上的所有leader分區無法切換回來,導致節點流量全部轉移到其他節點,直到所有副本被遷移完畢后leader才會切換回來;【開源版本bug,目前還沒有修復】。
在原生的Kafka版本中存在以下指定數據目錄場景無法遷移完畢的情況,此版本我們也不決定修復次bug: 1.針對同一個topic分區,如果部分目標副本相比原副本是所屬broker發生變化,部分目標副本相比原副本是broker內部所屬數據目錄發生變化; 那么副本所屬broker發生變化的那個目標副本可以正常遷移完畢,目標副本是在broker內部數據目錄發生變化的無法正常完成遷移; 但是舊副本依然可以正常提供生產、消費服務,并且不影響下一次遷移任務的提交,下一次遷移任務只需要把此topic分區的副本列表所屬broker列表變更后提交依然可以正常完成遷移,并且可以清理掉之前未完成的目標副本; 這里假設topic yyj1的初始化副本分布情況如下: { "version":1, "partitions":[ {"topic":"yyj","partition":0,"replicas":[1000003,1000001],"log_dirs":["/kfk211data/data31","/kfk211data/data13"]} ] } //遷移場景1: { "version":1, "partitions":[ {"topic":"yyj","partition":0,"replicas":[1000003,1000002],"log_dirs":["/kfk211data/data32","/kfk211data/data23"]} ] } //遷移場景2: { "version":1, "partitions":[ {"topic":"yyj","partition":0,"replicas":[1000002,1000001],"log_dirs":["/kfk211data/data22","/kfk211data/data13"]} ] } 針對上述的topic yyj1的分布分布情況,此時如果我們的遷移計劃為“遷移場景1”或遷移場景2“,那么都將出現有副本無法遷移完畢的情況。 但是這并不影響舊副本處理生產、消費請求,并且我們可以正常提交其他的遷移任務。 為了清理舊的未遷移完成的副本,我們只需要修改一次遷移計劃【新的目標副本列表和當前分區已分配副本列表完全不同即可】,再次提交遷移即可。 這里,我們依然以上述的例子做遷移計劃修改如下: { "version":1, "partitions":[ {"topic":"yyj","partition":0,"replicas":[1000004,1000005],"log_dirs":["/kfk211data/data42","/kfk211data/data53"]} ] } 這樣我們就可以正常完成遷移。
限流粒度較粗,不夠靈活精準,不夠智能。
當前限流維度組合
/config/users/<user>/clients/<client-id> /config/users/<user>/clients/<default> /config/users/<user> /config/users/<default>/clients/<client-id> /config/users/<default>/clients/<default> /config/users/<default> /config/clients/<client-id> /config/clients/<default>
存在問題
當同一個broker上有多個用戶同時進行大量的生產和消費時,想要讓broker可以正常運行,那必須在做限流時讓所有的用戶流量閾值之和不超過broker的吞吐上限;如果超過broker上限,那么broker就存在被打掛的風險;然而,即使用戶流量沒有達到broker的流量上限,但是,如果所有用戶流量集中到了某幾塊盤上,超過了磁盤的讀寫負載,也會導致所有生產、消費請求將被阻塞,broker可能處于假死狀態。
解決方案
(1)改造源碼,實現單個broker流量上限限制,只要流量達到broker上限立即進行限流處理,所有往這個broker寫的用戶都可以被限制住;或者對用戶進行優先級處理,放過高優先級的,限制低優先級的;
(2)改造源碼,實現broker上單塊磁盤流量上限限制(很多時候都是流量集中到某幾塊磁盤上,導致沒有達到broker流量上限卻超過了單磁盤讀寫能力上限),只要磁盤流量達到上限,立即進行限流處理,所有往這個磁盤寫的用戶都可以被限制住;或者對用戶進行優先級處理,放過高優先級的,限制低優先級的;
(3)改造源碼,實現topic維度限流以及對topic分區的禁寫功能;
(4)改造源碼,實現用戶、broker、磁盤、topic等維度組合精準限流;
到此,關于“Kafka怎么用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。