亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

kafka數據源Flink Kafka Consumer分析

發布時間:2021-11-22 09:51:33 來源:億速云 閱讀:199 作者:iii 欄目:大數據

這篇文章主要講解了“kafka數據源Flink Kafka Consumer分析”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“kafka數據源Flink Kafka Consumer分析”吧!

一、open()方法調用時機

FlinkKafkaConsumer繼承自RichFunction,具有生命周期方法open()。那么flink是何時調用FlinkKafkaConsumer的open()方法呢?

StreamTask在調用算子程序之前,會執行beforeInvoke()方法,在該方法中會初始化算子的算子并且執行open()方法:

	operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

initializeStateAndOpenOperators()方法中循環對算子初始化:

	protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
		for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
			StreamOperator<?> operator = operatorWrapper.getStreamOperator();
			operator.initializeState(streamTaskStateInitializer);
			operator.open();
		}
	}

kafka source對應的operator為StreamSource,其open()方法為

	public void open() throws Exception {
		super.open();
		FunctionUtils.openFunction(userFunction, new Configuration());
	}

FunctionUtils的openFunction()即執行算子(要繼承RichFunction)的open()方法:

	public static void openFunction(Function function, Configuration parameters) throws Exception{
		if (function instanceof RichFunction) {
			RichFunction richFunction = (RichFunction) function;
			richFunction.open(parameters);
		}
	}

二、運行時上下文RuntimeContext何時賦值?

在 StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(),在OperatorChain的構造函數中,通過工廠類StreamOperatorFactory來創建StreamOperator。kafka source對應的StreamOperatorFactory為SimpleOperatorFactory,createStreamOperator()方法中調用StreamOperator的setup()方法:

	public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
		if (operator instanceof AbstractStreamOperator) {
			((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
		}
		if (operator instanceof SetupableStreamOperator) {
			((SetupableStreamOperator) operator).setup(
				parameters.getContainingTask(),
				parameters.getStreamConfig(),
				parameters.getOutput());
		}
		return (T) operator;
	}

kafka source對應的StreamOperator為StreamSource,其實現了SetupableStreamOperator接口。其setup方法在父類AbstractUdfStreamOperator:

	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
		super.setup(containingTask, config, output);
		FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());

	}

FunctionUtils.setFunctionRuntimeContext()來給算子設置RuntimeContext。設置的RuntimeContext在AbstractStreamOperator的setup()方法中,為StreamingRuntimeContext:

		this.runtimeContext = new StreamingRuntimeContext(
			environment,
			environment.getAccumulatorRegistry().getUserMap(),
			getMetricGroup(),
			getOperatorID(),
			getProcessingTimeService(),
			null,
			environment.getExternalResourceInfoProvider());

三、FlinkKafkaConsumer的run()方法

Flink調用FlinkKafkaConsumer的run()方法來生產數據。run()方法的處理邏輯:

①創建KafkaFetcher,來拉取數據

		this.kafkaFetcher = createFetcher(
				sourceContext,
				subscribedPartitionsToStartOffsets,
				watermarkStrategy,
				(StreamingRuntimeContext) getRuntimeContext(),
				offsetCommitMode,
				getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
				useMetrics);

②KafkaFetcher的runFetchLoop()中創建KafkaConsumerThread線程來循環拉取kafka數據。KafkaConsumerThread通過KafkaConsumer拉取kafka數據,并交給Handover

				if (records == null) {
					try {
						records = consumer.poll(pollTimeout);
					}
					catch (WakeupException we) {
						continue;
					}
				}

				try {
					handover.produce(records);
					records = null;
				}

KafkaFetcher通過Handover獲取拉取的kafka數據

			while (running) {
				// this blocks until we get the next records
				// it automatically re-throws exceptions encountered in the consumer thread
				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

				// get the records for each topic partition
				for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {

					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
						records.records(partition.getKafkaPartitionHandle());

					partitionConsumerRecordsHandler(partitionRecords, partition);
				}
			}

③通過SourceContext中的Output<StreamRecord<T>>來發送數據給下一個算子

		public void collect(T element) {
			synchronized (lock) {
				output.collect(reuse.replace(element));
			}
		}

SourceContext在StreamSource的run()方法中通過StreamSourceContexts.getSourceContext()創建。Output<StreamRecord<T>>在OperatorChain的createOutputCollector()創建,為其返回值。

		for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
			@SuppressWarnings("unchecked")
			RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

			allOutputs.add(new Tuple2<>(output, outputEdge));
		}

當有一個輸出時,是RecordWriterOutput;多個時,是CopyingDirectedOutput或DirectedOutput

④單個輸出RecordWriterOutput時,是通過成員屬性RecordWriter實例來輸出。RecordWriter通過StreamTask的createRecordWriterDelegate()創建,RecordWriterDelegate為RecordWriter的代理類,內部持有RecordWriter實例:

	public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
			StreamConfig configuration,
			Environment environment) {
		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
			configuration,
			environment);
		if (recordWrites.size() == 1) {
			return new SingleRecordWriter<>(recordWrites.get(0));
		} else if (recordWrites.size() == 0) {
			return new NonRecordWriter<>();
		} else {
			return new MultipleRecordWriters<>(recordWrites);
		}
	}

	private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
			StreamConfig configuration,
			Environment environment) {
		List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
		List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

		for (int i = 0; i < outEdgesInOrder.size(); i++) {
			StreamEdge edge = outEdgesInOrder.get(i);
			recordWriters.add(
				createRecordWriter(
					edge,
					i,
					environment,
					environment.getTaskInfo().getTaskName(),
					edge.getBufferTimeout()));
		}
		return recordWriters;
	}

outEdgesInOrder來源于StreamGraph中的StreamNode的List<StreamEdge> outEdges。

創建RecordWriter時,根據StreamEdge的StreamPartitioner<?> outputPartitioner的isBroadcast()方法判斷是BroadcastRecordWriter還是ChannelSelectorRecordWriter:

	public RecordWriter<T> build(ResultPartitionWriter writer) {
		if (selector.isBroadcast()) {
			return new BroadcastRecordWriter<>(writer, timeout, taskName);
		} else {
			return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
		}
	}

outputPartitioner是根據上下游節點并行度是否一致來確定:

			if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
				partitioner = new ForwardPartitioner<Object>();
			} else if (partitioner == null) {
				partitioner = new RebalancePartitioner<Object>();
			}

BroadcastRecordWriter和ChannelSelectorRecordWriter最終都會調用成員屬性ResultPartitionWriter targetPartition的flush()方法來輸出數據。ResultPartitionWriter 在ConsumableNotifyingResultPartitionWriterDecorator的decorate()生成。根據對應的ResultPartitionDeploymentDescriptor來判斷是ConsumableNotifyingResultPartitionWriterDecorator還是直接傳入的partitionWriters。ConsumableNotifyingResultPartitionWriterDecorator會把消息直接傳給下個節點消費,通過ResultPartitionConsumableNotifier來通知:

	public static ResultPartitionWriter[] decorate(
			Collection<ResultPartitionDeploymentDescriptor> descs,
			ResultPartitionWriter[] partitionWriters,
			TaskActions taskActions,
			JobID jobId,
			ResultPartitionConsumableNotifier notifier) {

		ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
		int counter = 0;
		for (ResultPartitionDeploymentDescriptor desc : descs) {
			if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
				consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
					taskActions,
					jobId,
					partitionWriters[counter],
					notifier);
			} else {
				consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
			}
			counter++;
		}
		return consumableNotifyingPartitionWriters;
	}

partitionWriters通過 NettyShuffleEnvironment的createResultPartitionWriters() -> ResultPartitionFactory的create()  創建。 ResultPartition的輸出是通過成員屬性ResultSubpartition[] subpartitions完成。subpartitions在ResultPartitionFactory的createSubpartitions()生成:

	private void createSubpartitions(
			ResultPartition partition,
			ResultPartitionType type,
			BoundedBlockingSubpartitionType blockingSubpartitionType,
			ResultSubpartition[] subpartitions) {
		// Create the subpartitions.
		if (type.isBlocking()) {
			initializeBoundedBlockingPartitions(
				subpartitions,
				partition,
				blockingSubpartitionType,
				networkBufferSize,
				channelManager);
		} else {
			for (int i = 0; i < subpartitions.length; i++) {
				subpartitions[i] = new PipelinedSubpartition(i, partition);
			}
		}
	}

流式任務時,ResultSubpartition為PipelinedSubpartition。

四、數據寫出

4.1 ResultPartitionConsumableNotifier通知

ResultPartitionConsumableNotifier在TaskExecutor的associateWithJobManager()中生成:

	private JobTable.Connection associateWithJobManager(
			JobTable.Job job,
			ResourceID resourceID,
			JobMasterGateway jobMasterGateway) {
		......
        ......

		ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
			jobMasterGateway,
			getRpcService().getExecutor(),
			taskManagerConfiguration.getTimeout());

		......
        ......
	}

RpcResultPartitionConsumableNotifier遠程調用JobMaster的scheduleOrUpdateConsumers()方法,傳入ResultPartitionID partitionId

4.1.1 JobMaster的scheduleOrUpdateConsumers()

JobMaster通過ExecutionGraph的scheduleOrUpdateConsumers()通知下游消費算子。

這里有兩個關鍵代碼:

①從本算子ExecutionVertex的成員Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions中取出該分區對應的生產消費信息,這些信息存儲在IntermediateResultPartition中;

	void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {

		.......

		final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());

		.......

		if (partition.getIntermediateResult().getResultType().isPipelined()) {
			// Schedule or update receivers of this partition
			execution.scheduleOrUpdateConsumers(partition.getConsumers());
		}
		else {
			throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
					"pipelined partitions.");
		}
	}

從IntermediateResultPartition取出消費者List<List<ExecutionEdge>> allConsumers;

從ExecutionEdge的ExecutionVertex target的Execution currentExecution中取出執行任務;

②Execution的sendUpdatePartitionInfoRpcCall()方法通過rpc調用TaskExcutor的updatePartitions()方法來執行下游消費者算子

	private void sendUpdatePartitionInfoRpcCall(
			final Iterable<PartitionInfo> partitionInfos) {

		final LogicalSlot slot = assignedResource;

		if (slot != null) {
			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
			final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

			CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

			updatePartitionsResultFuture.whenCompleteAsync(
				(ack, failure) -> {
					// fail if there was a failure
					if (failure != null) {
						fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
							"] on TaskManager " + taskManagerLocation + " failed", failure));
					}
				}, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
		}
	}
4.1.2 TaskExecutor的updatePartitions()

TaskExecutor的updatePartitions()來更新分區信息。如果之前InputChannel是未知的,則進行更新。SimpleInputGate的updateInputChannel():

	public void updateInputChannel(
			ResourceID localLocation,
			NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
		synchronized (requestLock) {
			if (closeFuture.isDone()) {
				// There was a race with a task failure/cancel
				return;
			}

			IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

			InputChannel current = inputChannels.get(partitionId);

			if (current instanceof UnknownInputChannel) {
				UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
				boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
				InputChannel newChannel;
				if (isLocal) {
					newChannel = unknownChannel.toLocalInputChannel();
				} else {
					RemoteInputChannel remoteInputChannel =
						unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
					remoteInputChannel.assignExclusiveSegments();
					newChannel = remoteInputChannel;
				}
				LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

				inputChannels.put(partitionId, newChannel);
				channels[current.getChannelIndex()] = newChannel;

				if (requestedPartitionsFlag) {
					newChannel.requestSubpartition(consumedSubpartitionIndex);
				}

				for (TaskEvent event : pendingEvents) {
					newChannel.sendTaskEvent(event);
				}

				if (--numberOfUninitializedChannels == 0) {
					pendingEvents.clear();
				}
			}
		}
	}

4.2 PipelinedSubpartition寫出

kafka數據源Flink Kafka Consumer分析

記錄先寫到緩存ArrayDeque<BufferConsumer> buffers中,然后通過PipelinedSubpartitionView readView的notifyDataAvailable()  -> BufferAvailabilityListener availabilityListener的notifyDataAvailable() 方法來通知。

4.2.1 BufferAvailabilityListener創建時機?

①TaskManagerServices在創建ShuffleEnvironment時,通過 NettyShuffleServiceFactory的createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer的initChannel() -> NettyProtocol的getServerChannelHandlers() 獲取Netty服務端的處理器PartitionRequestServerHandler:

	public ChannelHandler[] getServerChannelHandlers() {
		PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
		PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
			partitionProvider,
			taskEventPublisher,
			queueOfPartitionQueues);

		return new ChannelHandler[] {
			messageEncoder,
			new NettyMessage.NettyMessageDecoder(),
			serverHandler,
			queueOfPartitionQueues
		};
	}

②PartitionRequestServerHandler在獲取到客戶端發送的PartitionRequest 消息時, 創建CreditBasedSequenceNumberingViewReader,并通過 requestSubpartitionView() -> ResultPartitionManager的createSubpartitionView() -> ResultPartition的createSubpartitionView() 來設置CreditBasedSequenceNumberingViewReader

③CreditBasedSequenceNumberingViewReader的notifyDataAvailable()方法調用PartitionRequestQueue的notifyReaderNonEmpty(),通知下游算子:

	void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
		// The notification might come from the same thread. For the initial writes this
		// might happen before the reader has set its reference to the view, because
		// creating the queue and the initial notification happen in the same method call.
		// This can be resolved by separating the creation of the view and allowing
		// notifications.

		// TODO This could potentially have a bad performance impact as in the
		// worst case (network consumes faster than the producer) each buffer
		// will trigger a separate event loop task being scheduled.
		ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
	}

感謝各位的閱讀,以上就是“kafka數據源Flink Kafka Consumer分析”的內容了,經過本文的學習后,相信大家對kafka數據源Flink Kafka Consumer分析這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

星座| 济阳县| 宜兴市| 伊春市| 武强县| 大姚县| 西宁市| 麻城市| 长丰县| 思南县| 垦利县| 敦煌市| 霍邱县| 辰溪县| 新乐市| 龙游县| 百色市| 无极县| 普兰店市| 巍山| 兴和县| 凌源市| 松阳县| 德化县| 连江县| 梨树县| 霍林郭勒市| 交口县| 海盐县| 巴彦淖尔市| 定州市| 罗甸县| 湘潭县| 城市| 田阳县| 鄂伦春自治旗| 安义县| 全州县| 乌海市| 湘阴县| 比如县|