这篇文章主要介绍“Dstream的创建方法”,在日常操作中,相信很多人在Dstream的创建方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Dstream的创建方法”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
案例
object SparkStreaming02_RDDQueue {
def main(args: Array[String]): Unit = {
//创建配置文件对象
val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")
//创建SparkStreaming上下文环境对象
val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
//创建队列,里面放的是RDD
val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()
//从队列中采集数据,获取DS
val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue,false)
//处理采集到的数据
val resDS: DStream[(Int, Int)] = queueDS.map((_,1)).reduceByKey(_+_)
//打印结果
resDS.print()
//启动采集器
ssc.start()
//循环创建RDD,并将创建的RDD放到队列里
for( i <- 1 to 5){
rddQueue.enqueue(ssc.sparkContext.makeRDD(6 to 10))
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
用一个案例来说明
/**
* Author: Felix
* Date: 2020/5/20
* Desc: 通过自定义数据源方式创建DStream
* 模拟从指定的网络端口获取数据
*/
object SparkStreaming03_CustomerReceiver {
def main(args: Array[String]): Unit = {
//创建配置文件对象
val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")
//创建SparkStreaming上下文环境对象
val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))
//通过自定义数据源创建Dstream
val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop202",9999))
//扁平化
val flatMapDS: DStream[String] = myDS.flatMap(_.split(" "))
//结构转换 进行计数
val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))
//聚合
val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)
//打印输出
reduceDS.print
ssc.start()
ssc.awaitTermination()
}
}
//Receiver[T] 泛型表示的是 读取的数据类型
class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
private var socket: Socket = _
// 真正的处理接收数据的逻辑
def receive() {
try {
//创建连接
socket = new Socket(host,port)
//根据连接对象获取输入流
val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))
//定义一个变量,用于接收读取到的一行数据
var input:String = null
while((input = reader.readLine())!= null){
store(input)
}
} catch {
case e: ConnectException =>
restart(s"Error connecting to $host:$port", e)
return
} finally {
onStop()
}
}
override def onStart(): Unit = {
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
override def onStop(): Unit = {
synchronized {
if (socket != null) {
socket.close()
socket = null
}
}
}
}
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
编写代码 0-8Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看
object Spark04_ReceiverAPI {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")
//2.创建StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.使用ReceiverAPI读取Kafka数据创建DStream
val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,
"hadoop202:2181,hadoop203:2181,hadoop204:2181",
"bigdata",
//v表示的主题的分区数
Map("mybak" -> 2))
//4.计算WordCount并打印 new KafkaProducer[String,String]().send(new ProducerRecord[]())
val lineDStream: DStream[String] = kafkaDStream.map(_._2)
val word: DStream[String] = lineDStream.flatMap(_.split(" "))
val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
wordToCountDStream.print()
//5.开启任务
ssc.start()
ssc.awaitTermination()
}
}
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
导入依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
编写代码(自动维护offset1)
offset维护在checkpoint中,但是获取StreamingContext的方式需要改变,目前这种方式会丢失消息
object Spark05_DirectAPI_Auto01 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")
//2.创建StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
//3.准备Kafka参数
val kafkaParams: Map[String, String] = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
)
//4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream
val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
kafkaParams,
Set("mybak"))
//5.计算WordCount并打印
kafkaDStream.map(_._2)
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
//6.开启任务
ssc.start()
ssc.awaitTermination()
}
}
编写代码(自动维护offset2)
offset维护在checkpoint中,获取StreamingContext为getActiveOrCreate
这种方式缺点:
checkpoint小文件过多
checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次
object Spark06_DirectAPI_Auto02 {
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext)
ssc.start()
ssc.awaitTermination()
}
def getStreamingContext: StreamingContext = {
//1.创建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")
//2.创建StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")
//3.准备Kafka参数
val kafkaParams: Map[String, String] = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
)
//4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream
val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
kafkaParams,
Set("mybak"))
//5.计算WordCount并打印
kafkaDStream.map(_._2)
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
//6.返回结果
ssc
}
}
编写代码(手动维护offset)
object Spark07_DirectAPI_Handler {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")
//2.创建StreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
//3.创建Kafka参数
val kafkaParams: Map[String, String] = Map[String, String](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"
)
//4.获取上一次消费的位置信息
val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](
TopicAndPartition("mybak", 0) -> 13L,
TopicAndPartition("mybak", 1) -> 10L
)
//5.使用DirectAPI手动维护offset的方式消费数据
val kafakDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
ssc,
kafkaParams,
fromOffsets,
(m: MessageAndMetadata[String, String]) => m.message())
//6.定义空集合用于存放数据的offset
var offsetRanges = Array.empty[OffsetRange]
//7.将当前消费到的offset进行保存
kafakDStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.fromOffset}-${o.untilOffset}")
}
}
//8.开启任务
ssc.start()
ssc.awaitTermination()
}
}
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
导入依赖,为了避免和0-8冲突,我们新建一个module演示
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>
3)编写代码
object Spark01_DirectAPI010 {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")
//2.创建StreamingContext
val ssc = new StreamingContext(conf, Seconds(3))
//3.构建Kafka参数
val kafkaParmas: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)
//4.消费Kafka数据创建流
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))
//5.计算WordCount并打印
kafkaDStream.map(_.value())
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.print()
//6.启动任务
ssc.start()
ssc.awaitTermination()
}
}
0-8 ReceiverAPI:
1)专门的Executor读取数据,速度不统一
2)跨机器传输数据,WAL
3)Executor读取数据通过多个线程的方式,想要增加并行度,则需要多个流union
4)offset存储在Zookeeper中
0-8 DirectAPI:
1)Executor读取数据并计算
2)增加Executor个数来增加消费的并行度
3)offset存储
a)CheckPoint(getActiveOrCreate方式创建StreamingContext)
b)手动维护(有事务的存储系统)
c)获取offset必须在第一个调用的算子中:offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
0-10 DirectAPI:
1)Executor读取数据并计算
2)增加Executor个数来增加消费的并行度
3)offset存储
i.a.__consumer_offsets系统主题中
ii.b.手动维护(有事务的存储系统)
到此,关于“Dstream的创建方法”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/bytyc123/blog/4974673