温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

结构化Kafka sql的代码框架是怎样的

发布时间:2021-12-15 09:53:54 来源:亿速云 阅读:147 作者:柒染 栏目:云计算

本篇文章给大家分享的是有关结构化Kafka sql的代码框架是怎样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

结构化流的典型应用是持续的读取kafka流。实现机制从SparkSession的readStream开始,readStream就是DataStreamReader:

def readStream: DataStreamReader = new DataStreamReader(self)

下面从DataStreamReader开始。可以想象得到,最终肯定是生成一个RDD来持续读取kafka流数据。

例子:

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

分两步:找到TableProvider;找到SupportRead然后生成StreamingRelationV2。

最后用StreamingRelationV2来调用Dataset.ofRows返回DataFrame,DataFrame就是Dataset[Row]。

下面首先要看看TableProvider接口和SupportRead接口是啥东东。

TableProvider

TableProvider接口未找到在哪里定义。

KafkaSourceRDD

先看看kafkaSourceRDD这个类,这是基础类,最基础的来读取kafka数据的RDD,入参包含一个offsetRange,表示读取kafka数据的区间范围。如果是Kafka.lastest则可以表示永久读取kafka。

既然是RDD,那么最重要的方法就是compute方法了,代码不解析了很简单,就是用Kafka的API来读取kafka分区的数据,形成RDD。

KafkaSource

KafkaSource顾名思义就是Kafka的读取者。

KafkaSource的父类是Source,最重要的方法是:getOffset和getBatch。

getBatch返回DataFrame,那么getBatch又是怎么返回DataFrame的呢?看代码就知道原来是通过创建KafkaSourceRDD来达到生成DataFrame的目的的。所以可以认为KafkaSource是KafkaSourceRDD的一种封装形式罢了。

KafkaSourceProvider

The provider class for all Kafka readers and writers。这个类是用来生成各种各样的Kafka的读取者和写入者的,比较重要,先看看这个类的定义:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

    with StreamSourceProvider

    with StreamSinkProvider

    with RelationProvider

    with CreatableRelationProvider

    with TableProvider

    with Logging 

继承了很多的特性或接口。比如:StreamSourceProvider、TableProvider、RelationProvider等等。我们这里就看看和读相关的特性吧,和写相关的不看了(道理差不多)。

(1)createSource

createSource方法返回Source,看代码其实返回的是KafkaSource,KafkaSource前面已经说过了,这里就不涉及了。

(2)createRelation

createRelation返回BaseRelation,实际返回的是KafkaRelation。

KafkaRelation继承BaseRelation,重写父 类的buildScan方法,buildScan方法返回KafkaSourceRDD作为RDD[Row]。

(3)KafkaTable

KafkaTable继承Table并且继承SupportsRead特性,其定义:

class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite 

里面辗转反侧看看如何生成ContinuousStream,主要是方法toContinuousStream,返回的ContinuousStream就是KafkaContinuousStream。

(4)KafkaContinuousStream

KafkaContinuousStream继承自ContinuousStream,具体的看代码,最后反正都是调用了Kafka的API来读取数据,所不同的只是外部表现形式的不同罢了。

以上就是结构化Kafka sql的代码框架是怎样的,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI