This article analyzes the Flink Kafka consumer as a data source: how FlinkKafkaConsumer is opened, how it pulls records from Kafka, and how those records are handed on to downstream operators.
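For context, a minimal job that uses FlinkKafkaConsumer as its source might look like the sketch below (the bootstrap address, group id and topic name are placeholders); everything analyzed in the rest of the article happens behind the addSource() call.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker address
        props.setProperty("group.id", "demo-group");                 // placeholder group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        // The consumer is wrapped into a StreamSource operator; its open()/run()
        // lifecycle is what the following sections walk through.
        env.addSource(consumer).print();

        env.execute("Flink Kafka Consumer demo");
    }
}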
FlinkKafkaConsumer is a RichFunction, so it has the lifecycle method open(). When does Flink actually call FlinkKafkaConsumer's open()?
Before invoking the operators, StreamTask executes beforeInvoke(), which initializes each operator's state and calls its open() method:
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
initializeStateAndOpenOperators() loops over all operators in the chain and initializes each one:
protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
    for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) {
        StreamOperator<?> operator = operatorWrapper.getStreamOperator();
        operator.initializeState(streamTaskStateInitializer);
        operator.open();
    }
}
The operator backing the Kafka source is StreamSource; its open() method is:
public void open() throws Exception {
    super.open();
    FunctionUtils.openFunction(userFunction, new Configuration());
}
FunctionUtils.openFunction() calls open() on the user function, provided the function is a RichFunction:
public static void openFunction(Function function, Configuration parameters) throws Exception {
    if (function instanceof RichFunction) {
        RichFunction richFunction = (RichFunction) function;
        richFunction.open(parameters);
    }
}
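To make the lifecycle concrete, here is a minimal sketch of a user-defined rich source (a hypothetical class, not Flink code): because it is a RichFunction, its open() is reached through exactly the StreamSource.open() -> FunctionUtils.openFunction() path shown above, before run() is ever called.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MyRichSource extends RichSourceFunction<String> {

    private volatile boolean running = true;
    private transient long counter;   // hypothetical resource set up in open()

    @Override
    public void open(Configuration parameters) throws Exception {
        // Reached via FunctionUtils.openFunction(); this is where FlinkKafkaConsumer
        // likewise performs its own initialization (offset commit mode, partition discovery).
        counter = 0L;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            ctx.collect("record-" + counter++);
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}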
Along the call path StreamTask.beforeInvoke() -> new OperatorChain() -> StreamOperatorFactoryUtil.createOperator(), the OperatorChain constructor creates the StreamOperator through a StreamOperatorFactory. For the Kafka source that factory is SimpleOperatorFactory, whose createStreamOperator() calls the operator's setup() method:
public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
    if (operator instanceof AbstractStreamOperator) {
        ((AbstractStreamOperator) operator).setProcessingTimeService(processingTimeService);
    }
    if (operator instanceof SetupableStreamOperator) {
        ((SetupableStreamOperator) operator).setup(
            parameters.getContainingTask(),
            parameters.getStreamConfig(),
            parameters.getOutput());
    }
    return (T) operator;
}
The StreamOperator for the Kafka source is StreamSource, which implements the SetupableStreamOperator interface. Its setup() method comes from the parent class AbstractUdfStreamOperator:
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
    super.setup(containingTask, config, output);
    FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}
FunctionUtils.setFunctionRuntimeContext() injects a RuntimeContext into the user function. That RuntimeContext is a StreamingRuntimeContext, created earlier in AbstractStreamOperator.setup():
this.runtimeContext = new StreamingRuntimeContext(
    environment,
    environment.getAccumulatorRegistry().getUserMap(),
    getMetricGroup(),
    getOperatorID(),
    getProcessingTimeService(),
    null,
    environment.getExternalResourceInfoProvider());
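This StreamingRuntimeContext is what getRuntimeContext() returns inside any rich user function. As a hedged illustration (the class below is a made-up UDF, not Flink code), FlinkKafkaConsumerBase uses roughly the same two calls in its open() to decide which Kafka partitions the current subtask should own:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class SubtaskAwareMapper extends RichMapFunction<String, String> {

    private transient int subtaskIndex;
    private transient int parallelism;

    @Override
    public void open(Configuration parameters) {
        // Both values are served by the StreamingRuntimeContext created in
        // AbstractStreamOperator.setup() and injected via setFunctionRuntimeContext().
        subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    }

    @Override
    public String map(String value) {
        return "subtask " + subtaskIndex + "/" + parallelism + ": " + value;
    }
}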
Flink calls FlinkKafkaConsumer's run() method to produce data. run() proceeds as follows:
① Create a KafkaFetcher to pull the data:
this.kafkaFetcher = createFetcher(
    sourceContext,
    subscribedPartitionsToStartOffsets,
    watermarkStrategy,
    (StreamingRuntimeContext) getRuntimeContext(),
    offsetCommitMode,
    getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),
    useMetrics);
② KafkaFetcher.runFetchLoop() starts a KafkaConsumerThread that pulls Kafka records in a loop. The thread fetches batches with a KafkaConsumer and hands them over to the Handover:
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    }
    catch (WakeupException we) {
        continue;
    }
}

try {
    handover.produce(records);
    records = null;
}
catch (Handover.WakeupException e) {
    // fall through the loop
}
KafkaFetcher then takes the fetched records back out of the Handover:
while (running) {
    // this blocks until we get the next records
    // it automatically re-throws exceptions encountered in the consumer thread
    final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

    // get the records for each topic partition
    for (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {
        List<ConsumerRecord<byte[], byte[]>> partitionRecords =
            records.records(partition.getKafkaPartitionHandle());
        partitionConsumerRecordsHandler(partitionRecords, partition);
    }
}
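The Handover is essentially a one-element exchange point between the consumer thread (producer side) and the fetch loop (consumer side). As a simplified sketch (assumption: Flink's real Handover is a hand-written single-slot exchanger that also forwards exceptions and wake-ups, not a BlockingQueue), the core idea looks like this:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SingleSlotHandover<T> {

    // Capacity 1: only one batch is ever in flight between the two threads.
    private final BlockingQueue<T> slot = new ArrayBlockingQueue<>(1);

    // Called by the producing thread (the Kafka consumer thread in Flink's case);
    // blocks until the previous batch has been taken.
    public void produce(T element) throws InterruptedException {
        slot.put(element);
    }

    // Called by the consuming thread (the fetch loop); blocks until a batch arrives.
    public T pollNext() throws InterruptedException {
        return slot.take();
    }
}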
③ Records are emitted to the next operator through the Output<StreamRecord<T>> held by the SourceContext:
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}
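The lock taken in collect() is the checkpoint lock. A user-level sketch of why that matters (hypothetical source, not Flink internals): a source that maintains its own reading position should emit records and advance that position under the same lock, so that emitted data and checkpointed offsets stay consistent, which is roughly what the Kafka fetcher does when it updates partition offsets.

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class LockedCounterSource extends RichSourceFunction<Long> {

    private volatile boolean running = true;
    private long position;   // hypothetical reading position, would be checkpointed in real code

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Same lock object as the one used inside SourceContext.collect() above.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(position);
                position++;
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}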
The SourceContext is created in StreamSource.run() via StreamSourceContexts.getSourceContext(). The Output<StreamRecord<T>> is the return value of OperatorChain.createOutputCollector():
for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
    @SuppressWarnings("unchecked")
    RecordWriterOutput<T> output = (RecordWriterOutput<T>) streamOutputs.get(outputEdge);
    allOutputs.add(new Tuple2<>(output, outputEdge));
}
With a single output the collector is a RecordWriterOutput; with multiple outputs it is a CopyingDirectedOutput or DirectedOutput.
④ In the single-output case, RecordWriterOutput writes records through its RecordWriter member. The RecordWriter is created by StreamTask.createRecordWriterDelegate(); RecordWriterDelegate is a proxy class that holds the RecordWriter instance(s):
public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWrites = createRecordWriters(
        configuration,
        environment);
    if (recordWrites.size() == 1) {
        return new SingleRecordWriter<>(recordWrites.get(0));
    } else if (recordWrites.size() == 0) {
        return new NonRecordWriter<>();
    } else {
        return new MultipleRecordWriters<>(recordWrites);
    }
}
private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(
        StreamConfig configuration,
        Environment environment) {
    List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<>();
    List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());

    for (int i = 0; i < outEdgesInOrder.size(); i++) {
        StreamEdge edge = outEdgesInOrder.get(i);
        recordWriters.add(
            createRecordWriter(
                edge,
                i,
                environment,
                environment.getTaskInfo().getTaskName(),
                edge.getBufferTimeout()));
    }
    return recordWriters;
}
outEdgesInOrder comes from the List<StreamEdge> outEdges of the StreamNode in the StreamGraph.
When a RecordWriter is created, the isBroadcast() flag of the StreamEdge's StreamPartitioner<?> outputPartitioner decides between a BroadcastRecordWriter and a ChannelSelectorRecordWriter:
public RecordWriter<T> build(ResultPartitionWriter writer) {
    if (selector.isBroadcast()) {
        return new BroadcastRecordWriter<>(writer, timeout, taskName);
    } else {
        return new ChannelSelectorRecordWriter<>(writer, selector, timeout, taskName);
    }
}
If no partitioner was set explicitly, the outputPartitioner is chosen based on whether the upstream and downstream nodes have the same parallelism:
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
    partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
    partitioner = new RebalancePartitioner<Object>();
}
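In practical terms the two defaults behave as sketched below. This is a simplified stand-in, not Flink's StreamPartitioner API: a forward-style selector always uses the single paired channel, while a rebalance-style selector spreads records round-robin across all downstream channels.

public final class PartitionerSketch {

    interface ChannelChooser {
        int chooseChannel(int numberOfChannels);
    }

    // ForwardPartitioner-like: parallelism matches one-to-one, so every record
    // stays on channel 0, i.e. the downstream subtask paired with this one.
    static final class ForwardLike implements ChannelChooser {
        @Override
        public int chooseChannel(int numberOfChannels) {
            return 0;
        }
    }

    // RebalancePartitioner-like: parallelism differs, so records are distributed
    // round-robin over all downstream channels.
    static final class RebalanceLike implements ChannelChooser {
        private int next = -1;

        @Override
        public int chooseChannel(int numberOfChannels) {
            next = (next + 1) % numberOfChannels;
            return next;
        }
    }
}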
Both BroadcastRecordWriter and ChannelSelectorRecordWriter eventually output data by calling flush() on their ResultPartitionWriter targetPartition member. The ResultPartitionWriter is produced in ConsumableNotifyingResultPartitionWriterDecorator.decorate(), which uses the corresponding ResultPartitionDeploymentDescriptor to decide whether to wrap the passed-in partitionWriter in a ConsumableNotifyingResultPartitionWriterDecorator or to use it directly. The decorator makes the partition's data immediately consumable by the next node, notifying it through a ResultPartitionConsumableNotifier:
public static ResultPartitionWriter[] decorate(
        Collection<ResultPartitionDeploymentDescriptor> descs,
        ResultPartitionWriter[] partitionWriters,
        TaskActions taskActions,
        JobID jobId,
        ResultPartitionConsumableNotifier notifier) {

    ResultPartitionWriter[] consumableNotifyingPartitionWriters = new ResultPartitionWriter[partitionWriters.length];
    int counter = 0;
    for (ResultPartitionDeploymentDescriptor desc : descs) {
        if (desc.sendScheduleOrUpdateConsumersMessage() && desc.getPartitionType().isPipelined()) {
            consumableNotifyingPartitionWriters[counter] = new ConsumableNotifyingResultPartitionWriterDecorator(
                taskActions,
                jobId,
                partitionWriters[counter],
                notifier);
        } else {
            consumableNotifyingPartitionWriters[counter] = partitionWriters[counter];
        }
        counter++;
    }
    return consumableNotifyingPartitionWriters;
}
The partitionWriters are created via NettyShuffleEnvironment.createResultPartitionWriters() -> ResultPartitionFactory.create(). A ResultPartition writes its output through its ResultSubpartition[] subpartitions member, which is populated in ResultPartitionFactory.createSubpartitions():
private void createSubpartitions(
        ResultPartition partition,
        ResultPartitionType type,
        BoundedBlockingSubpartitionType blockingSubpartitionType,
        ResultSubpartition[] subpartitions) {
    // Create the subpartitions.
    if (type.isBlocking()) {
        initializeBoundedBlockingPartitions(
            subpartitions,
            partition,
            blockingSubpartitionType,
            networkBufferSize,
            channelManager);
    } else {
        for (int i = 0; i < subpartitions.length; i++) {
            subpartitions[i] = new PipelinedSubpartition(i, partition);
        }
    }
}
For streaming jobs the ResultSubpartition is a PipelinedSubpartition.
The ResultPartitionConsumableNotifier is created in TaskExecutor.associateWithJobManager():
private JobTable.Connection associateWithJobManager(
        JobTable.Job job,
        ResourceID resourceID,
        JobMasterGateway jobMasterGateway) {
    ......
    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
        jobMasterGateway,
        getRpcService().getExecutor(),
        taskManagerConfiguration.getTimeout());
    ......
}
RpcResultPartitionConsumableNotifier makes a remote call to JobMaster.scheduleOrUpdateConsumers(), passing the ResultPartitionID partitionId.
The JobMaster then notifies the downstream consuming operators through ExecutionGraph.scheduleOrUpdateConsumers().
Two pieces of code matter here:
① The producer/consumer information for the partition is looked up in the ExecutionVertex's Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions member; that information is stored in the IntermediateResultPartition:
void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
    .......
    final IntermediateResultPartition partition = resultPartitions.get(partitionId.getPartitionId());
    .......
    if (partition.getIntermediateResult().getResultType().isPipelined()) {
        // Schedule or update receivers of this partition
        execution.scheduleOrUpdateConsumers(partition.getConsumers());
    }
    else {
        throw new IllegalArgumentException("ScheduleOrUpdateConsumers msg is only valid for" +
            "pipelined partitions.");
    }
}
The consumers are taken from the IntermediateResultPartition's List<List<ExecutionEdge>> allConsumers, and the execution to notify comes from the ExecutionEdge's ExecutionVertex target, namely its Execution currentExecution.
② Execution.sendUpdatePartitionInfoRpcCall() then makes an RPC call to TaskExecutor.updatePartitions(), which lets the downstream consumer task start consuming:
private void sendUpdatePartitionInfoRpcCall(
        final Iterable<PartitionInfo> partitionInfos) {
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
        final TaskManagerLocation taskManagerLocation = slot.getTaskManagerLocation();

        CompletableFuture<Acknowledge> updatePartitionsResultFuture = taskManagerGateway.updatePartitions(attemptId, partitionInfos, rpcTimeout);

        updatePartitionsResultFuture.whenCompleteAsync(
            (ack, failure) -> {
                // fail if there was a failure
                if (failure != null) {
                    fail(new IllegalStateException("Update to task [" + getVertexWithAttempt() +
                        "] on TaskManager " + taskManagerLocation + " failed", failure));
                }
            }, getVertex().getExecutionGraph().getJobMasterMainThreadExecutor());
    }
}
TaskExecutor.updatePartitions() updates the partition information. If the corresponding InputChannel was previously unknown, it is replaced; see SingleInputGate.updateInputChannel():
public void updateInputChannel(
        ResourceID localLocation,
        NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
    synchronized (requestLock) {
        if (closeFuture.isDone()) {
            // There was a race with a task failure/cancel
            return;
        }

        IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();

        InputChannel current = inputChannels.get(partitionId);

        if (current instanceof UnknownInputChannel) {
            UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
            boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
            InputChannel newChannel;
            if (isLocal) {
                newChannel = unknownChannel.toLocalInputChannel();
            } else {
                RemoteInputChannel remoteInputChannel =
                    unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId());
                remoteInputChannel.assignExclusiveSegments();
                newChannel = remoteInputChannel;
            }
            LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);

            inputChannels.put(partitionId, newChannel);
            channels[current.getChannelIndex()] = newChannel;

            if (requestedPartitionsFlag) {
                newChannel.requestSubpartition(consumedSubpartitionIndex);
            }

            for (TaskEvent event : pendingEvents) {
                newChannel.sendTaskEvent(event);
            }

            if (--numberOfUninitializedChannels == 0) {
                pendingEvents.clear();
            }
        }
    }
}
On the producing side, records are first written into the ArrayDeque<BufferConsumer> buffers cache; the consuming side is then notified via the PipelinedSubpartitionView readView's notifyDataAvailable(), which forwards to the BufferAvailabilityListener availabilityListener's notifyDataAvailable().
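A simplified sketch of that buffer-and-notify pattern (illustration only, not Flink's PipelinedSubpartition; the real class works on BufferConsumer instances and has additional flush and finish logic): the producer appends to an in-memory queue and pings a listener, and only the transition from empty to non-empty needs a wake-up.

import java.util.ArrayDeque;

public class NotifyingQueue<T> {

    public interface AvailabilityListener {
        void notifyDataAvailable();
    }

    private final ArrayDeque<T> buffers = new ArrayDeque<>();
    private final AvailabilityListener listener;

    public NotifyingQueue(AvailabilityListener listener) {
        this.listener = listener;
    }

    public synchronized void add(T buffer) {
        boolean wasEmpty = buffers.isEmpty();
        buffers.add(buffer);
        if (wasEmpty) {
            // Wake the reading side up only when new data becomes available,
            // roughly what PipelinedSubpartition does through its read view.
            listener.notifyDataAvailable();
        }
    }

    public synchronized T poll() {
        return buffers.poll();
    }
}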
① When TaskManagerServices creates the ShuffleEnvironment, the Netty server-side handler PartitionRequestServerHandler is obtained along NettyShuffleServiceFactory.createNettyShuffleEnvironment() -> new NettyConnectionManager() -> new NettyServer() -> ServerChannelInitializer.initChannel() -> NettyProtocol.getServerChannelHandlers():
public ChannelHandler[] getServerChannelHandlers() {
    PartitionRequestQueue queueOfPartitionQueues = new PartitionRequestQueue();
    PartitionRequestServerHandler serverHandler = new PartitionRequestServerHandler(
        partitionProvider,
        taskEventPublisher,
        queueOfPartitionQueues);

    return new ChannelHandler[] {
        messageEncoder,
        new NettyMessage.NettyMessageDecoder(),
        serverHandler,
        queueOfPartitionQueues
    };
}
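For orientation, the handlers returned above are installed into the Netty server pipeline by a channel initializer, roughly as sketched below (simplified; the class name here is hypothetical and the wiring inside Flink's actual NettyServer differs in detail):

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;

public class PartitionServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    private final ChannelHandler[] serverHandlers;

    public PartitionServerChannelInitializer(ChannelHandler[] serverHandlers) {
        this.serverHandlers = serverHandlers;
    }

    @Override
    protected void initChannel(SocketChannel channel) {
        // encoder, NettyMessageDecoder, PartitionRequestServerHandler, PartitionRequestQueue
        channel.pipeline().addLast(serverHandlers);
    }
}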
② When PartitionRequestServerHandler receives a PartitionRequest message from the client, it creates a CreditBasedSequenceNumberingViewReader and registers it via requestSubpartitionView() -> ResultPartitionManager.createSubpartitionView() -> ResultPartition.createSubpartitionView().
③ CreditBasedSequenceNumberingViewReader.notifyDataAvailable() calls PartitionRequestQueue.notifyReaderNonEmpty(), which notifies the downstream side:
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
    // The notification might come from the same thread. For the initial writes this
    // might happen before the reader has set its reference to the view, because
    // creating the queue and the initial notification happen in the same method call.
    // This can be resolved by separating the creation of the view and allowing
    // notifications.

    // TODO This could potentially have a bad performance impact as in the
    // worst case (network consumes faster than the producer) each buffer
    // will trigger a separate event loop task being scheduled.
    ctx.executor().execute(() -> ctx.pipeline().fireUserEventTriggered(reader));
}
Original post: https://my.oschina.net/catcloud/blog/5010529