亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

kafka生產者發送消息流程是什么

發布時間:2023-03-31 17:02:07 來源:億速云 閱讀:106 作者:iii 欄目:開發技術

今天小編給大家分享一下kafka生產者發送消息流程是什么的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

消息發送過程

消息的發送可能會經過攔截器、序列化、分區器等過程。消息發送的主要涉及兩個線程,分別為main線程和sender線程。

kafka生產者發送消息流程是什么

如圖所示,主線程由 afkaProducer 創建消息,然后通過可能的攔截器、序列化器和分區器的作用之后緩存到消息累加器RecordAccumulator (也稱為消息收集器)中。 Sender 線程負責從RecordAccumulator 獲取消息并將其發送到 Kafka中。

攔截器

在消息序列化之前會經過消息攔截器,自定義攔截器需要實現ProducerInterceptor接口,接口主要有兩個方案#onSend和#onAcknowledgement,在消息發送之前會調用前者方法,可以在發送之前假如處理邏輯,比如計費。在收到服務端ack響應后會觸發后者方法。需要注意的是攔截器中不要加入過多的復雜業務邏輯,以免影響發送效率。

消息分區

消息ProducerRecord會將消息路由到那個分區中,分兩種情況:

1.指定了partition字段

如果消息ProducerRecord中指定了 partition字段,那么就不需要走分區器,直接發往指定得partition分區中。

2.沒有指定partition,但自定義了分區器

3.沒指定parittion,也沒有自定義分區器,但key不為空

4.沒指定parittion,也沒有自定義分區器,key也為空

看源碼

// KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
//指定分區partition則直接返回,否則走分區器
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(),                 serializedValue, cluster);
}
//DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

partition 方法中定義了分區分配邏輯 如果 ke 不為 null , 那 么默認的分區器會對 key 進行哈 希(采 MurmurHash3 算法 ,具備高運算性能及 低碰 撞率),最終根據得到 哈希值來 算分區號, 有相同 key 的消息會被寫入同一個分區 如果 key null ,那么消息將會以輪詢的方式發往主題內的各個可用分區。

消息累加器

分區確定好了之后,消息并不是直接發送給broker,因為一個個發送網絡消耗太大,而是先緩存到消息累加器RecordAccumulator,RecordAccumulator主要用來緩存消息 Sender 線程可以批量發送,進 減少網絡傳輸 的資源消耗以提升性能 RecordAccumulator 緩存的大 小可以通過生產者客戶端參數 buffer memory 配置,默認值為 33554432B ,即 32MB如果生產者發送消息的速度超過發 送到服務器的速度 ,則會導致生產者空間不足,這個時候 KafkaProducer的send()方法調用要么 被阻塞,要么拋出異常,這個取決于參數 max block ms 的配置,此參數的默認值為 60秒。

消息累加器本質上是個ConcurrentMap,

ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

發送流程源碼分析

//KafkaProducer
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
	// intercept the record, which can be potentially modified; this method does not throw exceptions
    //首先執行攔截器鏈
	ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
	return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
	try {
		throwIfProducerClosed();
		// first make sure the metadata for the topic is available
		long nowMs = time.milliseconds();
		ClusterAndWaitTime clusterAndWaitTime;
		try {
			clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
		} catch (KafkaException e) {
			if (metadata.isClosed())
				throw new KafkaException("Producer closed while send in progress", e);
			throw e;
		}
		nowMs += clusterAndWaitTime.waitedOnMetadataMs;
		long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
		Cluster cluster = clusterAndWaitTime.cluster;
		byte[] serializedKey;
		try {
			//key序列化
			serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
		} catch (ClassCastException cce) {
			throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
					" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
					" specified in key.serializer", cce);
		}
		byte[] serializedValue;
		try {
			//value序列化
			serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
		} catch (ClassCastException cce) {
			throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
					" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
					" specified in value.serializer", cce);
		}
		//獲取分區partition
		int partition = partition(record, serializedKey, serializedValue, cluster);
		tp = new TopicPartition(record.topic(), partition);
		setReadOnly(record.headers());
		Header[] headers = record.headers().toArray();
		//消息壓縮
		int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
				compressionType, serializedKey, serializedValue, headers);
		//判斷消息是否超過最大允許大小,消息緩存空間是否已滿
		ensureValidRecordSize(serializedSize);
		long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
		if (log.isTraceEnabled()) {
			log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
		}
		// producer callback will make sure to call both 'callback' and interceptor callback
		Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
		if (transactionManager != null && transactionManager.isTransactional()) {
			transactionManager.failIfNotReadyForSend();
		}
		//將消息緩存在消息累加器RecordAccumulator中
		RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
				serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        //開辟新的ProducerBatch
		if (result.abortForNewBatch) {
			int prevPartition = partition;
			partitioner.onNewBatch(record.topic(), cluster, prevPartition);
			partition = partition(record, serializedKey, serializedValue, cluster);
			tp = new TopicPartition(record.topic(), partition);
			if (log.isTraceEnabled()) {
				log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
			}
			// producer callback will make sure to call both 'callback' and interceptor callback
			interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
 
			result = accumulator.append(tp, timestamp, serializedKey,
				serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
		}
		if (transactionManager != null && transactionManager.isTransactional())
			transactionManager.maybeAddPartitionToTransaction(tp);
		//判斷消息是否已滿,喚醒sender線程進行發送消息
		if (result.batchIsFull || result.newBatchCreated) {
			log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
			this.sender.wakeup();
		}
		return result.future;
		// handling exceptions and record the errors;
		// for API exceptions return them in the future,
		// for other exceptions throw directly
	} catch (Exception e) {
		// we notify interceptor about all exceptions, since onSend is called before anything else in this method
		this.interceptors.onSendError(record, tp, e);
		throw e;
	}
}

生產消息的可靠性

消息發送到broker,什么情況下生產者才確定消息寫入成功了呢?ack是生產者一個重要的參數,它有三個值,ack=1表示leader副本寫入成功服務端即可返回給生產者,是吞吐量和消息可靠性的平衡方案;ack=0表示生產者發送消息之后不需要等服務端響應,這種消息丟失風險最大;ack=-1表示生產者需要等等ISR中所有副本寫入成功后才能收到響應,這種消息可靠性最高但吞吐量也是最小的。

以上就是“kafka生產者發送消息流程是什么”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

朝阳县| 高淳县| 个旧市| 舞阳县| 依兰县| 出国| 柏乡县| 浦江县| 通州市| 永兴县| 上虞市| 滦平县| 田林县| 兴文县| 南丰县| 喀什市| 潮安县| 枣强县| 平罗县| 阿荣旗| 邹城市| 巴楚县| 台东市| 朔州市| 樟树市| 德保县| 长宁县| 布拖县| 和政县| 金平| 南涧| 博湖县| 赫章县| 武宣县| 保靖县| 敦化市| 晋江市| 克什克腾旗| 锦州市| 邳州市| 威远县|