本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
使用上一节中的springboot-flink-train项目
第一步:创建流处理上下文环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
第二步:读取数据,使用socket流方式读取数据
DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);
第三步:transform
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for(String token: tokens) {
if(token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。
第四步:执行
env.execute("StreamingWCJavaApp");
整体代码如下:
/**
* 使用Java API来开发Flink的实时处理应用程序
* wc统计的数据源自socket
*/
public class StreamingWCJava02App {
public static void main(String[] args) throws Exception {
// 获取参数
int port;
try{
ParameterTool tool = ParameterTool.fromArgs(args);
port = tool.getInt("port");
} catch (Exception e) {
System.out.println("端口未设置, 使用默认端口9999");
port = 9999;
}
// step1: 获取流处理上下文环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// step2: 读取数据
DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port);
// step3: transform
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for(String token: tokens) {
if(token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
env.execute("StreamingWCJavaApp");
}
}
首先在192.168.152.45上运行命令
nc -l 9999
然后在运行main方法。在192.168.152.45的nc上输入
abc,def,abc,ddd
在idea控制台输出如下:
4> (abc,2)
1> (def,1)
4> (ddd,1)
这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] tokens = value.toLowerCase().split(",");
for(String token: tokens) {
if(token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
这样控制台的打印结果如下:
(abc,2)
(ddd,1)
(def,1)
这样一个简单的demo就成功了!
上面的代码中localhost与port需要用参数传递进来。
代码如下:
// 获取参数
int port;
try{
ParameterTool tool = ParameterTool.fromArgs(args);
port = tool.getInt("port");
} catch (Exception e) {
System.out.println("端口未设置, 使用默认端口9999");
port = 9999;
}
使用Flink提供的ParameterTool来接收参数。
我们在运行时就可以指定参数列表了,其中的key必须以“-”或者“--”开头。
在运行时,配置参数:
这样运行就可以从外界传递参数了
接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:
/**
* 使用Scala开发Flink的实时处理应用程序
*/
object StreamingWCScalaApp {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 引入隐式转换
import org.apache.flink.api.scala._
val text = env.socketTextStream("192.168.152.45", 9999)
text.flatMap(_.split(","))
.map((_,1))
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
.setParallelism(1)
env.execute("StreamingWCScalaApp");
}
}
这种方式比java实现更加简洁。
上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/duanvincent/blog/3098885