如何入门ApacheFlink中的Flinksink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
将DataSet中的数据Sink到哪里去。使用的是对应的OutPutFormat,也可以使用自定义的sink,有可能写到hbase中,hdfs中。
writeAsText() / TextOutputFormat ,以String的形式写入
writeAsCsv(...) / CsvOutputFormat,以CSV的方式写进去
print() / printToErr() / print(String msg) / printToErr(String msg)以标准输出
object DataSetSinkApp {
def main(args: Array[String]): Unit = {
val environment = ExecutionEnvironment.getExecutionEnvironment
val data = 1.to(10)
val text = environment.fromCollection(data)
val filePath = "E:/test"
text.writeAsText(filePath)
environment.execute("DataSetSinkApp")
}
}
如果E:/test文件或者文件夹存在,将无法执行成功。除非增加一个WriteMode.OVERWRITE
text.writeAsText(filePath, WriteMode.OVERWRITE)
这样就在E盘下新建了一个test文件,内容是1到10。
那么如何保存到文件夹中?
text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)
设置并行度为2,这样就存到test文件夹下,两个文件1和2
默认情况下,不设置并行度,会把结果写到一个文件中,如果设置并行度,那么每一个并行度都对应一个输出。
public static void main(String[] args) throws Exception {
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
List<Integer> info = new ArrayList<>();
for(int i = 1;i <=10; i++) {
info.add(i);
}
DataSource<Integer> data1 = executionEnvironment.fromCollection(info);
String filePath = "E:/test2";
data1.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
executionEnvironment.execute("JavaDataSetSinkApp");
}
看完上述内容,你们掌握如何入门ApacheFlink中的Flinksink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/duanvincent/blog/3104250