这篇文章主要介绍“flinksql 表怎么读取外部文件”,在日常操作中,相信很多人在flinksql 表怎么读取外部文件问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”flinksql 表怎么读取外部文件”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
1、TableEnvironment 可以注册目录 Catalog, 可以基于catalog 注册表
2、表 是由一个标识符 来指定的, 由三部分组成:catalog、 数据库名、对象名
3、表可以是常规的,也可以是虚拟的(视图)
4、常规表一般可以用来描述外部数据, 比如文件、数据库或消息队列的数据,也可以直接从datastream转换过来
5、视图可以从现有的表中创建,通常是table api 或者 sql 查询的一个结果集
代码:
package com.jd.data;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import scala.Tuple3;
public class TableAipDemo03 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 1、创建表执行环节
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String path = "/Users/liuhaijing/Desktop/flinktestword/aaa.txt";
tableEnv.connect(new FileSystem().path(path))
.withFormat(new OldCsv()) // 定义格式化方法
.withSchema(new Schema().field("a", DataTypes.STRING()) // 定义表的结构
.field("b", DataTypes.STRING())
.field("c", DataTypes.STRING())
)
.inAppendMode()
.createTemporaryTable("xxx");
Table xxx = tableEnv.from("xxx");
xxx.printSchema();
tableEnv.toAppendStream(xxx, Row.class ).print();
env.execute("job");
}
}
到此,关于“flinksql 表怎么读取外部文件”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/captainliu/blog/4973247