这篇文章主要讲解了“Flink的SideOutputSplit分流怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink的SideOutputSplit分流怎么实现”吧!
环境: Windiws
Scala: 2.11.8
Flink :1.10.1
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。
除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。
process function的side outputs功能可以产生多条流(Flink 1.9版本之后推荐此种方案),并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.util.Collector /** * * @param deviceNo 设备号 * @param timestamp 时间戳 * @param temperature 温度 */ case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double) object SensorReadingSplitStreaming { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //设置时间语义 时间发生时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val socketSource: DataStream[String] = env.readTextFile("D:\\tmp\\file1.txt") val mapStream: DataStream[SensorReading] = socketSource .map(data => { val split: Array[String] = data.split(",") SensorReading(split(0).trim, split(1).trim.toLong, split(2).trim.toDouble) }) //对数据流进行分流处理 val tmpStageStream: DataStream[SensorReading] = mapStream.process(new TempStageProcess()) tmpStageStream.print("main"); val lowStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("low-tmp")) val highStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("high-tmp")) lowStream.print("low") highStream.print("high") env.execute() } } class TempStageProcess() extends ProcessFunction[SensorReading, SensorReading] { // 定义侧输出流 lazy val lowTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("low-tmp"); lazy val HighTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("high-tmp"); //处理数据 override def processElement(value: SensorReading, context: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit = { if (value.temperature < 10) { context.output(lowTmp, (value.deviceNo, value.temperature)) } else if (value.temperature > 70) { context.output(HighTmp, (value.deviceNo, value.temperature)) } else { collector.collect(value) } } } //测试文件内容如下↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ /* 设备8,1610035289736,84.3 设备5,1610035371758,38.8 设备5,1610035458637,60.2 设备1,1610035543127,10.2 设备7,1610035623427,51.6 设备5,1610035705302,20.1 设备5,1610035787387,12.9 设备7,1610035877019,88.2 设备6,1610035960537,33.5 设备7,1610036043040,63.0 设备5,1610036125179,64.5 设备6,1610036214972,30.2 设备5,1610036296542,56.5 设备7,1610036377999,29.7 设备6,1610036467523,59.4 设备4,1610036557446,71.1 设备5,1610036641100,28.2 设备2,1610036725803,88.8 设备8,1610036808041,73.5 设备1,1610036897060,18.0 设备7,1610036980127,14.9 设备2,1610037069523,47.4 设备4,1610037154507,59.5 设备5,1610037235099,35.0 设备6,1610037317868,76.4 设备2,1610037403367,10.0 设备2,1610037484177,18.5 设备4,1610037571384,98.7 设备5,1610037653666,95.6 设备6,1610037735520,32.6 设备6,1610037823906,83.3 设备3,1610037913756,29.1 设备7,1610037994980,74.6 设备6,1610038081606,22.2 设备3,1610038163043,10.4 设备5,1610038244717,56.9 设备3,1610038326227,64.8 设备4,1610038411053,65.0 设备8,1610038500538,93.2 设备8,1610038583924,76.2 设备1,1610038670150,42.1 设备5,1610038756839,35.1 设备3,1610038840180,75.9 设备3,1610038929751,83.4 设备7,1610039019422,24.1 设备3,1610039101778,85.0 设备8,1610039183077,45.6 设备3,1610039264498,79.5 设备1,1610039351600,44.4 设备8,1610039434187,73.3 设备3,1610039518048,77.9 设备7,1610039598556,9.79 设备4,1610039679144,19.0 设备2,1610039761967,56.1 设备3,1610039847823,88.2 设备6,1610039933024,77.4 设备7,1610040014212,14.4 设备4,1610040101627,98.2 设备8,1610040182379,85.0 设备6,1610040265210,61.8 设备2,1610040345769,48.0 设备3,1610040432855,19.9 设备4,1610040515943,30.9 设备4,1610040601373,51.7 设备1,1610040681803,29.7 设备8,1610040770779,31.6 设备3,1610040851986,67.1 设备4,1610040941421,93.2 设备7,1610041022836,37.2 设备8,1610041105401,84.6 设备6,1610041189301,19.2 设备4,1610041270735,99.0 设备4,1610041354109,77.0 设备5,1610041435113,49.7 设备1,1610041521773,74.2 设备8,1610041603035,42.2 设备3,1610041687230,87.1 设备1,1610041767985,82.7 设备3,1610041848130,0.59 设备4,1610041933021,7.38 设备2,1610042016080,28.9 设备2,1610042103229,99.2 设备2,1610042190222,42.2 设备3,1610042277841,12.0 设备7,1610042364076,93.5 设备7,1610042444652,10.5 设备4,1610042530461,68.5 设备1,1610042615421,78.2 设备3,1610042702219,18.5 设备6,1610042787478,64.8 设备5,1610042874301,6.34 设备2,1610042956073,65.6 设备8,1610043038793,10.6 设备8,1610043122971,30.3 设备7,1610043203810,17.5 设备8,1610043291566,83.8 设备5,1610043373188,30.5 设备2,1610043456107,84.7 设备1,1610043545998,53.4 设备3,1610043627174,97.4 */
输出结果:
SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/D:/.m2/repository/ch/qos/logback/logback-classic/1.2.0/logback-classic-1.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 17:19:42,659 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted. 17:19:42,725 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted. 17:19:43,088 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set. 17:19:43,089 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'. high> (设备8,84.3) main> SensorReading(设备5,1610035371758,38.8) main> SensorReading(设备5,1610035458637,60.2) main> SensorReading(设备1,1610035543127,10.2) main> SensorReading(设备7,1610035623427,51.6) main> SensorReading(设备5,1610035705302,20.1) main> SensorReading(设备5,1610035787387,12.9) high> (设备7,88.2) main> SensorReading(设备6,1610035960537,33.5) main> SensorReading(设备7,1610036043040,63.0) main> SensorReading(设备5,1610036125179,64.5) main> SensorReading(设备6,1610036214972,30.2) main> SensorReading(设备5,1610036296542,56.5) main> SensorReading(设备7,1610036377999,29.7) main> SensorReading(设备6,1610036467523,59.4) high> (设备4,71.1) main> SensorReading(设备5,1610036641100,28.2) high> (设备2,88.8) high> (设备8,73.5) main> SensorReading(设备1,1610036897060,18.0) main> SensorReading(设备7,1610036980127,14.9) main> SensorReading(设备2,1610037069523,47.4) main> SensorReading(设备4,1610037154507,59.5) main> SensorReading(设备5,1610037235099,35.0) high> (设备6,76.4) main> SensorReading(设备2,1610037403367,10.0) main> SensorReading(设备2,1610037484177,18.5) high> (设备4,98.7) high> (设备5,95.6) main> SensorReading(设备6,1610037735520,32.6) high> (设备6,83.3) main> SensorReading(设备3,1610037913756,29.1) high> (设备7,74.6) main> SensorReading(设备6,1610038081606,22.2) main> SensorReading(设备3,1610038163043,10.4) main> SensorReading(设备5,1610038244717,56.9) main> SensorReading(设备3,1610038326227,64.8) main> SensorReading(设备4,1610038411053,65.0) high> (设备8,93.2) high> (设备8,76.2) main> SensorReading(设备1,1610038670150,42.1) main> SensorReading(设备5,1610038756839,35.1) high> (设备3,75.9) high> (设备3,83.4) main> SensorReading(设备7,1610039019422,24.1) high> (设备3,85.0) main> SensorReading(设备8,1610039183077,45.6) high> (设备3,79.5) main> SensorReading(设备1,1610039351600,44.4) high> (设备8,73.3) high> (设备3,77.9) low> (设备7,9.79) main> SensorReading(设备4,1610039679144,19.0) main> SensorReading(设备2,1610039761967,56.1) high> (设备3,88.2) high> (设备6,77.4) main> SensorReading(设备7,1610040014212,14.4) high> (设备4,98.2) high> (设备8,85.0) main> SensorReading(设备6,1610040265210,61.8) main> SensorReading(设备2,1610040345769,48.0) main> SensorReading(设备3,1610040432855,19.9) main> SensorReading(设备4,1610040515943,30.9) main> SensorReading(设备4,1610040601373,51.7) main> SensorReading(设备1,1610040681803,29.7) main> SensorReading(设备8,1610040770779,31.6) main> SensorReading(设备3,1610040851986,67.1) high> (设备4,93.2) main> SensorReading(设备7,1610041022836,37.2) high> (设备8,84.6) main> SensorReading(设备6,1610041189301,19.2) high> (设备4,99.0) high> (设备4,77.0) main> SensorReading(设备5,1610041435113,49.7) high> (设备1,74.2) main> SensorReading(设备8,1610041603035,42.2) high> (设备3,87.1) high> (设备1,82.7) low> (设备3,0.59) low> (设备4,7.38) main> SensorReading(设备2,1610042016080,28.9) high> (设备2,99.2) main> SensorReading(设备2,1610042190222,42.2) main> SensorReading(设备3,1610042277841,12.0) high> (设备7,93.5) main> SensorReading(设备7,1610042444652,10.5) main> SensorReading(设备4,1610042530461,68.5) high> (设备1,78.2) main> SensorReading(设备3,1610042702219,18.5) main> SensorReading(设备6,1610042787478,64.8) low> (设备5,6.34) main> SensorReading(设备2,1610042956073,65.6) main> SensorReading(设备8,1610043038793,10.6) main> SensorReading(设备8,1610043122971,30.3) main> SensorReading(设备7,1610043203810,17.5) high> (设备8,83.8) main> SensorReading(设备5,1610043373188,30.5) high> (设备2,84.7) main> SensorReading(设备1,1610043545998,53.4) high> (设备3,97.4) Process finished with exit code 0
感谢各位的阅读,以上就是“Flink的SideOutputSplit分流怎么实现”的内容了,经过本文的学习后,相信大家对Flink的SideOutputSplit分流怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。