温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

(版本定制)第15课:Spark Streaming源码解读之No Receivers彻底思考

发布时间:2020-07-19 11:38:42 阅读:867 作者:Spark_2016 栏目:大数据
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

hu本期内容:

    1、Kafka解密

背景: 
目前No Receivers在企业中使用的越来越多,No Receivers具有更强的控制度,语义一致性。No Receivers是我们操作数据来源自然方式,操作数据来源使用一个封装器,且是RDD类型的。

所以Spark Streaming就产生了自定义RDD –> KafkaRDD.

源码分析:

1、KafkaRDD源码

private[kafka]
class KafkaRDD[
KClassTag,
VClassTag,
U <: Decoder[_]: ClassTag,
T <: Decoder[_]: ClassTag,
RClassTagprivate[spark] (
    scSparkContext,
kafkaParamsMap[StringString],
val offsetRangesArray[OffsetRange], //指定数据范围
leadersMap[TopicAndPartition, (StringInt)],
messageHandlerMessageAndMetadata[K, V] => R
) extends RDD[R](sc, Nilwith Logging with HasOffsetRanges {
override def getPartitionsArray[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

2、HasOffsetRanges

/**
 * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
 * offset ranges in RDDs generated by the direct Kafka DStream (see
 * [[KafkaUtils.createDirectStream()]]).
 * {{{
*   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
 *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 *      ...
 *   }
 * }}}
*/
trait HasOffsetRanges {
def offsetRanges: Array[OffsetRange]
}

3、KafkaRDD中的compute

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.emptyelse {
new KafkaRDDIterator(part, context)
  }
}

SparkStreaming一般使用KafkaUtils的createDirectStream读取数据

def createDirectStream[
KClassTag,
VClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
    sscStreamingContext,
kafkaParamsMap[StringString],
topicsSet[String]
): InputDStream[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
val kc = new KafkaCluster(kafkaParams)
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
new DirectKafkaInputDStream[K, V, KDVD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}

4、通过getFromOffsets的方法获取topic的fromOffset值

[kafka] (
    kc: KafkaClusterkafkaParams: []topics: []
  ): [TopicAndPartition] = {
reset = kafkaParams.get().map(_.toLowerCase)
result = {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- ((reset == ()) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } {
    leaderOffsets.map { (tplo) =>
        (tplo.offset)
    }
  }
  KafkaCluster.(result)
}

createDirectStream其实生成的是DirectKafkaInputDStream对象,通过compute方法会产生KafkaRDD

(validTime: Time): Option[KafkaRDD[]] = {
untilOffsets = clamp(latestLeaderOffsets())
rdd = [](
    context.sparkContextkafkaParamsuntilOffsetsmessageHandler)

offsetRanges = .map { (tpfo) =>
uo = untilOffsets(tp)
(tp.topictp.partitionfouo.offset)
  }
description = offsetRanges.filter { offsetRange =>
offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
{offsetRange.topic}{offsetRange.partition}+
{offsetRange.fromOffset}{offsetRange.untilOffset}}.mkString()
metadata = (
-> offsetRanges.toListStreamInputInfo.-> description)
inputInfo = (rdd.countmetadata)
  ssc...reportInfo(validTimeinputInfo)

= untilOffsets.map(kv => kv._1 -> kv._2.offset)
(rdd)
}

采用Direct的好处? 
1. Direct方式没有数据缓存,因此不会出现内存溢出,但是如果采用Receiver的话就需要缓存。 
2. 如果采用Receiver的方式,不方便做分布式,而Direct方式默认数据就在多台机器上。 
3. 在实际操作的时候如果采用Receiver的方式的弊端是假设数据来不及处理,但是Direct就不会,因为是直接读取数据。 
4. 语义一致性,Direct的方式数据一定会被执行。

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI

开发者交流群×