今天小编给大家分享一下kafka生产者发送消息流程是什么的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。
消息的发送可能会经过拦截器、序列化、分区器等过程。消息发送的主要涉及两个线程,分别为main线程和sender线程。
如图所示,主线程由 afkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器RecordAccumulator (也称为消息收集器)中。 Sender 线程负责从RecordAccumulator 获取消息并将其发送到 Kafka中。
在消息序列化之前会经过消息拦截器,自定义拦截器需要实现ProducerInterceptor接口,接口主要有两个方案#onSend和#onAcknowledgement,在消息发送之前会调用前者方法,可以在发送之前假如处理逻辑,比如计费。在收到服务端ack响应后会触发后者方法。需要注意的是拦截器中不要加入过多的复杂业务逻辑,以免影响发送效率。
消息ProducerRecord会将消息路由到那个分区中,分两种情况:
1.指定了partition字段
如果消息ProducerRecord中指定了 partition字段,那么就不需要走分区器,直接发往指定得partition分区中。
2.没有指定partition,但自定义了分区器
3.没指定parittion,也没有自定义分区器,但key不为空
4.没指定parittion,也没有自定义分区器,key也为空
看源码
// KafkaProducer#partition private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { //指定分区partition则直接返回,否则走分区器 Integer partition = record.partition(); return partition != null ? partition : partitioner.partition( record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }
//DefaultPartitioner#partition public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { if (keyBytes == null) { return stickyPartitionCache.partition(topic, cluster); } List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; }
partition 方法中定义了分区分配逻辑 如果 ke 不为 null , 那 么默认的分区器会对 key 进行哈 希(采 MurmurHash3 算法 ,具备高运算性能及 低碰 撞率),最终根据得到 哈希值来 算分区号, 有相同 key 的消息会被写入同一个分区 如果 key null ,那么消息将会以轮询的方式发往主题内的各个可用分区。
分区确定好了之后,消息并不是直接发送给broker,因为一个个发送网络消耗太大,而是先缓存到消息累加器RecordAccumulator,RecordAccumulator主要用来缓存消息 Sender 线程可以批量发送,进 减少网络传输 的资源消耗以提升性能 RecordAccumulator 缓存的大 小可以通过生产者客户端参数 buffer memory 配置,默认值为 33554432B ,即 32MB如果生产者发送消息的速度超过发 送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer的send()方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max block ms 的配置,此参数的默认值为 60秒。
消息累加器本质上是个ConcurrentMap,
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
//KafkaProducer @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions //首先执行拦截器链 ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); } private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { throwIfProducerClosed(); // first make sure the metadata for the topic is available long nowMs = time.milliseconds(); ClusterAndWaitTime clusterAndWaitTime; try { clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs); } catch (KafkaException e) { if (metadata.isClosed()) throw new KafkaException("Producer closed while send in progress", e); throw e; } nowMs += clusterAndWaitTime.waitedOnMetadataMs; long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); Cluster cluster = clusterAndWaitTime.cluster; byte[] serializedKey; try { //key序列化 serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } byte[] serializedValue; try { //value序列化 serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); } //获取分区partition int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); setReadOnly(record.headers()); Header[] headers = record.headers().toArray(); //消息压缩 int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); //判断消息是否超过最大允许大小,消息缓存空间是否已满 ensureValidRecordSize(serializedSize); long timestamp = record.timestamp() == null ? nowMs : record.timestamp(); if (log.isTraceEnabled()) { log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); } // producer callback will make sure to call both 'callback' and interceptor callback Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); if (transactionManager != null && transactionManager.isTransactional()) { transactionManager.failIfNotReadyForSend(); } //将消息缓存在消息累加器RecordAccumulator中 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs); //开辟新的ProducerBatch if (result.abortForNewBatch) { int prevPartition = partition; partitioner.onNewBatch(record.topic(), cluster, prevPartition); partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition); if (log.isTraceEnabled()) { log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition); } // producer callback will make sure to call both 'callback' and interceptor callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs); } if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp); //判断消息是否已满,唤醒sender线程进行发送消息 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e; } }
消息发送到broker,什么情况下生产者才确定消息写入成功了呢?ack是生产者一个重要的参数,它有三个值,ack=1表示leader副本写入成功服务端即可返回给生产者,是吞吐量和消息可靠性的平衡方案;ack=0表示生产者发送消息之后不需要等服务端响应,这种消息丢失风险最大;ack=-1表示生产者需要等等ISR中所有副本写入成功后才能收到响应,这种消息可靠性最高但吞吐量也是最小的。
以上就是“kafka生产者发送消息流程是什么”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。