This article takes a detailed look at how to use Flink SQL, walking through several of its newer features. It is shared here as a practical reference; hopefully you will come away having learned something.
Create Table Like
CREATE [TEMPORARY] TABLE base_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'kafka'
);

CREATE [TEMPORARY] TABLE derived_table (
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
) LIKE base_table;

The derived_table above is equivalent to declaring the full schema explicitly:
CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  PRIMARY KEY(id),
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
)
The LIKE clause also gives fine-grained control over which parts of the base table are inherited, via options such as OVERWRITING OPTIONS and EXCLUDING CONSTRAINTS:

CREATE [TEMPORARY] TABLE base_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  PRIMARY KEY(id)
) WITH (
  'connector' = 'kafka',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'format' = 'json'
);

CREATE [TEMPORARY] TABLE derived_table (
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
) WITH (
  'connector.starting-offset' = '0'
) LIKE base_table (OVERWRITING OPTIONS, EXCLUDING CONSTRAINTS);

which resolves to:
CREATE [TEMPORARY] TABLE derived_table (
  id BIGINT,
  name STRING,
  tstmp TIMESTAMP,
  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'format' = 'json'
)
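Since CREATE TABLE ... LIKE is ordinary DDL, it can also be run through the Table API. Below is a minimal sketch, assuming Flink 1.11 with the Blink planner; the single 'connector' option mirrors the schematic example above, and a real Kafka table would additionally need a topic, bootstrap servers, and a format:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// Register the base table first. TIMESTAMP(3) is used here so the
// column can later carry a watermark.
tEnv.executeSql(
        "CREATE TABLE base_table (" +
        "  id BIGINT," +
        "  name STRING," +
        "  tstmp TIMESTAMP(3)" +
        ") WITH (" +
        "  'connector' = 'kafka'" +
        ")");

// Derive from it, adding only the watermark; everything else is inherited.
tEnv.executeSql(
        "CREATE TABLE derived_table (" +
        "  WATERMARK FOR tstmp AS tstmp - INTERVAL '5' SECOND" +
        ") LIKE base_table");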
Dynamic Table Options
create table kafka_table (
  id bigint,
  age int,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'employees',
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '123456',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'false'
)
In earlier versions, if a user wanted to adjust one of these options for a single query (for example, reading from a different starting offset, or toggling csv.ignore-parse-errors), the only way was to modify the DDL and re-register the table. Flink 1.11 introduces dynamic table options, which let a query override table options in place through a hint of the form:
table_name /*+ OPTIONS('k1'='v1', 'aa.bb.cc'='v2') */
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);

-- override table options in query source
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;

-- override table options in join
select * from
  kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
  join
  kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
  on t1.id = t2.id;

-- override table options for INSERT target table
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;
Note that dynamic table options are disabled by default and must be switched on explicitly:

// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();

// set low-level key-value options
configuration.setString("table.dynamic-table-options.enabled", "true");
SQL API Improvements
Flink 1.11 deprecates sqlUpdate()/execute() in favor of executeSql(), which executes a statement immediately and returns a TableResult:

Current Interface                                                   | New Interface
tEnv.sqlUpdate("CREATE TABLE ...");                                 | TableResult result = tEnv.executeSql("CREATE TABLE ...");
tEnv.sqlUpdate("INSERT INTO ... SELECT ..."); tEnv.execute("test"); | TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ...");
execute vs createStatementSet
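The distinction in this heading, briefly: executeSql() translates and submits each INSERT as its own Flink job right away, while createStatementSet() buffers several INSERTs and submits them together as one job, letting the planner share common sub-plans. A minimal sketch against the tEnv from the snippets above (sink1, sink2, and source_table are hypothetical names):

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;

// executeSql submits this INSERT as a standalone job immediately.
TableResult single = tEnv.executeSql(
        "INSERT INTO sink1 SELECT * FROM source_table");

// createStatementSet buffers multiple INSERTs and submits them as ONE job.
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql("INSERT INTO sink1 SELECT * FROM source_table");
stmtSet.addInsertSql("INSERT INTO sink2 SELECT * FROM source_table");
TableResult multi = stmtSet.execute();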
Stronger Hive Syntax Compatibility
EnvironmentSettings settings = EnvironmentSettings.newInstance()...build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// to use hive dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// use the hive catalog
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());
create external table tbl1 (
  d decimal(10,0),
  ts timestamp
)
partitioned by (p string)
location '%s'
tblproperties('k1'='v1');

create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc;

create table tbl3 (
  m map<timestamp,binary>
)
partitioned by (p1 bigint, p2 tinyint)
row format serde 'org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe';

create table tbl4 (
  x int,
  y smallint
)
row format delimited fields terminated by '|' lines terminated by '\n';
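With the Hive dialect selected as in the Java snippet above, such Hive-syntax DDL can be submitted straight through the regular SQL entry point; a one-line sketch reusing the tableEnv from before:

// Once SqlDialect.HIVE is set, executeSql accepts Hive-syntax DDL directly.
tableEnv.executeSql("create table tbl2 (s struct<ts:timestamp,bin:binary>) stored as orc");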
More Concise Connector Properties

Flink 1.11 replaces the verbose connector.type-style property keys with a single 'connector' key and flatter option names:
CREATE TABLE kafkaTable (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'csv',
  'scan.startup.mode' = 'earliest-offset'
)
JDBC catalog
CREATE CATALOG mypg WITH(
  'type' = 'jdbc',
  'default-database' = '...',
  'username' = '...',
  'password' = '...',
  'base-url' = '...'
);

USE CATALOG mypg;
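The same catalog can also be registered programmatically. A minimal sketch, assuming the flink-connector-jdbc dependency is on the classpath; the base URL shown is a hypothetical PostgreSQL endpoint, and the credentials elided above are left as placeholders:

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;

// Hypothetical connection values -- substitute your own.
String name            = "mypg";
String defaultDatabase = "postgres";
String username        = "...";
String password        = "...";
String baseUrl         = "jdbc:postgresql://localhost:5432"; // hypothetical endpoint

JdbcCatalog catalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
tableEnv.registerCatalog(name, catalog);
tableEnv.useCatalog(name);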
Python UDF Enhancements
Python UDFs registered via DDL can now be used from the Java Table API:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
Vectorized (pandas) Python UDFs are declared with udf_type="pandas":

from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT(), udf_type="pandas")
def add(i, j):
    return i + j

table_env = BatchTableEnvironment.create(env)

# register the vectorized Python scalar function
table_env.register_function("add", add)

# use the vectorized Python scalar function in Python Table API
my_table.select("add(bigint, bigint)")

# use the vectorized Python scalar function in SQL API
table_env.sql_query("SELECT add(bigint, bigint) FROM MyTable")
That wraps up this look at how to use Flink SQL. We hope the material above proves helpful; if you found the article worthwhile, please share it with others.
Original article: https://my.oschina.net/u/2828172/blog/4387467