这篇文章主要介绍Flink 1.11中流批一体Hive数仓的示例分析,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
数仓架构
离线数仓
传统的离线数仓是由 Hive 加上 HDFS 的方案,Hive 数仓有着成熟和稳定的大数据分析能力,结合调度和上下游工具,构建一个完整的数据处理分析平台,流程如下:
Flume 把数据导入 Hive 数仓
调度工具,调度 ETL 作业进行数据处理
在 Hive 数仓的表上,可以进行灵活的 Ad-hoc 查询
调度工具,调度聚合作业输出到BI层的数据库中
这个流程下的问题是:
导入过程不够灵活,这应该是一个灵活 SQL 流计算的过程
基于调度作业的级联计算,实时性太差
ETL 不能有流式的增量计算
实时数仓
针对离线数仓的特点,随着实时计算的流行,越来越多的公司引入实时数仓,实时数仓基于 Kafka + Flink streaming,定义全流程的流计算作业,有着秒级甚至毫秒的实时性。
但是,实时数仓的一个问题是历史数据只有 3-15 天,无法在其上做 Ad-hoc 的查询。如果搭建 Lambda 的离线+实时的架构,维护成本、计算存储成本、一致性保证、重复的开发会带来很大的负担。
Hive 实时化
Hive streaming sink
-- 结合Hive dialect使用Hive DDL语法SET table.sql-dialect=hive;CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE) PARTITIONED BY ( dt STRING, hour STRING) STORED AS PARQUET TBLPROPERTIES ( -- 使用partition中抽取时间,加上watermark决定partiton commit的时机 'sink.partition-commit.trigger'='partition-time', -- 配置hour级别的partition时间抽取策略,这个例子中dt字段是yyyy-MM-dd格式的天,hour是0-23的小时,timestamp-pattern定义了如何从这两个partition字段推出完整的timestamp 'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’, -- 配置dalay为小时级,当 watermark > partition时间 + 1小时,会commit这个partition 'sink.partition-commit.delay'='1 h', -- partitiion commit的策略是:先更新metastore(addPartition),再写SUCCESS文件 'sink.partition-commit.policy.kind’='metastore,success-file') SET table.sql-dialect=default;CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND) -- 可以结合Table Hints动态指定table properties [3]INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
Hive streaming source
SELECT * FROM hive_table/*+ OPTIONS('streaming-source.enable'=’true’,'streaming-source.consume-start-offset'='2020-05-20') */;
实时数据关联 Hive 表
SELECT o.amout, o.currency, r.rate, o.amount * r.rateFROM Orders AS o JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r ON r.currency = o.currency
Hive 增强
Hive Dialect 语法兼容
向量化读取
简化 Hive 依赖
Flink 增强
Flink Filesystem connector
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
hour STRING
) PARTITIONED BY (dt, hour) WITH (
’connector’=’filesystem’,
’path’=’...’,
’format’=’parquet’,
'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,
'sink.partition-commit.delay'='1 h',
‘sink.partition-commit.policy.kind’='success-file')
)
-- stream environment or batch environment
INSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
-- 通过 Partition 查询
SELECT * FROM fs_table WHERE dt=’2020-05-20’ and hour=’12’;
引入 Max Slot
slotmanager.number-of-slots.max
以上是“Flink 1.11中流批一体Hive数仓的示例分析”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。