本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
使用上一节中的springboot-flink-train项目
第一步:创建流处理上下文环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
第二步:读取数据,使用socket流方式读取数据
DataStreamSource<String> text = env.socketTextStream("192.168.152.45", 9999);
第三步:transform
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();
这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。
第四步:执行
env.execute("StreamingWCJavaApp");
整体代码如下:
/** * 使用Java API来开发Flink的实时处理应用程序 * wc统计的数据源自socket */ public class StreamingWCJava02App { public static void main(String[] args) throws Exception { // 获取参数 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未设置, 使用默认端口9999"); port = 9999; } // step1: 获取流处理上下文环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // step2: 读取数据 DataStreamSource<String> text = env.socketTextStream("192.168.152.45", port); // step3: transform text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print(); env.execute("StreamingWCJavaApp"); } }
首先在192.168.152.45上运行命令
nc -l 9999
然后在运行main方法。在192.168.152.45的nc上输入
abc,def,abc,ddd
在idea控制台输出如下:
4> (abc,2) 1> (def,1) 4> (ddd,1)
这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:
text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
这样控制台的打印结果如下:
(abc,2) (ddd,1) (def,1)
这样一个简单的demo就成功了!
上面的代码中localhost与port需要用参数传递进来。
代码如下:
// 获取参数 int port; try{ ParameterTool tool = ParameterTool.fromArgs(args); port = tool.getInt("port"); } catch (Exception e) { System.out.println("端口未设置, 使用默认端口9999"); port = 9999; }
使用Flink提供的ParameterTool来接收参数。
我们在运行时就可以指定参数列表了,其中的key必须以“-”或者“--”开头。
在运行时,配置参数:
这样运行就可以从外界传递参数了
接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:
/** * 使用Scala开发Flink的实时处理应用程序 */ object StreamingWCScalaApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 引入隐式转换 import org.apache.flink.api.scala._ val text = env.socketTextStream("192.168.152.45", 9999) text.flatMap(_.split(",")) .map((_,1)) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) .print() .setParallelism(1) env.execute("StreamingWCScalaApp"); } }
这种方式比java实现更加简洁。
上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。