这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案
示例环境
java.version: 1.8.xflink.version: 1.11.1
TimestampsAndWatermarks.java
import com.flink.examples.DataSource; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.eventtime.*; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.util.Date; import java.util.Iterator; import java.util.List; /** * @Description Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案 */ public class TimestampsAndWatermarks { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html */ /** * 遍历集合,分别打印不同性别的信息,对于执行超时,自动触发定时器 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* TimeCharacteristic有三种时间类型: ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间; IngestionTime:以数据进入flink streaming data flow的时间为准; EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段;需要实现assignTimestampsAndWatermarks方法,并设置时间水位线; */ //使用event time,需要指定事件的时间戳 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); //设置自动生成水印的时间周期,避免数据流量大的情况下,频繁添加水印导致计算性能降低。 env.getConfig().setAutoWatermarkInterval(1000L); List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList(); DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction()); DataStream<Tuple2<String, Integer>> dataStream = inStream //为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。 //Duration.ofSeconds(2),到数据流到达flink后,再水位线中设置延迟时间,也就是在所有数据流的最大的事件时间比window窗口结束时间大或相等时,再延迟多久触发window窗口结束; // .assignTimestampsAndWatermarks( // WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // .withTimestampAssigner((element, timestamp) -> { // long times = System.currentTimeMillis() ; // System.out.println(element.f1 + ","+ element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); // return times; // }) // ) .assignTimestampsAndWatermarks(new MyWatermarkStrategy() .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() { @Override public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) { long times = System.currentTimeMillis(); System.out.println(element.f1 + "," + element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); return times; } })) //分区窗口 .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1) //触发3s滚动窗口 .window(TumblingEventTimeWindows.of(Time.seconds(3))) //执行窗口数据,对keyBy数据流批量处理 .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>(){ @Override public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception { long times = System.currentTimeMillis() ; System.out.println(); System.out.println("窗口处理时间:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss")); Iterator<Tuple3<String, String, Integer>> iterator = input.iterator(); int total = 0; int size = 0; String sex = ""; while (iterator.hasNext()){ Tuple3<String, String, Integer> tuple3 = iterator.next(); total += tuple3.f2; size ++; sex = tuple3.f1; } out.collect(new Tuple2<>(sex, total / size)); } }); dataStream.print(); env.execute("flink Filter job"); } /** * 定期水印生成器 */ public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>>{ @Override public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<Tuple3<String, String, Integer>>() { //设置固定的延迟量3.5 seconds private final long maxOutOfOrderness = 3500; private long currentMaxTimestamp; /** * 事件处理 * @param event 数据流对象 * @param eventTimestamp 事件水位线时间 * @param output 输出 */ @Override public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) { currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { // 拿上一个水印时间 - 延迟量 = 等于给的窗口最终数据最后时间(如果在窗口到期内,未发生新的水印事件,则按window正常结束时间计算,当在最后水印时间-延迟量的时间范围内,有新的数据流进入,则会重新触发窗口内对全部数据流计算) output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1)); } }; } } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> { @Override public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception { List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList(); int j = 0; for (int i=0;i<100;i++){ if (i%6 == 0){ j=0; } ctx.collect(tuple3List.get(j)); //1秒钟输出一个 Thread.sleep(1 * 1000); j ++; } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } } }
打印结果
man,张三的水位线为:2020-12-27 10:28:20 girl,李四的水位线为:2020-12-27 10:28:21 man,王五的水位线为:2020-12-27 10:28:22 girl,刘六的水位线为:2020-12-27 10:28:23 girl,伍七的水位线为:2020-12-27 10:28:24 窗口处理时间:2020-12-27 10:28:25 (man,20) man,吴八的水位线为:2020-12-27 10:28:25 man,张三的水位线为:2020-12-27 10:28:26 girl,李四的水位线为:2020-12-27 10:28:27 窗口处理时间:2020-12-27 10:28:28 (girl,28) 窗口处理时间:2020-12-27 10:28:28 (man,29)
关于“Flink中Watermarks怎么用”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。