Direct Access to Kafka
In the previous installments we walked through the source code of Receiver-based Spark Streaming applications. Nowadays, however, more and more Spark Streaming applications are written in the No Receivers (Direct Approach) style. The No Receiver approach offers:
1. Greater freedom of control: the application decides which offsets to read and when.
2. Semantic consistency: offsets are tracked by the application itself, which makes exactly-once semantics achievable.
The canonical example is DirectKafkaWordCount from Spark's examples:
import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCount {
  def main(args: Array[String]) {
    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
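The "semantic consistency" advantage comes from the fact that every batch RDD produced by the direct stream implements HasOffsetRanges (see the KafkaRDD source below). A minimal sketch of the documented offset-tracking pattern, applied to the messages stream above (it needs import org.apache.spark.streaming.kafka.HasOffsetRanges, and the cast must be done on the first operation on the stream, before any transformation changes the RDD type):

messages.foreachRDD { rdd =>
  // Each batch RDD of the direct stream is a KafkaRDD, so it carries the exact
  // offset ranges it read; persist these atomically with your results to get
  // exactly-once semantics.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}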
Underneath, the Direct approach is built on KafkaRDD, a batch-oriented RDD over a set of Kafka offset ranges:

org/apache/spark/streaming/kafka/KafkaRDD.scala

/**
 * A batch-oriented interface for consuming from Kafka.
 * Starting and ending offsets are specified in advance,
 * so that you can control exactly-once semantics.
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
 *   with Kafka broker(s) specified in host1:port1,host2:port2 form.
 * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
 * @param messageHandler function for translating each message into the desired type
 */
private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  // One RDD partition per Kafka (topic, partition) offset range, pinned to the
  // leader broker for that partition.
  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

  ...

  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
    val part = thePart.asInstanceOf[KafkaRDDPartition]
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    if (part.fromOffset == part.untilOffset) {
      log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      new KafkaRDDIterator(part, context)
    }
  }

  private class KafkaRDDIterator(
      part: KafkaRDDPartition,
      context: TaskContext) extends NextIterator[R] {

    val kc = new KafkaCluster(kafkaParams)
    ...

    private def fetchBatch: Iterator[MessageAndOffset] = {
      val req = new FetchRequestBuilder()
        .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
        .build()
      val resp = consumer.fetch(req)
      handleFetchErr(resp)
      // kafka may return a batch that starts before the requested offset
      resp.messageSet(part.topic, part.partition)
        .iterator
        .dropWhile(_.offset < requestOffset)
    }

    override def close(): Unit = {
      if (consumer != null) {
        consumer.close()
      }
    }

    override def getNext(): R = {
      if (iter == null || !iter.hasNext) {
        iter = fetchBatch
      }
      if (!iter.hasNext) {
        assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
        finished = true
        null.asInstanceOf[R]
      } else {
        val item = iter.next()
        if (item.offset >= part.untilOffset) {
          assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
          finished = true
          null.asInstanceOf[R]
        } else {
          requestOffset = item.nextOffset
          messageHandler(new MessageAndMetadata(
            part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
        }
      }
    }
  }
}
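KafkaRDD is also exposed for batch jobs through KafkaUtils.createRDD. A minimal sketch of reading a fixed offset range as an RDD of (key, message) pairs (the topic name and offsets here are illustrative, not from the original article):

// Read offsets 0..99 of two partitions of a hypothetical "pagecounts" topic.
// Leader brokers are looked up automatically when none are supplied.
val batchOffsetRanges = Array(
  OffsetRange("pagecounts", 0, 0L, 100L),
  OffsetRange("pagecounts", 1, 0L, 100L))
val batchRdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, batchOffsetRanges)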
When KafkaRDDIterator is constructed, it calls KafkaCluster's connect method, which opens a low-level SimpleConsumer against the leader broker of the partition being read:

org/apache/spark/streaming/kafka/KafkaCluster.scala

def connect(host: String, port: Int): SimpleConsumer =
  new SimpleConsumer(host, port, config.socketTimeoutMs,
    config.socketReceiveBufferBytes, config.clientId)
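The config values above are parsed from the application's kafkaParams map through Kafka's consumer configuration, so they can be tuned from the application side. A sketch with illustrative values (assuming the standard Kafka 0.8 consumer config keys):

val tunedKafkaParams = Map[String, String](
  "metadata.broker.list" -> "host1:9092,host2:9092",
  "socket.timeout.ms" -> "60000",           // becomes config.socketTimeoutMs
  "socket.receive.buffer.bytes" -> "65536"  // becomes config.socketReceiveBufferBytes
)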
Back in the application, the stream itself is created by KafkaUtils.createDirectStream:

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
All of its overloads end up constructing a DirectKafkaInputDStream; the fullest variant takes explicit per-partition starting offsets and a message handler:

def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
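A minimal sketch of calling this overload (topic name and starting offset are illustrative): the caller picks the exact offset to resume from in each partition and decides how each MessageAndMetadata becomes a record.

// Resume partition 0 of a hypothetical "pagecounts" topic at offset 1000,
// emitting (key, message) pairs.
val fromOffsets = Map(TopicAndPartition("pagecounts", 0) -> 1000L)
val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))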
Each batch interval, DirectKafkaInputDStream.compute decides the offset range to read and builds a KafkaRDD for it, with no Receiver involved:

org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
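The clamp call above bounds untilOffsets so a single batch cannot ingest an unbounded number of records. A sketch of the standard settings that feed it (values illustrative):

// Cap each partition at 1000 records per second of batch time, and let the
// backpressure estimator (Spark 1.5+) adjust the rate dynamically.
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")
sparkConf.set("spark.streaming.backpressure.enabled", "true")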
Stepping back to compare Receiver-based and No Receiver Spark Streaming applications, Direct access has these benefits (a recovery sketch follows the list):
1. No caching on the Spark side, so no OOM problems from buffered input: the data stays in Kafka until it is consumed.
2. With the Receiver approach, each Receiver is bound to one Executor on a Worker, which makes it awkward to distribute the load (possible, but it takes configuration). With the Direct approach you operate on RDDs directly; the data lands on multiple Executors by default, so consumption is naturally distributed.
3. Consumption keeping up with ingestion: with the Receiver approach, if data cannot be consumed in time, repeated batch delays can eventually crash the Spark Streaming program. The Direct approach does not have this failure mode, since each batch reads only a bounded offset range.
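One operational difference worth noting: without a Receiver there is no write-ahead log, so driver recovery relies on checkpointed offset ranges (or offsets you store yourself, per the HasOffsetRanges pattern shown earlier). A minimal sketch, with an illustrative checkpoint path:

// After a driver restart, the stream resumes from the offset ranges stored
// in the checkpoint rather than replaying a WAL.
def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(2))
  ssc.checkpoint("hdfs:///checkpoints/direct-kafka")  // illustrative path
  // ... create the direct stream and output operations here ...
  ssc
}
val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/direct-kafka", createContext _)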
Notes:
1. DT Big Data Dream Factory WeChat public account: DT_Spark
2. IMF 8 p.m. big data hands-on session, YY live channel: 68917580
3. Sina Weibo: http://www.weibo.com/ilovepains