您好,登錄后才能下訂單哦!
這篇文章主要講解了“怎么理解kafka分區、生產和消費”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“怎么理解kafka分區、生產和消費”吧!
分區規則指的是將每個Topic劃分成多個分區(Partition),每個分區是一組有序的消息日志,生產者生產的每條消息只會被發送到其中一個分區。
分區 (Partition) 都是一個有序的、不可變的數據序列,消息數據被不斷的添加到序列的尾部。分區中的每一條消息數據都被賦予了一個連續的數字ID,即偏移量 (offset) ,用于唯一標識分區中的每條消息數據。
分區(Partition)的作用就是提供負載均衡的能力,單個topic的不同分區可存儲在相同或不同節點機上,為實現系統的高伸縮性(Scalability),不同的分區被放置到不同節點的機器上,各節點機獨立地執行各自分區的讀寫任務,如果性能不足,可通過添加新的節點機器來增加整體系統的吞吐量。
Kafka分區下數據使用消息日志(Log)方式保存數據,具體方式是在磁盤上創建只能追加寫(Append-only)消息的物理文件。因為只能追加寫入,因此避免了緩慢的隨機I/O操作,改為性能較好的順序I/O寫操作。Kafka日志文件分為多個日志段(Log Segment),消息被追加寫到當前最新的日志段中,當寫滿一個日志段后Kafka會自動切分出一個新的日志段,并將舊的日志段封存。
Kafka將消息數據根據Partition進行存儲,Partition分為若干Segment,每個Segment的大小相等。Segment由index file、log file、timeindex file等組成,后綴為".index"和".log",分別表示為Segment索引文件、數據文件,每一個Segment存儲著多條信息。
分區策略是決定生產者將消息發送到哪個分區的算法。Kafka提供默認的分區策略,同時支持自定義分區策略。
Kafka 默認分區策略同時實現了兩種策略:如果指定Key,那么默認實現按消息鍵保序策略;如果沒有指定Key,則使用輪詢策略
輪詢策略(Round-robin),即順序分配策略。如果一個Topic有3個分區,則第1條消息被發送到分區0,第2條被發送到分區1,第3條被發送到分區2,以此類推。當生產第4條消息時又會重新輪詢將其分配到分區0。
輪詢策略是Kafka Java生產者API默認提供的分區策略。如果未指定partitioner.class參數,那么生產者程序會按照輪詢的方式在Topic的所有分區間均勻地存儲消息。輪詢策略有非常優秀的負載均衡表現,能保證消息最大限度地被平均分配到所有分區上。
隨機策略(Randomness)是將消息隨機地放置到任意一個分區上。如果要實現隨機策略版的partition方法,Java版如下:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return ThreadLocalRandom.current().nextInt(partitions.size());
先計算出Topic的總分區數,然后隨機地返回一個小于分區數的正整數。隨機策略本質上是力求將數據均勻地分散到各個分區,但實際表現要遜于輪詢策略,如果追求數據的均勻分布,推薦使用輪詢策略。
Kafka允許為每條消息定義消息鍵,簡稱為Key。Key可以是一個有著明確業務含義的字符串,如客戶代碼、部門編號或是業務ID等,也可以用來表征消息元數據。一旦消息被定義了Key,就可以保證同一個Key的所有消息都進入到相同的分區中。
實現分區策略的partition方法只需要兩行代碼即可:
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return Math.abs(key.hashCode()) % partitions.size();
基于地理位置的分區策略通常只針對大規模的Kafka集群,特別是跨城市、跨國家甚至跨大洲的集群。假設天貓計劃為每個新注冊用戶提供一份注冊禮品,比如歐美的用戶注冊天貓時可以免費得到一臺iphone SE手機,而中國的新注冊用戶可以得到一臺華為P40 Pro。為了實現相應的注冊業務邏輯,只需要創建一個雙分區的主題,然后再創建兩個消費者程序分別處理歐美和中國用戶的注冊用戶邏輯即可,同時必須把不同地理位置的用戶注冊的消息發送到不同機房中,因為處理注冊消息的消費者程序只可能在某一個機房中啟動著。基于地理位置的分區策略可以根據Broker的IP地址實現定制化的分區策略。
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); return partitions.stream().filter(p -> isChina(p.leader().host())).map(PartitionInfo::partition).findAny().get();
可以從所有分區中找出Leader副本在中國的所有分區,然后隨機挑選一個進行消息發送。
如果要自定義分區策略,需要顯式地配置生產者端的參數partitioner.class。編寫生產者程序時,可以編寫一個具體的類實現org.apache.kafka.clients.producer.Partitioner
接口(partition()和close()),通常只需要實現最重要的partition方法。
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
topic、key、keyBytes、value和valueBytes都屬于消息數據,cluster則是集群信息(比如當前Kafka集群共有多少主題、多少Broker等)。設置partitioner.class參數為自己實現類的Full Qualified Name,生產者程序就會按照自定義分區策略的代碼邏輯對消息進行分區。
無論消息是否被消費,kafka都會保留所有消息,同時定期檢查舊的日志段是否能夠被刪除,從而回收磁盤空間,刪除策略有兩種:
基于時間:log.retention.hours=168
基于大小:log.retention.bytes=1073741824
需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高Kafka 性能無關。
Kafka 2.1.0版本前,支持GZIP、Snappy、LZ4三種壓縮算法。2.1.0版本開始正式支持Zstandard算法(簡寫為zstd ,Facebook開源的一個壓縮算法),該算法能夠提供超高的壓縮比(compression ratio)。壓縮算法可以使用壓縮比和壓縮/解壓縮吞吐量兩個指標進行衡量。不同壓縮算法的性能比較如下:
生產環境中,GZIP、Snappy、LZ4、zstd性能表現各有千秋,在吞吐量方面:LZ4 > Snappy > zstd > GZIP;在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
如果要啟用Producer端的壓縮,Producer程序運行機器上的CPU資源必須充足。除了CPU資源充足,如果生產環境中帶寬資源有限,也建議Producer端開啟壓縮。通常,帶寬比CPU和內存要昂貴的多,因此千兆網絡中Kafka集群帶寬資源耗盡很容易出現。如果客戶端機器CPU資源富余,建議Producer端開啟zstd壓縮,可以極大地節省網絡資源消耗。對于解壓縮,需要避免非正常的解壓縮,如消息格式轉換的解壓縮操作、Broker與Producer解壓縮算法不一致。
Producer發送壓縮消息到Broker后,Broker會原封不動保存。當Consumer程序請求消息時,Broker 會原樣發出,當消息到達Consumer端后,Consumer自行解壓縮消息。Kafka會將使用的壓縮算法封裝進消息集合中,當Consumer讀取到消息集合時,會知道消息使用的壓縮算法。除了在Consumer端解壓縮,Broker端也會進行解壓縮,每個壓縮過的消息集合在Broker端寫入時都要發生解壓縮操作,對消息執行各種驗證。解壓縮對Broker端性能是有一定影響的。
如果將Topic設置成單分區,該Topic的所有的消息都只在一個分區內讀寫,保證全局的順序性,但將喪失Kafka多分區帶來的高吞吐量和負載均衡的性能優勢。
多分區消息保序的方法是按消息鍵保序策略,根據業務提取出需要保序的消息的邏輯主體,并建立消息標志位ID,,對標志位設定專門的分區策略,保證同一標志位的所有消息都發送到同一分區,既可以保證分區內的消息順序,也可以享受到多分區帶來的搞吞吐量。
說明:消息重試只是簡單將消息重新發送到原來的分區,不會重新選擇分區。
kafka只能保證分區內有序,無法保證分區間有序,所以消費時,數據是相對有序的。
在通過API方式發布消息時,生產者是以Record為消息進行發布的。Record中包含key與value,value才是消息本身,而key用于路由消息所要存放Partition。消息要寫入到哪個Partition并不是隨機的,而是由路由策略決定。
指定Partition,直接寫入指定Partition。
沒有指定Partition但指定了key,則通過對key的hash值與Partition數量取模,結果就是要選出的Partition索引。
Partition和key都未指定,則使用輪詢算法選出一個Partition。
增加分區時,Partition內的消息不會重新進行分配,隨著數據繼續寫入,新分區才會參與再平衡。
Producer先通過分區策略確定數據錄入的partition,再從Zookeeper中找到Partition的Leader
Producer將消息發送給分區的Leader。
Leader將消息接入本地的Log,并通知ISR(In-sync Replicas,副本同步列表)的Followers。
ISR中的Followers從Leader中pull消息,寫入本地Log后向Leader發送ACK(消息發送確認機制)。
Leader收到所有ISR中的Followers的ACK后,增加HW(high watermark,最后commit 的offset)并向Producer發送ACK,表示消息寫入成功。
必須使用producer.send(msg, callback)接口發送消息。
Producer端設置acks參數值為all。acks參數值為all表示ISR中所有Broker副本都接收到消息,消息才算已提交。
設置Producer端retries參數值為一個較大值,表示Producer自動重試次數。當出現網絡瞬時抖動時,消息發送可能會失敗,此時Producer能夠自動重試消息發送,避免消息丟失。
設置Broker端unclean.leader.election.enable = false,unclean.leader.election.enable參數用于控制有資格競選分區Leader的Broker。如果一個Broker落后原Leader太多,那么成為新Leader必然會造成消息丟失。因此,要將unclean.leader.election.enable參數設置成false。
設置Broker端參數replication.factor >= 3,將消息保存多份副本。
設置Broker參數min.insync.replicas > 1,保證ISR中Broker副本的最少個數,在acks=-1時才生效。設置成大于1可以提升消息持久性,生產環境中不能使用默認值 1。
必須確保replication.factor > min.insync.replicas,如果兩者相等,那么只要有一個副本掛機,整個分區無法正常工作。推薦設置成replication.factor = min.insync.replicas + 1。
確保消息消費完成再提交。設置Consumer端參數enable.auto.commit為false,并采用手動提交位移的方式。
Producer端攔截器實現類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口有兩個核心的方法:
onSend:在消息發送前被調用。
onAcknowledgement:在消息成功提交或發送失敗后被調用。onAcknowledgement 調用要早于發送回調通知callback的調用。onAcknowledgement與onSend 方法不是在同一個線程中被調用,因此如果兩個方法中使用了某個共享可變對象,要保證線程安全。
假設第一個攔截器的完整類路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個攔截器是com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,Producer指定攔截器的Java代碼示例如下:
Properties props = new Properties(); List<String> interceptors = new ArrayList<>(); interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1 interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2 props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Consumer向Broker提交連接請求,連接的Broker會向其發送Broker Controller的通信URL,即配置文件中的listeners地址;
當Consumer指定了要消費的Topic后,會向Broker Controller發送消費請求;
Broker Controller會為Consumer分配一個或幾個Partition Leader,并將Partition的當前offset發送給Consumer;
Consumer會按照Broker Controller分配的Partition對其中的消息進行消費;
當Consumer消費完消息后,Consumer會向Broker發送一個消息已經被消費反饋,即消息的offset;
在Broker接收到Consumer的offset后,會更新相應的__consumer_offset中;
Consumer攔截器的實現類要實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口,ConsumerInterceptor有兩個核心方法。
onConsume:在消息返回給Consumer程序前調用。在開始正式處理消息前,攔截器會先做一些處理,再返回給Consumer。
onCommit:Consumer在提交位移后調用,可以進行一些打日志操作等。
同一個Consumer重復消費
當Consumer由于消費能力低而引發了消費超時,則可能會形成重復消費。
在某數據剛好消費完畢,但正準備提交offset時,消費時間超時,則Broker認為消息未消費成功,產生重復消費問題。
其解決方案:延長offset提交時間。
不同的Consumer重復消費
當Consumer消費了消息,但還沒有提交offset時宕機,則已經被消費過的消息會被重復消費。
感謝各位的閱讀,以上就是“怎么理解kafka分區、生產和消費”的內容了,經過本文的學習后,相信大家對怎么理解kafka分區、生產和消費這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。