这篇文章主要讲解了“Flink中怎么使用split”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Flink中怎么使用split”吧!
flink的神奇分流器-sideoutput
这个可以用来分流,很方便的一次就可以对数据进行筛选返回。
还有针对算法处理的迭代操作,我们已经讲过两篇文章了:
Flink特异的迭代操作-bulkIteration
不得不会的Flink Dataset的DeltaI迭代操作
一个是全量迭代,一个是增量迭代。
还有优秀又鸡肋的watermark机制
不懂watermark?来吧~
对于迭代操作,其实还有一讲,那就是流处理的迭代操作。那么本文就针对这个进行分析~
Flink的迭代流程序实际就是实现了一个步进函数,然后将其嵌入到IterativeStream内部。要知道Flink的Datastream正常情况下是不会结束的,所以也没有所谓的最大迭代次数。这种情况下,你需要自己指定哪个类型的数据需要回流去继续迭代,哪个类型的数据继续向下传输,这个分流的方式有两种:split和filter,官方网站在介绍迭代流的时候使用的是filter。我们这里就先按照官网的介绍走,然后案例展示的时候使用split给大家做个demo。
首先,要创建一个IterativeStream
IterativeStream<Integer> iteration =input.iterate();
接着就可以定义对该留要进行的逻辑操作,官网这里就很简单的举了一个map的例子。
DataStream<Integer> iterationBody =iteration.map(/* this is executed many times */);
调用IterativeStream的closeWith(feedbackStream)方法可以对迭代流进行闭环操作。传递给closeWith函数的DataStream会返回值迭代的头部。常用的做法是用filter来分离流的向后迭代的部分和向前传递的部分。。
iteration.closeWith(iterationBody.filter(/*one part of the stream */));
DataStream<Integer> output =iterationBody.filter(/* some other part of the stream */);
官方给了一个连续不断减1直到数据为零的例子:
DataStream<Long> someIntegers =env.generateSequence(0, 1000);
// 创建迭代流
IterativeStream<Long> iteration =someIntegers.iterate();
// 增加处理逻辑,对元素执行减一操作。
DataStream<Long> minusOne =iteration.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
return value - 1 ;
}
});
// 获取要进行迭代的流,
DataStream<Long> stillGreaterThanZero= minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value > 0);
}
});
// 对需要迭代的流形成一个闭环
iteration.closeWith(stillGreaterThanZero);
// 小于等于0的数据继续向前传输
DataStream<Long> lessThanZero =minusOne.filter(new FilterFunction<Long>() {
@Override
public boolean filter(Long value) throws Exception {
return (value <= 0);
}
});
感谢各位的阅读,以上就是“Flink中怎么使用split”的内容了,经过本文的学习后,相信大家对Flink中怎么使用split这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。