温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Flink的SideOutputSplit分流怎么实现

发布时间:2021-12-31 15:29:28 来源:亿速云 阅读:139 作者:iii 栏目:大数据

这篇文章主要讲解了“Flink的SideOutputSplit分流怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink的SideOutputSplit分流怎么实现”吧!

版本说明:

环境: Windiws

Scala: 2.11.8

Flink :1.10.1

大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。

除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。

process function的side outputs功能可以产生多条流(Flink 1.9版本之后推荐此种方案),并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
  *
  * @param deviceNo    设备号
  * @param timestamp   时间戳
  * @param temperature 温度
  */
case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object SensorReadingSplitStreaming {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //设置时间语义  时间发生时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)


        val socketSource: DataStream[String] = env.readTextFile("D:\\tmp\\file1.txt")

        val mapStream: DataStream[SensorReading] = socketSource
            .map(data => {
                val split: Array[String] = data.split(",")
                SensorReading(split(0).trim, split(1).trim.toLong, split(2).trim.toDouble)
            })

        //对数据流进行分流处理
        val tmpStageStream: DataStream[SensorReading] = mapStream.process(new TempStageProcess())

        tmpStageStream.print("main");
        val lowStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("low-tmp"))
        val highStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("high-tmp"))
        lowStream.print("low")
        highStream.print("high")
        env.execute()
    }

}


class TempStageProcess() extends ProcessFunction[SensorReading, SensorReading] {
    // 定义侧输出流
    lazy val lowTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("low-tmp");
    lazy val HighTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("high-tmp");

    //处理数据
    override def processElement(value: SensorReading, context: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit = {
        if (value.temperature < 10) {
            context.output(lowTmp, (value.deviceNo, value.temperature))
        } else if (value.temperature > 70) {
            context.output(HighTmp, (value.deviceNo, value.temperature))
        } else {
            collector.collect(value)
        }
    }
}  //测试文件内容如下↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

/*
设备8,1610035289736,84.3
设备5,1610035371758,38.8
设备5,1610035458637,60.2
设备1,1610035543127,10.2
设备7,1610035623427,51.6
设备5,1610035705302,20.1
设备5,1610035787387,12.9
设备7,1610035877019,88.2
设备6,1610035960537,33.5
设备7,1610036043040,63.0
设备5,1610036125179,64.5
设备6,1610036214972,30.2
设备5,1610036296542,56.5
设备7,1610036377999,29.7
设备6,1610036467523,59.4
设备4,1610036557446,71.1
设备5,1610036641100,28.2
设备2,1610036725803,88.8
设备8,1610036808041,73.5
设备1,1610036897060,18.0
设备7,1610036980127,14.9
设备2,1610037069523,47.4
设备4,1610037154507,59.5
设备5,1610037235099,35.0
设备6,1610037317868,76.4
设备2,1610037403367,10.0
设备2,1610037484177,18.5
设备4,1610037571384,98.7
设备5,1610037653666,95.6
设备6,1610037735520,32.6
设备6,1610037823906,83.3
设备3,1610037913756,29.1
设备7,1610037994980,74.6
设备6,1610038081606,22.2
设备3,1610038163043,10.4
设备5,1610038244717,56.9
设备3,1610038326227,64.8
设备4,1610038411053,65.0
设备8,1610038500538,93.2
设备8,1610038583924,76.2
设备1,1610038670150,42.1
设备5,1610038756839,35.1
设备3,1610038840180,75.9
设备3,1610038929751,83.4
设备7,1610039019422,24.1
设备3,1610039101778,85.0
设备8,1610039183077,45.6
设备3,1610039264498,79.5
设备1,1610039351600,44.4
设备8,1610039434187,73.3
设备3,1610039518048,77.9
设备7,1610039598556,9.79
设备4,1610039679144,19.0
设备2,1610039761967,56.1
设备3,1610039847823,88.2
设备6,1610039933024,77.4
设备7,1610040014212,14.4
设备4,1610040101627,98.2
设备8,1610040182379,85.0
设备6,1610040265210,61.8
设备2,1610040345769,48.0
设备3,1610040432855,19.9
设备4,1610040515943,30.9
设备4,1610040601373,51.7
设备1,1610040681803,29.7
设备8,1610040770779,31.6
设备3,1610040851986,67.1
设备4,1610040941421,93.2
设备7,1610041022836,37.2
设备8,1610041105401,84.6
设备6,1610041189301,19.2
设备4,1610041270735,99.0
设备4,1610041354109,77.0
设备5,1610041435113,49.7
设备1,1610041521773,74.2
设备8,1610041603035,42.2
设备3,1610041687230,87.1
设备1,1610041767985,82.7
设备3,1610041848130,0.59
设备4,1610041933021,7.38
设备2,1610042016080,28.9
设备2,1610042103229,99.2
设备2,1610042190222,42.2
设备3,1610042277841,12.0
设备7,1610042364076,93.5
设备7,1610042444652,10.5
设备4,1610042530461,68.5
设备1,1610042615421,78.2
设备3,1610042702219,18.5
设备6,1610042787478,64.8
设备5,1610042874301,6.34
设备2,1610042956073,65.6
设备8,1610043038793,10.6
设备8,1610043122971,30.3
设备7,1610043203810,17.5
设备8,1610043291566,83.8
设备5,1610043373188,30.5
设备2,1610043456107,84.7
设备1,1610043545998,53.4
设备3,1610043627174,97.4

 */

输出结果:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/.m2/repository/ch/qos/logback/logback-classic/1.2.0/logback-classic-1.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17:19:42,659 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:42,725 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:43,088 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
17:19:43,089 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
high> (设备8,84.3)
main> SensorReading(设备5,1610035371758,38.8)
main> SensorReading(设备5,1610035458637,60.2)
main> SensorReading(设备1,1610035543127,10.2)
main> SensorReading(设备7,1610035623427,51.6)
main> SensorReading(设备5,1610035705302,20.1)
main> SensorReading(设备5,1610035787387,12.9)
high> (设备7,88.2)
main> SensorReading(设备6,1610035960537,33.5)
main> SensorReading(设备7,1610036043040,63.0)
main> SensorReading(设备5,1610036125179,64.5)
main> SensorReading(设备6,1610036214972,30.2)
main> SensorReading(设备5,1610036296542,56.5)
main> SensorReading(设备7,1610036377999,29.7)
main> SensorReading(设备6,1610036467523,59.4)
high> (设备4,71.1)
main> SensorReading(设备5,1610036641100,28.2)
high> (设备2,88.8)
high> (设备8,73.5)
main> SensorReading(设备1,1610036897060,18.0)
main> SensorReading(设备7,1610036980127,14.9)
main> SensorReading(设备2,1610037069523,47.4)
main> SensorReading(设备4,1610037154507,59.5)
main> SensorReading(设备5,1610037235099,35.0)
high> (设备6,76.4)
main> SensorReading(设备2,1610037403367,10.0)
main> SensorReading(设备2,1610037484177,18.5)
high> (设备4,98.7)
high> (设备5,95.6)
main> SensorReading(设备6,1610037735520,32.6)
high> (设备6,83.3)
main> SensorReading(设备3,1610037913756,29.1)
high> (设备7,74.6)
main> SensorReading(设备6,1610038081606,22.2)
main> SensorReading(设备3,1610038163043,10.4)
main> SensorReading(设备5,1610038244717,56.9)
main> SensorReading(设备3,1610038326227,64.8)
main> SensorReading(设备4,1610038411053,65.0)
high> (设备8,93.2)
high> (设备8,76.2)
main> SensorReading(设备1,1610038670150,42.1)
main> SensorReading(设备5,1610038756839,35.1)
high> (设备3,75.9)
high> (设备3,83.4)
main> SensorReading(设备7,1610039019422,24.1)
high> (设备3,85.0)
main> SensorReading(设备8,1610039183077,45.6)
high> (设备3,79.5)
main> SensorReading(设备1,1610039351600,44.4)
high> (设备8,73.3)
high> (设备3,77.9)
low> (设备7,9.79)
main> SensorReading(设备4,1610039679144,19.0)
main> SensorReading(设备2,1610039761967,56.1)
high> (设备3,88.2)
high> (设备6,77.4)
main> SensorReading(设备7,1610040014212,14.4)
high> (设备4,98.2)
high> (设备8,85.0)
main> SensorReading(设备6,1610040265210,61.8)
main> SensorReading(设备2,1610040345769,48.0)
main> SensorReading(设备3,1610040432855,19.9)
main> SensorReading(设备4,1610040515943,30.9)
main> SensorReading(设备4,1610040601373,51.7)
main> SensorReading(设备1,1610040681803,29.7)
main> SensorReading(设备8,1610040770779,31.6)
main> SensorReading(设备3,1610040851986,67.1)
high> (设备4,93.2)
main> SensorReading(设备7,1610041022836,37.2)
high> (设备8,84.6)
main> SensorReading(设备6,1610041189301,19.2)
high> (设备4,99.0)
high> (设备4,77.0)
main> SensorReading(设备5,1610041435113,49.7)
high> (设备1,74.2)
main> SensorReading(设备8,1610041603035,42.2)
high> (设备3,87.1)
high> (设备1,82.7)
low> (设备3,0.59)
low> (设备4,7.38)
main> SensorReading(设备2,1610042016080,28.9)
high> (设备2,99.2)
main> SensorReading(设备2,1610042190222,42.2)
main> SensorReading(设备3,1610042277841,12.0)
high> (设备7,93.5)
main> SensorReading(设备7,1610042444652,10.5)
main> SensorReading(设备4,1610042530461,68.5)
high> (设备1,78.2)
main> SensorReading(设备3,1610042702219,18.5)
main> SensorReading(设备6,1610042787478,64.8)
low> (设备5,6.34)
main> SensorReading(设备2,1610042956073,65.6)
main> SensorReading(设备8,1610043038793,10.6)
main> SensorReading(设备8,1610043122971,30.3)
main> SensorReading(设备7,1610043203810,17.5)
high> (设备8,83.8)
main> SensorReading(设备5,1610043373188,30.5)
high> (设备2,84.7)
main> SensorReading(设备1,1610043545998,53.4)
high> (设备3,97.4)

Process finished with exit code 0

感谢各位的阅读,以上就是“Flink的SideOutputSplit分流怎么实现”的内容了,经过本文的学习后,相信大家对Flink的SideOutputSplit分流怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI