温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spark Streaming怎么使用

发布时间:2021-12-16 15:26:17 来源:亿速云 阅读:188 作者:iii 栏目:云计算

这篇文章主要介绍“Spark Streaming怎么使用”,在日常操作中,相信很多人在Spark Streaming怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark Streaming怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,转过来对其内部的RDD操作。

Spark Streaming怎么使用

纵轴为空间维度:代表的是RDD的依赖关系构成的具体的处理逻辑的步骤,是用DStream来表示的。

横轴为时间维度:按照特定的时间间隔不断地生成job对象,并在集群上运行。

随着时间的推移,基于DStream Graph 不断生成RDD Graph ,也即DAG的方式生成job,并通过Job Scheduler的线程池的方式提交给spark cluster不断的执行。

由上可知,RDD    与  DStream的关系如下

RDD是物理级别的,而 DStream 是逻辑级别的

DStream是RDD的封装类,是RDD进一步的抽象

DStream 是RDD的模板。DStream要依赖RDD进行具体的数据计算

注意:纵轴维度需要RDD,DAG的生成模板,需要TimeLine的job控制器

横轴维度(时间维度)包含batch interval,窗口长度,窗口滑动时间等。

3,Spark Streaming源码解析

StreamingContext方法中调用JobScheduler的start方法

Spark Streaming怎么使用

JobGenerator的start方法中,调用startFirstTime方法,来开启定时生成Job的定时器

Spark Streaming怎么使用

startFirstTime方法,首先调用DStreamGraph的start方法,然后再调用RecurringTimer的start方法。

Spark Streaming怎么使用

timer对象为一个定时器,根据batchInterval时间间隔定期向EventLoop发送GenerateJobs的消息。

Spark Streaming怎么使用

接收到GenerateJobs消息后,会回调generateJobs方法。

Spark Streaming怎么使用

generateJobs方法再调用DStreamGraph的generateJobs方法生成Job

Spark Streaming怎么使用

DStreamGraph的generateJobs方法

Spark Streaming怎么使用

DStreamGraph的实例化是在StreamingContext中的

Spark Streaming怎么使用

DStreamGraph类中保存了输入流和输出流信息

Spark Streaming怎么使用

Spark Streaming怎么使用

Spark Streaming怎么使用

回到JobGenerator的start方法中receiverTracker.start()

Spark Streaming怎么使用

Spark Streaming怎么使用

其中ReceiverTrackerEndpoint对象为一个消息循环体

Spark Streaming怎么使用

launchReceivers方法中发送StartAllReceivers消息

Spark Streaming怎么使用

接收到StartAllReceivers消息后,进行如下处理

Spark Streaming怎么使用

Spark Streaming怎么使用

StartReceiverFunc方法如下,实例化Receiver监控者,开启并等待退出

Spark Streaming怎么使用

supervisor的start方法中调用startReceiver方法

Spark Streaming怎么使用

Spark Streaming怎么使用

我们以socketTextStream为例,其启动的是SocketReceiver,内部开启一个线程,来接收数据。

Spark Streaming怎么使用

Spark Streaming怎么使用

内部调用supervisor的pushSingle方法,将数据聚集后存放在内存中

Spark Streaming怎么使用

supervisor的pushSingle方法如下,将数据放入到defaultBlockGenerator中,defaultBlockGenerator为BlockGenerator,保存Socket接收到的数据

Spark Streaming怎么使用

Spark Streaming怎么使用

BlockGenerator对象中有一个定时器,来更新当前的Buffer

Spark Streaming怎么使用

Spark Streaming怎么使用

BlockGenerator对象中有一个线程,来从阻塞队列中取出数据

Spark Streaming怎么使用

Spark Streaming怎么使用

Spark Streaming怎么使用

调用ReceiverSupervisorImpl类中的继承BlockGeneratorListener的匿名类中的onPushBlock方法。

Spark Streaming怎么使用

Spark Streaming怎么使用

Spark Streaming怎么使用

receivedBlockHandler对象如下

Spark Streaming怎么使用

这里我们讲解BlockManagerBasedBlockHandler的方式

Spark Streaming怎么使用

trackerEndpoint如下

Spark Streaming怎么使用

Spark Streaming怎么使用

其实是发送给ReceiverTrackerEndpoint类,

Spark Streaming怎么使用

Spark Streaming怎么使用

Spark Streaming怎么使用

Spark Streaming怎么使用

InputInfoTracker类的reportInfo方法只是对数据进行记录统计

Spark Streaming怎么使用

Spark Streaming怎么使用

Spark Streaming怎么使用

其generateJob方法是被DStreamGraph调用

Spark Streaming怎么使用

DStreamGraph的generateJobs方法是被JobGenerator类的generateJobs方法调用。

Spark Streaming怎么使用

JobGenerator类中有一个定时器,batchInterval发送GenerateJobs消息

Spark Streaming怎么使用

总结:

1,当调用StreamingContext的start方法时,启动了JobScheduler

2,当JobScheduler启动后会先后启动ReceiverTracker和JobGenerator

3,ReceiverTracker启动后会创建ReceiverTrackerEndpoint这个消息循环体,来接收运行在Executor上的Receiver发送过来的消息

4,ReceiverTracker在启动时会给自己发送StartAllReceivers消息,自己接收到消息后,向Spark提交startReceiverFunc的Job

5,startReceiverFunc方法中在Executor上启动Receiver,并实例化ReceiverSupervisorImpl对象,来监控Receiver的运行

6,ReceiverSupervisorImpl对象会调用Receiver的onStart方法,我们以SocketReceiver为例,启动一个线程,连接Server,读取网络数据先调用ReceiverSupervisorImpl的pushSingle方法,

保存在BlockGenerator对象中,该对象内部有个定时器,放到阻塞队列blocksForPushing,等待内部线程取出数据放到BlockManager中,并发AddBlock消息给ReceiverTrackerEndpoint。

ReceiverTrackerEndpoint为ReceiverTracker的内部类,在接收到addBlock消息后将streamId对应的数据阻塞队列streamIdToUnallocatedBlockQueues中

7,JobGenerator启动后会启动以batchInterval时间间隔发送GenerateJobs消息的定时器

8,接收到GenerateJobs消息会先后触发ReceiverTracker的allocateBlocksToBatch方法和DStreamGraph的generateJobs方法

9,ReceiverTracker的allocateBlocksToBatch方法会调用getReceivedBlockQueue方法从阻塞队列streamIdToUnallocatedBlockQueues中根据streamId获取数据

10,DStreamGraph的generateJobs方法,继而调用变量名为outputStreams的DStream集合的generateJob方法

11,继而调用DStream的getOrCompute来调用具体的DStream的compute方法,我们以ReceiverInputDStream为例,compute方法是从ReceiverTracker中获取数据

到此,关于“Spark Streaming怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI