Flink开发如何批处理应用程序,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
词频统计,即给一个文件,统计文件中每个单词出现的次数,分隔符是\t。这个文件内容如下:
hello world welcome hello welcome
统计结果直接打印在控制台。生产环境下一般Sink到目的地。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local
groupId: com.vincent artifactId: springboot-flink-train version:1.0 这样就创建了一个项目,使用Idea导入这个项目,项目结构如下:
里面有两个自动为我们准备好的java类。
第一步:创建批处理上下文环境
// set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
第二步:读取数据
env.readTextFile(textPath);
第三步:transform operations,例如 filter() flatMap() join() coGroup(),这是开发的核心所在,一般就是业务逻辑
第四步:execute program
第一步:读取数据
hello welcome
第二步:每一行的数据按照指定的分隔符拆分
hello welcome
第三步:为每一个单词赋上次数为1
(hello,1) (welcome,1)
第四步:合并操作
/** * 使用Java API来开发Flink的批处理应用程序 */ public class BatchWCJavaApp { public static void main(String[] args) throws Exception { String input = "E:/test/input/test.txt"; // step1: 获取运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // step2: 读取数据 DataSource<String> text = env.readTextFile(input); // step3: transform // FlatMapFunction<String, Tuple2<String, Integer>表示进来一个String, 转换成一个<String, Integer>类型 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { /** * * @param value 就是一行一行的字符串 * @param out 转换成(单词,次数) * @throws Exception */ @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\t"); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).groupBy(0).sum(1).print(); } }
(world,1) (hello,2) (welcome,2)
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local
groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 这样就创建了一个项目,使用Idea导入这个项目:
接下来的开发步骤与使用java实现的开发步骤是一样的:这里给出
import org.apache.flink.api.scala.ExecutionEnvironment /** * 使用Scala开发Flink的批处理应用程序 */ object BatchWCScalaApp { def main(args: Array[String]): Unit = { val input = "E:/test/input/test.txt" val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile(input) // 引入隐式转换 import org.apache.flink.api.scala._ text.flatMap(_.toLowerCase.split("\t")) .filter(_.nonEmpty) .map((_, 1)) .groupBy(0) .sum(1) .print() } }
也就是transform部分虽然原理是一样的,但是实现的方式不一样,scala更加简洁
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。