This article walks through how Flink consumes a Kafka data source with FlinkKafkaConsumer: how the consumer is opened and set up, how it fetches records, and how those records are handed to the downstream operators.
FlinkKafkaConsumer is a RichFunction, so it has the lifecycle method open(). When does Flink call FlinkKafkaConsumer's open() method?
Before a StreamTask runs its operators, it executes beforeInvoke(), which initializes the operators in the chain and calls their open() methods:
    operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
initializeStateAndOpenOperators() loops over the operators, initializing and then opening each one:
    protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
            StreamOperator<?> operator = operatorWrapper.getStreamOperator();
            operator.initializeState(streamTaskStateInitializer);
            operator.open();
        }
    }
The operator for a Kafka source is StreamSource. Its open() method, inherited from AbstractUdfStreamOperator, is:
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }
FunctionUtils.openFunction() calls the open() method of the user function, provided the function implements RichFunction:
    public static void openFunction(Function function, Configuration parameters) throws Exception {
        if (function instanceof RichFunction) {
            RichFunction richFunction = (RichFunction) function;
            richFunction.open(parameters);
        }
    }
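The instanceof dispatch above can be modelled without any Flink dependency. The following is a minimal sketch, not Flink's code: the interfaces Fn and RichFn and the class OpenDispatchSketch are hypothetical stand-ins for Function, RichFunction and FunctionUtils, and open() takes no Configuration here to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's Function / RichFunction; not the real interfaces.
interface Fn {}

interface RichFn extends Fn {
    void open();
}

final class OpenDispatchSketch {

    // Same idea as FunctionUtils.openFunction(): only functions that
    // have a lifecycle get their open() called. Returns whether open() ran.
    static boolean openFunction(Fn function) {
        if (function instanceof RichFn) {
            ((RichFn) function).open();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Fn plain = new Fn() {};
        Fn rich = (RichFn) () -> log.add("open");

        System.out.println("plain opened: " + openFunction(plain));
        System.out.println("rich opened:  " + openFunction(rich));
        System.out.println("calls: " + log);
    }
}
```

A source function such as FlinkKafkaConsumer falls into the second case, which is why its open() runs as part of opening the operator chain.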
Operators are created along the path StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(). The OperatorChain constructor creates each StreamOperator through its StreamOperatorFactory. For a Kafka source the factory is SimpleOperatorFactory, whose createStreamOperator() method calls the StreamOperator's setup() method:
    public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
        if (operator instanceof AbstractStreamOperator) {
            ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
        }
        if (operator instanceof SetupableStreamOperator) {
            ((SetupableStreamOperator) operator).setup(
                parameters.getContainingTask(),
                parameters.getStreamConfig(),
                parameters.getOutput());
        }
        return (T) operator;
    }
StreamSource, the StreamOperator for a Kafka source, implements the SetupableStreamOperator interface. Its setup() method is defined in the parent class AbstractUdfStreamOperator:
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
    }
FunctionUtils.setFunctionRuntimeContext() hands the RuntimeContext to the user function. That RuntimeContext is a StreamingRuntimeContext, created in AbstractStreamOperator.setup():
    this.runtimeContext = new StreamingRuntimeContext(
        environment,
        environment.getAccumulatorRegistry().getUserMap(),
        getMetricGroup(),
        getOperatorID(),
        getProcessingTimeService(),
        null,
        environment.getExternalResourceInfoProvider());
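The point of this ordering is that setup() runs when the operator is created, so the runtime context is already in place by the time open() is called. Below is a minimal sketch of that ordering under simplifying assumptions: RuntimeCtx, UserFunction, Operator and createOperator are hypothetical names, not Flink classes, and the context carries nothing but a subtask index.

```java
// Hypothetical, simplified model of the setup() -> open() ordering; not Flink classes.
final class SetupOrderSketch {

    /** Stand-in for StreamingRuntimeContext: it only carries a subtask index. */
    static final class RuntimeCtx {
        final int subtaskIndex;

        RuntimeCtx(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }
    }

    /** Stand-in for a rich user function such as a source function. */
    static final class UserFunction {
        private RuntimeCtx ctx;
        String openedAs;

        void setRuntimeContext(RuntimeCtx ctx) {
            this.ctx = ctx;
        }

        void open() {
            if (ctx == null) {
                throw new IllegalStateException("runtime context not set before open()");
            }
            openedAs = "subtask-" + ctx.subtaskIndex;
        }
    }

    /** Stand-in for the operator that wraps the user function. */
    static final class Operator {
        final UserFunction fn;

        Operator(UserFunction fn) {
            this.fn = fn;
        }

        // Comparable to AbstractUdfStreamOperator.setup(): pass the context to the function.
        void setup(RuntimeCtx ctx) {
            fn.setRuntimeContext(ctx);
        }

        // Comparable to the operator's open(): open the wrapped function.
        void open() {
            fn.open();
        }
    }

    // Stand-in for the factory step: create the operator and set it up immediately.
    static Operator createOperator(UserFunction fn, int subtaskIndex) {
        Operator operator = new Operator(fn);
        operator.setup(new RuntimeCtx(subtaskIndex));
        return operator;
    }

    public static void main(String[] args) {
        Operator operator = createOperator(new UserFunction(), 3);
        operator.open();
        System.out.println(operator.fn.openedAs);
    }
}
```

In this sketch, calling open() on an operator that was never set up fails fast, which illustrates why the factory performs setup() before the task opens its operators.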
Flink calls FlinkKafkaConsumer's run() method to produce data. run() proceeds as follows:
① It creates a KafkaFetcher to pull the data:
    this.kafkaFetcher = createFetcher(
        sourceContext,
        subscribedPartitionsToStartOffsets,
        watermarkStrategy,
        (StreamingRuntimeContext) getRuntimeContext(),
        offsetCommitMode,
        getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
        useMetrics);
② KafkaFetcher.runFetchLoop() starts a KafkaConsumerThread that polls Kafka in a loop. KafkaConsumerThread pulls records through KafkaConsumer and hands them to the Handover:
    if (records == null) {
        try {
            records = consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            continue;
        }
    }

    try {
        handover.produce(records);
        records = null;
    } catch (Handover.WakeupException e) {
        // fall through the loop
    }
KafkaFetcher takes the pulled records from the Handover:
    while (running) {
        // this blocks until we get the next records
        // it automatically re-throws exceptions encountered in the consumer thread
        final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

        // get the records for each topic partition
        for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
            List<ConsumerRecord<byte[], byte[]>> partitionRecords =
                records.records(partition.getKafkaPartitionHandle());

            partitionConsumerRecordsHandler(partitionRecords, partition);
        }
    }
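The hand-over between the consumer thread and the fetcher is a blocking exchange of one batch at a time. The class below is a simplified sketch of that idea, assuming a single producer and a single consumer; HandoverSketch and its transfer() helper are hypothetical and do not reproduce Flink's Handover class (there is no wake-up or close handling here).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical one-slot hand-over between two threads; not Flink's Handover.
final class HandoverSketch<T> {
    private final Object lock = new Object();
    private T next;           // the single slot; null means empty
    private Throwable error;  // failure reported by the producing thread

    /** Blocks until the slot is free, then stores the batch. */
    void produce(T batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait();
            }
            next = batch;
            lock.notifyAll();
        }
    }

    /** Blocks until a batch is available; re-throws a reported producer failure. */
    T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null && error == null) {
                lock.wait();
            }
            if (next != null) {
                T batch = next;
                next = null;
                lock.notifyAll();
                return batch;
            }
            throw new IllegalStateException("producer thread failed", error);
        }
    }

    /** Lets the producing thread pass its failure to the consuming thread. */
    void reportError(Throwable t) {
        synchronized (lock) {
            if (error == null) {
                error = t;
            }
            lock.notifyAll();
        }
    }

    /** Demo helper: a producer thread hands over `count` integers to the caller. */
    static List<Integer> transfer(int count) {
        HandoverSketch<Integer> handover = new HandoverSketch<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    handover.produce(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(handover.pollNext());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(transfer(3));
    }
}
```

Because the slot holds a single batch, the polling thread cannot run far ahead of the thread that emits records, and an error in the polling thread surfaces in the emitting thread on its next pollNext().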
③ The records are sent to the next operator through the Output<StreamRecord<T>> held by the SourceContext:
    public void collect(T element) {
        synchronized (lock) {
            output.collect(reuse.replace(element));
        }
    }
The SourceContext is created in StreamSource.run() by StreamSourceContexts.getSourceContext(). The Output<StreamRecord<T>> is the return value of OperatorChain.createOutputCollector():
    for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
        @SuppressWarnings("unchecked")
        RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);

        allOutputs.add(new Tuple2<>(output, outputEdge));
    }
With a single output, it is a RecordWriterOutput. With several outputs routed through output selectors, it is a CopyingDirectedOutput or a DirectedOutput.
④ A single RecordWriterOutput writes through its RecordWriter member. The RecordWriter comes from StreamTask.createRecordWriterDelegate(); RecordWriterDelegate is a delegate class that holds the RecordWriter instances internally:
    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
            StreamConfig configuration,
            Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
            configuration,
            environment);
        if (recordWrites.size() == 1) {
            return new SingleRecordWriter<>(recordWrites.get(0));
        } else if (recordWrites.size() == 0) {
            return new NonRecordWriter<>();
        } else {
            return new MultipleRecordWriters<>(recordWrites);
        }
    }

    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
            StreamConfig configuration,
            Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            StreamEdge edge = outEdgesInOrder.get(i);
            recordWriters.add(
                createRecordWriter(
                    edge,
                    i,
                    environment,
                    environment.getTaskInfo().getTaskName(),
                    edge.getBufferTimeout()));
        }
        return recordWriters;
    }
outEdgesInOrder comes from the List<StreamEdge> outEdges of the StreamNode in the StreamGraph.
When a RecordWriter is created, isBroadcast() on the StreamEdge's StreamPartitioner<?> outputPartitioner decides whether it is a BroadcastRecordWriter or a ChannelSelectorRecordWriter:
    public RecordWriter<T> build(ResultPartitionWriter writer) {
        if (selector.isBroadcast()) {
            return new BroadcastRecordWriter<>(writer, timeout, taskName);
        } else {
            return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
        }
    }
If no partitioner was specified explicitly, outputPartitioner is chosen according to whether the upstream and downstream nodes have the same parallelism:
    if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
        partitioner = new ForwardPartitioner<Object>();
    } else if (partitioner == null) {
        partitioner = new RebalancePartitioner<Object>();
    }
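To make the effect of that choice concrete, here is a small sketch of how the two default partitioners pick an output channel. It is a simplified model under stated assumptions: ChannelSelector, Forward, Rebalance and defaultPartitioner are hypothetical names, and the round robin here always starts at channel 0, which is a simplification of this sketch rather than a statement about Flink's RebalancePartitioner.

```java
// Hypothetical, simplified model of the default partitioner choice; not Flink classes.
final class PartitionerSketch {

    /** Picks the output channel for the next record. */
    interface ChannelSelector {
        int selectChannel(int numberOfChannels);
    }

    /** Forward: every record goes to the one channel that pairs with this subtask. */
    static final class Forward implements ChannelSelector {
        @Override
        public int selectChannel(int numberOfChannels) {
            return 0;
        }
    }

    /** Rebalance: records are spread over all channels in round-robin order. */
    static final class Rebalance implements ChannelSelector {
        private int next = -1;

        @Override
        public int selectChannel(int numberOfChannels) {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }

    // Same rule as the snippet above: equal parallelism -> forward, otherwise rebalance.
    static ChannelSelector defaultPartitioner(int upstreamParallelism, int downstreamParallelism) {
        return upstreamParallelism == downstreamParallelism ? new Forward() : new Rebalance();
    }

    public static void main(String[] args) {
        ChannelSelector same = defaultPartitioner(4, 4);
        ChannelSelector different = defaultPartitioner(2, 3);
        System.out.println(same.getClass().getSimpleName());
        System.out.println(different.getClass().getSimpleName());
        for (int i = 0; i < 4; i++) {
            System.out.println("record " + i + " -> channel " + different.selectChannel(3));
        }
    }
}
```

Neither of the two is a broadcast partitioner, so in both default cases the RecordWriter built for the edge is a ChannelSelectorRecordWriter.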
BroadcastRecordWriter and ChannelSelectorRecordWriter both end up calling flush() on their ResultPartitionWriter targetPartition member to emit data. The ResultPartitionWriter comes from ConsumableNotifyingResultPartitionWriterDecorator.decorate(). Depending on the corresponding ResultPartitionDeploymentDescriptor, it is either a ConsumableNotifyingResultPartitionWriterDecorator or the partitionWriters entry that was passed in, unchanged. ConsumableNotifyingResultPartitionWriterDecorator lets the downstream node consume the data right away by announcing, through ResultPartitionConsumableNotifier, that the partition is consumable:
    public static ResultPartitionWriter[] decorate(
            Collection<ResultPartitionDeploymentDescriptor> descs,
            ResultPartitionWriter[] partitionWriters,
            TaskActions taskActions,
            JobID jobId,
            ResultPartitionConsumableNotifier notifier) {

        ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
        int counter = 0;
        for (ResultPartitionDeploymentDescriptor desc : descs) {
            if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
                consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
                    taskActions,
                    jobId,
                    partitionWriters[counter],
                    notifier);
            } else {
                consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
            }
            counter++;
        }
        return consumableNotifyingPartitionWriters;
    }
partitionWriters are created via NettyShuffleEnvironment.createResultPartitionWriters() -> ResultPartitionFactory.create(). A ResultPartition emits data through its ResultSubpartition[] subpartitions member, which is populated in ResultPartitionFactory.createSubpartitions():
    private void createSubpartitions(
            ResultPartition partition,
            ResultPartitionType type,
            BoundedBlockingSubpartitionType blockingSubpartitionType,
            ResultSubpartition[] subpartitions) {
        // Create the subpartitions.
        if (type.isBlocking()) {
            initializeBoundedBlockingPartitions(
                subpartitions,
                partition,
                blockingSubpartitionType,
                networkBufferSize,
                channelManager);
        } else {
            for (int i = 0; i < subpartitions.length; i++) {
                subpartitions[i] = new PipelinedSubpartition(i, partition);
            }
        }
    }
For streaming jobs, the ResultSubpartition is a PipelinedSubpartition.
The ResultPartitionConsumableNotifier is created in TaskExecutor.associateWithJobManager():
    private JobTable.Connection associateWithJobManager(
            JobTable.Job job,
            ResourceID resourceID,
            JobMasterGateway jobMasterGateway) {
        ......
        ......
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
            jobMasterGateway,
            getRpcService().getExecutor(),
            taskManagerConfiguration.getTimeout());
        ......
        ......
    }
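RpcResultPartitionConsumableNotifier makes a remote call to JobMaster's scheduleOrUpdateConsumers() method, passing the ResultPartitionID partitionId.
JobMaster notifies the downstream consuming operators through ExecutionGraph.scheduleOrUpdateConsumers().
Two pieces of code are key here: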
① From the Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions member of this operator's ExecutionVertex, look up the producer and consumer information for the partition, which is stored in an IntermediateResultPartition:
    void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
        .......
        final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());
        .......
        if (partition.getIntermediateResult().getResultType().isPipelined()) {
            // Schedule or update receivers of this partition
            execution.scheduleOrUpdateConsumers(partition.getConsumers());
        } else {
            throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
                "pipelined partitions.");
        }
    }
The consumers, List<List<ExecutionEdge>> allConsumers, are read from the IntermediateResultPartition.
The task to run is then taken from the Execution currentExecution of each ExecutionEdge's ExecutionVertex target.
② Execution.sendUpdatePartitionInfoRpcCall() makes an RPC call to TaskExecutor.updatePartitions(), which updates the downstream consumer task:
    private void sendUpdatePartitionInfoRpcCall(
            final Iterable<PartitionInfo> partitionInfos) {

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

            CompletableFuture<Acknowledge> updatePartitionsResultFuture =
                taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

            updatePartitionsResultFuture.whenCompleteAsync(
                (ack, failure) -> {
                    // fail if there was a failure
                    if (failure != null) {
                        fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
                            "] on TaskManager " + taskManagerLocation + " failed", failure));
                    }
                }, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
        }
    }
TaskExecutor.updatePartitions() updates the partition information. If an InputChannel was previously unknown, it is resolved now, in SingleInputGate.updateInputChannel():
    public void updateInputChannel(
            ResourceID localLocation,
            NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // There was a race with a task failure/cancel
                return;
            }

            IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

            InputChannel current = inputChannels.get(partitionId);

            if (current instanceof UnknownInputChannel) {
                UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                InputChannel newChannel;
                if (isLocal) {
                    newChannel = unknownChannel.toLocalInputChannel();
                } else {
                    RemoteInputChannel remoteInputChannel =
                        unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                    remoteInputChannel.assignExclusiveSegments();
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

                inputChannels.put(partitionId, newChannel);
                channels[current.getChannelIndex()] = newChannel;

                if (requestedPartitionsFlag) {
                    newChannel.requestSubpartition(consumedSubpartitionIndex);
                }

                for (TaskEvent event : pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }

                if (--numberOfUninitializedChannels == 0) {
                    pendingEvents.clear();
                }
            }
        }
    }
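The core decision in updateInputChannel() is: only an unknown channel is replaced, and it becomes a local or a remote channel depending on where the producer runs. The following sketch isolates that decision under simplifying assumptions: InputChannelUpdateSketch, ChannelKind and the string identifiers are hypothetical, and buffer assignment, partition requests and pending events are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of resolving unknown input channels; not Flink classes.
final class InputChannelUpdateSketch {

    enum ChannelKind { UNKNOWN, LOCAL, REMOTE }

    private final String localTaskManager;
    private final Map<String, ChannelKind> channels = new HashMap<>();

    InputChannelUpdateSketch(String localTaskManager) {
        this.localTaskManager = localTaskManager;
    }

    /** Registers a channel whose producer location is not known yet. */
    void registerUnknown(String partitionId) {
        channels.put(partitionId, ChannelKind.UNKNOWN);
    }

    /**
     * Resolves an unknown channel once the producer location is known.
     * Returns true if the channel was changed, false if there was nothing to do.
     */
    boolean update(String partitionId, String producerTaskManager) {
        ChannelKind current = channels.get(partitionId);
        if (current != ChannelKind.UNKNOWN) {
            // Already resolved, or never registered: leave it alone.
            return false;
        }
        ChannelKind resolved = localTaskManager.equals(producerTaskManager)
            ? ChannelKind.LOCAL
            : ChannelKind.REMOTE;
        channels.put(partitionId, resolved);
        return true;
    }

    ChannelKind kindOf(String partitionId) {
        return channels.get(partitionId);
    }

    public static void main(String[] args) {
        InputChannelUpdateSketch gate = new InputChannelUpdateSketch("tm-1");
        gate.registerUnknown("p0");
        gate.registerUnknown("p1");
        gate.update("p0", "tm-1");
        gate.update("p1", "tm-2");
        System.out.println("p0: " + gate.kindOf("p0"));
        System.out.println("p1: " + gate.kindOf("p1"));
    }
}
```

A second update for an already resolved channel is ignored in this sketch, which matches the instanceof UnknownInputChannel guard in the method above.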
Records are first written to the buffer queue ArrayDeque<BufferConsumer> buffers. The consumer is then notified via PipelinedSubpartitionView readView's notifyDataAvailable() -> BufferAvailabilityListener availabilityListener's notifyDataAvailable().
① When TaskManagerServices creates the ShuffleEnvironment, the call chain NettyShuffleServiceFactory.createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer.initChannel() -> NettyProtocol.getServerChannelHandlers() obtains the Netty server-side handler PartitionRequestServerHandler:
    public ChannelHandler[] getServerChannelHandlers() {
        PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
        PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
            partitionProvider,
            taskEventPublisher,
            queueOfPartitionQueues);

        return new ChannelHandler[] {
            messageEncoder,
            new NettyMessage.NettyMessageDecoder(),
            serverHandler,
            queueOfPartitionQueues
        };
    }
② When PartitionRequestServerHandler receives a PartitionRequest message from the client, it creates a CreditBasedSequenceNumberingViewReader and, via requestSubpartitionView() -> ResultPartitionManager.createSubpartitionView() -> ResultPartition.createSubpartitionView(), registers that reader with the subpartition view.
③ CreditBasedSequenceNumberingViewReader.notifyDataAvailable() calls PartitionRequestQueue.notifyReaderNonEmpty(), which notifies the downstream operator:
    void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
        // The notification might come from the same thread. For the initial writes this
        // might happen before the reader has set its reference to the view, because
        // creating the queue and the initial notification happen in the same method call.
        // This can be resolved by separating the creation of the view and allowing
        // notifications.

        // TODO This could potentially have a bad performance impact as in the
        // worst case (network consumes faster than the producer) each buffer
        // will trigger a separate event loop task being scheduled.
        ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
    }
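The overall pattern in this last part is "buffer first, then notify the reader". The sketch below models it in a single thread. It is not Flink's implementation: AvailabilitySketch, Reader and Subpartition are hypothetical names, buffers are plain strings, and notifying only when the queue goes from empty to non-empty is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified model of "buffer first, then notify"; not Flink classes.
final class AvailabilitySketch {

    /** Stand-in for a buffer availability listener. */
    interface AvailabilityListener {
        void notifyDataAvailable();
    }

    /** Stand-in for the server-side reader; it only counts notifications. */
    static final class Reader implements AvailabilityListener {
        int notifications;

        @Override
        public void notifyDataAvailable() {
            notifications++;
        }

        /** Reads everything that is currently buffered. */
        List<String> drain(Subpartition subpartition) {
            List<String> out = new ArrayList<>();
            String buffer;
            while ((buffer = subpartition.poll()) != null) {
                out.add(buffer);
            }
            return out;
        }
    }

    /** Stand-in for a pipelined subpartition with an in-memory buffer queue. */
    static final class Subpartition {
        private final Queue<String> buffers = new ArrayDeque<>();
        private final AvailabilityListener listener;

        Subpartition(AvailabilityListener listener) {
            this.listener = listener;
        }

        // Assumption of this sketch: notify only on the empty -> non-empty transition,
        // so a reader that is already aware of pending data is not notified again.
        void add(String buffer) {
            boolean wasEmpty = buffers.isEmpty();
            buffers.add(buffer);
            if (wasEmpty) {
                listener.notifyDataAvailable();
            }
        }

        String poll() {
            return buffers.poll();
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        Subpartition subpartition = new Subpartition(reader);
        subpartition.add("a");
        subpartition.add("b");
        System.out.println(reader.notifications + " " + reader.drain(subpartition));
        subpartition.add("c");
        System.out.println(reader.notifications + " " + reader.drain(subpartition));
    }
}
```

In the real code path the notification does not read the data itself; as the method above shows, it schedules an event on the Netty event loop, and the sending happens from there.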