这篇文章主要讲解了“如何使用Hive Catalog”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何使用Hive Catalog”吧!
我们知道,Hive使用Hive Metastore(HMS)存储元数据信息,使用关系型数据库来持久化存储这些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元数据,这就是Hive Catalog的功能。
Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元数据。Hive Catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。
HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就可以使用HiveCatalog。比如,我们通过FlinkSQL 的DDL语句创建一张kafka的数据源表,立刻就能查看该表的元数据信息。
HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。
普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。
对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false。
尖叫提示:
由于依赖Hive Metastore,所以必须开启Hive MetaStore服务
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir); tableEnv.registerCatalog("myhive", hive); // 使用注册的catalog tableEnv.useCatalog("myhive");
在FlinkSQL Cli中使用Hive Catalog很简单,只需要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:
catalogs: - name: myhive type: hive default-database: default hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
在FlinkSQL Cli中创建一张kafka表,该表默认为普通表,即is_generic=true
CREATE TABLE user_behavior ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品类id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT, -- 用户行为发生的时间戳 `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behavior', -- kafka主题 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 数据源格式为json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false');
我们可以在Hive客户端中查看该表的元数据信息
hive (default)> desc formatted user_behavior;Table Parameters: ... is_generic true ...
从上面的元数据信息可以看出,is_generic=true,说明该表是一张普通表,如果在Hive中去查看该表,则会报错。
上面创建的表是普通表,该表不能使用Hive去查询。那么,该如何创建一张Hive兼容表呢?我们只需要在建表的属性中显示指定is_generic=false即可,具体如下:
CREATE TABLE hive_compatible_tbl ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品类id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT -- 用户行为发生的时间戳 ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behavior', -- kafka主题 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 数据源格式为json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false', 'is_generic' = 'false');
当我们在Hive中查看该表的元数据信息时,可以看出:is_generic =false
hive (default)> desc formatted hive_compatible_tbl;Table Parameters: ... is_generic false ...
我们可以使用FlinkSQL Cli或者HiveCli向该表中写入数据,然后分别通过FlinkSQL Cli和Hive Cli去查看该表数据的变化
hive (default)> insert into hive_compatible_tbl select 2020,1221,100,'buy',11,1574330486;hive (default)> select * from hive_compatible_tbl;
再在FlinkSQL Cli中查看该表,
Flink SQL> select user_id,item_id,action from hive_compatible_tbl; user_id item_id action 2020 1221 buy
同样,我们可以在FlinkSQL Cli中去向该表中写入数据:
Flink SQL> insert into hive_compatible_tbl select 2020,1222,101,'fav',11,1574330486;Flink SQL> select user_id,item_id,action from hive_compatible_tbl; user_id item_id action 2020 1221 buy 2020 1222 fav
尖叫提示:
对于Hive兼容的表,需要注意数据类型,具体的数据类型对应关系以及注意点如下
Flink 数据类型 | Hive 数据类型 |
---|---|
CHAR(p) | CHAR(p) |
VARCHAR(p) | VARCHAR(p) |
STRING | STRING |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p, s) | DECIMAL(p, s) |
DATE | DATE |
TIMESTAMP(9) | TIMESTAMP |
BYTES | BINARY |
ARRAY | LIST |
MAP<K, V> | MAP<K, V> |
ROW | STRUCT |
注意:
CHAR(p)
类型的最大长度为255VARCHAR(p)
类型的最大长度为65535MAP
类型的key仅支持基本类型,而Flink’s
MAP
类型的key执行任意类型TIMESTAMP
的精度是 9 , Hive UDFs函数只能处理 precision <= 9的
TIMESTAMP
值TIMESTAMP_WITH_TIME_ZONE
,
TIMESTAMP_WITH_LOCAL_TIME_ZONE
, 及
MULTISET
类型INTERVAL
类型与 Hive
INTERVAL
类型不一样上面介绍了普通表和Hive兼容表,那么我们该如何使用Hive的语法进行建表呢?这个时候就需要使用Hive Dialect。
从Flink1.11.0开始,只要开启了Hive dialect配置,用户就可以使用HiveQL语法,这样我们就可以在Flink中使用Hive的语法使用一些DDL和DML操作。
Flink目前支持两种SQL方言(SQL dialects),分别为:default和hive。默认的SQL方言是default,如果要使用Hive的语法,需要将SQL方言切换到hive。
使用hive dialect只需要配置一个参数即可,该参数名称为:table.sql-dialect。我们就可以在sql-client-defaults.yaml配置文件中进行配置,也可以在具体的会话窗口中进行设定,对于SQL dialect的切换,不需要进行重启session。
execution: planner: blink type: batch result-mode: tableconfiguration: table.sql-dialect: hive
如果我们需要在SQL Cli中进行切换hive dialect,可以使用如下命令:
Flink SQL> set table.sql-dialect=hive; -- 使用hive dialectFlink SQL> set table.sql-dialect=default; -- 使用default dialect
尖叫提示:
一旦切换到了hive dialect,就只能使用Hive的语法建表,如果尝试使用Flink的语法建表,则会报错
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()...build();TableEnvironment tableEnv = TableEnvironment.create(settings);// 使用hive dialecttableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);// 使用 default dialecttableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
Flink SQL> set table.sql-dialect=hive;-- 使用Hive语法创建一张表CREATE TABLE IF NOT EXISTS `hive_dialect_tbl` ( `id` int COMMENT 'id', `name` string COMMENT '名称', `age` int COMMENT '年龄' )COMMENT 'hive dialect表测试'ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
进入Hive客户端去查看该表的元数据信息
desc formatted hive_dialect_tbl;col_name data_type comment# col_name data_type comment id int name string age int # Detailed Table Information Database: default Owner: null CreateTime: Mon Dec 21 17:23:48 CST 2020 LastAccessTime: UNKNOWN Retention: 0 Location: hdfs://kms-1.apache.com:8020/user/hive/warehouse/hive_dialect_tbl Table Type: MANAGED_TABLE Table Parameters: comment hive dialect表测试 is_generic false transient_lastDdlTime 1608542628 # Storage Information SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe InputFormat: org.apache.hadoop.mapred.TextInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat Compressed: No Num Buckets: -1 Bucket Columns: [] Sort Columns: [] Storage Desc Params: field.delim , serialization.format ,
很明显,该表是一张Hive兼容表,即is_generic=false。
使用FlinkSQLCli向该表中写入一条数据:
Flink SQL> insert into hive_dialect_tbl select 1,'tom',20;
我们也可以在Hive的Cli中去操作该表
hive (default)> select * from hive_dialect_tbl;hive (default)> insert into hive_dialect_tbl select 2,'jack',22;
以下是使用Hive方言的一些注意事项。
default
在Calcite中是保留关键字,在Hive中是非保留关键字。所以,在使用Hive dialect时,必须使用反引号(`)引用此类关键字,才能将其用作标识符。当然,一旦开启了Hive dialect,我们就可以按照Hive的操作方式在Flink中去处理Hive的数据了,具体的操作与Hive一致,本文不再赘述。
感谢各位的阅读,以上就是“如何使用Hive Catalog”的内容了,经过本文的学习后,相信大家对如何使用Hive Catalog这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/maoxiang/blog/4821431