温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

基于Flink1.11的SQL构建实时数仓怎么实现

发布时间:2021-12-16 13:58:43 阅读:183 作者:iii 栏目:大数据
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

本篇内容主要讲解“基于Flink1.11的SQL构建实时数仓怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“基于Flink1.11的SQL构建实时数仓怎么实现”吧!

案例简介

本文会以电商业务为例,展示实时数仓的数据处理流程。另外,本文旨在说明实时数仓的构建流程,所以不会涉及太复杂的数据计算。为了保证案例的可操作性和完整性,本文会给出详细的操作步骤。为了方便演示,本文的所有操作都是在Flink SQL Cli中完成的。 

架构设计

具体的架构设计如图所示:首先通过canal解析MySQL的binlog日志,将数据存储在Kafka中。然后使用Flink SQL对原始数据进行清洗关联,并将处理之后的明细宽表写入kafka中。维表数据存储在MySQL中,通过Flink SQL对明细宽表与维表进行JOIN,将聚合后的数据写入MySQL,最后通过FineBI进行可视化展示。

基于Flink1.11的SQL构建实时数仓怎么实现

 

业务数据准备

  • 订单表(order_info)
CREATE TABLE `order_info` (  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',  `consignee` varchar(100DEFAULT NULL COMMENT '收货人',  `consignee_tel` varchar(20DEFAULT NULL COMMENT '收件人电话',  `total_amount` decimal(10,2DEFAULT NULL COMMENT '总金额',  `order_status` varchar(20DEFAULT NULL COMMENT '订单状态',  `user_id` bigint(20DEFAULT NULL COMMENT '用户id',  `payment_way` varchar(20DEFAULT NULL COMMENT '付款方式',  `delivery_address` varchar(1000DEFAULT NULL COMMENT '送货地址',  `order_comment` varchar(200DEFAULT NULL COMMENT '订单备注',  `out_trade_no` varchar(50DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',  `trade_body` varchar(200DEFAULT NULL COMMENT '订单描述(第三方支付用)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  `operate_time` datetime DEFAULT NULL COMMENT '操作时间',  `expire_time` datetime DEFAULT NULL COMMENT '失效时间',  `tracking_no` varchar(100DEFAULT NULL COMMENT '物流单编号',  `parent_order_id` bigint(20DEFAULT NULL COMMENT '父订单编号',  `img_url` varchar(200DEFAULT NULL COMMENT '图片路径',  `province_id` int(20DEFAULT NULL COMMENT '地区',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';
 
  • 订单详情表(order_detail)
CREATE TABLE `order_detail` (  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',  `order_id` bigint(20DEFAULT NULL COMMENT '订单编号',  `sku_id` bigint(20DEFAULT NULL COMMENT 'sku_id',  `sku_name` varchar(200DEFAULT NULL COMMENT 'sku名称(冗余)',  `img_url` varchar(200DEFAULT NULL COMMENT '图片名称(冗余)',  `order_price` decimal(10,2DEFAULT NULL COMMENT '购买价格(下单时sku价格)',  `sku_num` varchar(200DEFAULT NULL COMMENT '购买个数',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';
 
  • 商品表(sku_info)
CREATE TABLE `sku_info` (  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',  `spu_id` bigint(20DEFAULT NULL COMMENT 'spuid',  `price` decimal(10,0DEFAULT NULL COMMENT '价格',  `sku_name` varchar(200DEFAULT NULL COMMENT 'sku名称',  `sku_desc` varchar(2000DEFAULT NULL COMMENT '商品规格描述',  `weight` decimal(10,2DEFAULT NULL COMMENT '重量',  `tm_id` bigint(20DEFAULT NULL COMMENT '品牌(冗余)',  `category3_id` bigint(20DEFAULT NULL COMMENT '三级分类id(冗余)',  `sku_default_img` varchar(200DEFAULT NULL COMMENT '默认显示图片(冗余)',  `create_time` datetime DEFAULT NULL COMMENT '创建时间',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';
 
  • 商品一级类目表(base_category1)
CREATE TABLE `base_category1` (  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(10NOT NULL COMMENT '分类名称',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';
 
  • 商品二级类目表(base_category2)
CREATE TABLE `base_category2` (  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(200NOT NULL COMMENT '二级分类名称',  `category1_id` bigint(20DEFAULT NULL COMMENT '一级分类编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';
 
  • 商品三级类目表(base_category3)
CREATE TABLE `base_category3` (  `id` bigint(20NOT NULL AUTO_INCREMENT COMMENT '编号',  `name` varchar(200NOT NULL COMMENT '三级分类名称',  `category2_id` bigint(20DEFAULT NULL COMMENT '二级分类编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';
 
  • 省份表(base_province)
CREATE TABLE `base_province` (  `id` int(20DEFAULT NULL COMMENT 'id',  `name` varchar(20DEFAULT NULL COMMENT '省名称',  `region_id` int(20DEFAULT NULL COMMENT '大区id',  `area_code` varchar(20DEFAULT NULL COMMENT '行政区位码') ENGINE=InnoDB DEFAULT CHARSET=utf8;
 
  • 区域表(base_region)
CREATE TABLE `base_region` (  `id` int(20NOT NULL COMMENT '大区id',  `region_name` varchar(20DEFAULT NULL COMMENT '大区名称',   PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;   

数据处理流程 

ODS层数据同步

关于ODS层的数据同步参见我的另一篇文章基于Canal与Flink实现数据实时增量同步(一)。主要使用canal解析MySQL的binlog日志,然后将其写入到Kafka对应的topic中。由于篇幅限制,不会对具体的细节进行说明。同步之后的结果如下图所示:

基于Flink1.11的SQL构建实时数仓怎么实现 

DIM层维表数据准备

本案例中将维表存储在了MySQL中,实际生产中会用HBase存储维表数据。我们主要用到两张维表:区域维表商品维表。处理过程如下:

  • 区域维表

首先将mydw.base_provincemydw.base_region这个主题对应的数据抽取到MySQL中,主要使用Flink SQL的Kafka数据源对应的canal-json格式,注意:在执行装载之前,需要先在MySQL中创建对应的表,本文使用的MySQL数据库的名字为dim,用于存放维表数据。如下:

-- ---------------------------   省份--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_province`;CREATE TABLE `ods_base_province` (  `id` INT,  `name` STRING,  `region_id` INT ,  `area_code`STRING) WITH('connector' = 'kafka''topic' = 'mydw.base_province''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------   省份--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_province`;CREATE TABLE `base_province` (    `id` INT,    `name` STRING,    `region_id` INT ,    `area_code`STRING,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_province', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s');-- ---------------------------   省份--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_provinceSELECT *FROM ods_base_province;-- ---------------------------   区域--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_region`;CREATE TABLE `ods_base_region` (  `id` INT,  `region_name` STRING) WITH('connector' = 'kafka''topic' = 'mydw.base_region''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------   区域--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_region`;CREATE TABLE `base_region` (    `id` INT,    `region_name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_region', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s');-- ---------------------------   区域--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_regionSELECT *FROM ods_base_region;
 

经过上面的步骤,将创建维表所需要的原始数据已经存储到了MySQL中,接下来就需要在MySQL中创建维表,我们使用上面的两张表,创建一张视图:dim_province作为维表:

-- ----------------------------------- DIM层,区域维表,-- 在MySQL中创建视图-- ---------------------------------DROP VIEW IF EXISTS dim_province;CREATE VIEW dim_province ASSELECT  bp.id AS province_id,  bp.name AS province_name,  br.id AS region_id,  br.region_name AS region_name,  bp.area_code AS area_codeFROM base_region br      JOIN base_province bp ON br.id= bp.region_id;
 

这样我们所需要的维表:dim_province就创建好了,只需要在维表join时,使用Flink SQL创建JDBC的数据源,就可以使用该维表了。同理,我们使用相同的方法创建商品维表,具体如下:

-- ---------------------------  一级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category1`;CREATE TABLE `ods_base_category1` (  `id` BIGINT,  `name` STRING)WITH'connector' = 'kafka''topic' = 'mydw.base_category1''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;-- ---------------------------  一级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category1`;CREATE TABLE `base_category1` (    `id` BIGINT,    `name` STRING,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category1', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s');-- ---------------------------  一级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category1SELECT *FROM ods_base_category1;-- ---------------------------  二级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category2`;CREATE TABLE `ods_base_category2` (  `id` BIGINT,  `name` STRING,  `category1_id` BIGINT)WITH('connector' = 'kafka''topic' = 'mydw.base_category2''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ;-- ---------------------------  二级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category2`;CREATE TABLE `base_category2` (    `id` BIGINT,    `name` STRING,    `category1_id` BIGINT,     PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category2', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s');-- ---------------------------  二级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category2SELECT *FROM ods_base_category2;-- --------------------------- 三级类目表--   kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_base_category3`;CREATE TABLE `ods_base_category3` (  `id` BIGINT,  `name` STRING,  `category2_id` BIGINT)WITH('connector' = 'kafka''topic' = 'mydw.base_category3''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------  三级类目表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `base_category3`;CREATE TABLE `base_category3` (    `id` BIGINT,    `name` STRING,    `category2_id` BIGINT,    PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'base_category3', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s');-- ---------------------------  三级类目表--   MySQL Sink Load Data-- ------------------------- INSERT INTO base_category3SELECT *FROM ods_base_category3;-- ---------------------------   商品表--   Kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_sku_info`;CREATE TABLE `ods_sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0)) WITH'connector' = 'kafka''topic' = 'mydw.sku_info''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------   商品表--   MySQL Sink-- ------------------------- DROP TABLE IF EXISTS `sku_info`;CREATE TABLE `sku_info` (  `id` BIGINT,  `spu_id` BIGINT,  `price` DECIMAL(10,0),  `sku_name` STRING,  `sku_desc` STRING,  `weight` DECIMAL(10,2),  `tm_id` BIGINT,  `category3_id` BIGINT,  `sku_default_img` STRING,  `create_time` TIMESTAMP(0),   PRIMARY KEY (tm_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'sku_info', -- MySQL中的待插入数据的表    'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'sink.buffer-flush.interval' = '1s');-- ---------------------------   商品--   MySQL Sink Load Data-- ------------------------- INSERT INTO sku_infoSELECT *FROM ods_sku_info;
 

经过上面的步骤,我们可以将创建商品维表的基础数据表同步到MySQL中,同样需要提前创建好对应的数据表。接下来我们使用上面的基础表在mySQL的dim库中创建一张视图:dim_sku_info,用作后续使用的维表。

-- ----------------------------------- DIM层,商品维表,-- 在MySQL中创建视图-- ---------------------------------CREATE VIEW dim_sku_info ASSELECT  si.id AS id,  si.sku_name AS sku_name,  si.category3_id AS c3_id,  si.weight AS weight,  si.tm_id AS tm_id,  si.price AS price,  si.spu_id AS spu_id,  c3.name AS c3_name,  c2.id AS c2_id,  c2.name AS c2_name,  c3.id AS c1_id,  c3.name AS c1_nameFROM(  sku_info si   JOIN base_category3 c3 ON si.category3_id = c3.id  JOIN base_category2 c2 ON c3.category2_id =c2.id  JOIN base_category1 c1 ON c2.category1_id = c1.id);
 

至此,我们所需要的维表数据已经准备好了,接下来开始处理DWD层的数据。

 

DWD层数据处理

经过上面的步骤,我们已经将所用的维表已经准备好了。接下来我们将对ODS的原始数据进行处理,加工成DWD层的明细宽表。具体过程如下:

-- ---------------------------   订单详情--   Kafka Source-- ------------------------- DROP TABLE IF EXISTS `ods_order_detail`;CREATE TABLE `ods_order_detail`(  `id` BIGINT,  `order_id` BIGINT,  `sku_id` BIGINT,  `sku_name` STRING,  `img_url` STRING,  `order_price` DECIMAL(10,2),  `sku_num` INT,  `create_time` TIMESTAMP(0)) WITH( 'connector' = 'kafka''topic' = 'mydw.order_detail''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ---------------------------   订单信息--   Kafka Source-- -------------------------DROP TABLE IF EXISTS `ods_order_info`;CREATE TABLE `ods_order_info` (  `id` BIGINT,  `consignee` STRING,  `consignee_tel` STRING,  `total_amount` DECIMAL(10,2),  `order_status` STRING,  `user_id` BIGINT,  `payment_way` STRING,  `delivery_address` STRING,  `order_comment` STRING,  `out_trade_no` STRING,  `trade_body` STRING,  `create_time` TIMESTAMP(0) ,  `operate_time` TIMESTAMP(0) ,  `expire_time` TIMESTAMP(0) ,  `tracking_no` STRING,  `parent_order_id` BIGINT,  `img_url` STRING,  `province_id` INT) WITH('connector' = 'kafka''topic' = 'mydw.order_info''properties.bootstrap.servers' = 'kms-3:9092''properties.group.id' = 'testGroup''format' = 'canal-json' , 'scan.startup.mode' = 'earliest-offset' ) ; -- ----------------------------------- DWD层,支付订单明细表dwd_paid_order_detail-- ---------------------------------DROP TABLE IF EXISTS dwd_paid_order_detail;CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,0),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- DWD层,已支付订单明细表-- 向dwd_paid_order_detail装载数据-- ---------------------------------INSERT INTO dwd_paid_order_detailSELECT  od.id,  oi.id order_id,  oi.user_id,  oi.province_id,  od.sku_id,  od.sku_name,  od.sku_num,  od.order_price,  oi.create_time,  oi.operate_timeFROM    (    SELECT *     FROM ods_order_info    WHERE order_status = '2' -- 已支付    ) oi JOIN    (    SELECT *    FROM ods_order_detail    ) od     ON oi.id = od.order_id;
 

基于Flink1.11的SQL构建实时数仓怎么实现

 

ADS层数据

经过上面的步骤,我们创建了一张dwd_paid_order_detail明细宽表,并将该表存储在了Kafka中。接下来我们将使用这张明细宽表与维表进行JOIN,得到我们ADS应用层数据。

  • ads_province_index

首先在MySQL中创建对应的ADS目标表:ads_province_index

CREATE TABLE ads.ads_province_index(  province_id INT(10),  area_code VARCHAR(100),  province_name VARCHAR(100),  region_id INT(10),  region_name VARCHAR(100),  order_amount DECIMAL(10,2),  order_count BIGINT(10),  dt VARCHAR(100),  PRIMARY KEY (province_id, dt) ) ;
 

向MySQL的ADS层目标装载数据:

-- Flink SQL Cli操作-- ----------------------------------- 使用 DDL创建MySQL中的ADS层表-- 指标:1.每天每个省份的订单数--      2.每天每个省份的订单金额-- ---------------------------------CREATE TABLE ads_province_index(  province_id INT,  area_code STRING,  province_name STRING,  region_id INT,  region_name STRING,  order_amount DECIMAL(10,2),  order_count BIGINT,  dt STRING,  PRIMARY KEY (province_id, dt) NOT ENFORCED  ) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/ads',    'table-name' = 'ads_province_index',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe');-- ----------------------------------- dwd_paid_order_detail已支付订单明细宽表-- ---------------------------------CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,2),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- tmp_province_index-- 订单汇总临时表-- ---------------------------------CREATE TABLE tmp_province_index(    province_id INT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额    pay_date DATE)WITH (    'connector' = 'kafka',    'topic' = 'tmp_province_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- tmp_province_index-- 订单汇总临时表数据装载-- ---------------------------------INSERT INTO tmp_province_indexSELECT      province_id,      count(distinct order_id) order_count,-- 订单数      sum(order_price * sku_num) order_amount, -- 订单金额      TO_DATE(pay_time,'yyyy-MM-dd') pay_dateFROM dwd_paid_order_detailGROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd');-- ----------------------------------- tmp_province_index_source-- 使用该临时汇总表,作为数据源-- ---------------------------------CREATE TABLE tmp_province_index_source(    province_id INT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额    pay_date DATE,    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列 ) WITH (    'connector' = 'kafka',    'topic' = 'tmp_province_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- DIM层,区域维表,-- 创建区域维表数据源-- ---------------------------------DROP TABLE IF EXISTS `dim_province`;CREATE TABLE dim_province (  province_id INT,  province_name STRING,  area_code STRING,  region_id INT,  region_name STRING ,  PRIMARY KEY (province_id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'dim_province',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'scan.fetch-size' = '100');-- ----------------------------------- 向ads_province_index装载数据-- 维表JOIN-- ---------------------------------INSERT INTO ads_province_indexSELECT  pc.province_id,  dp.area_code,  dp.province_name,  dp.region_id,  dp.region_name,  pc.order_amount,  pc.order_count,  cast(pc.pay_date as VARCHAR)FROMtmp_province_index_source pc  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp   ON dp.province_id = pc.province_id;
 

当提交任务之后:观察Flink WEB UI:

基于Flink1.11的SQL构建实时数仓怎么实现

查看ADS层的ads_province_index表数据:

基于Flink1.11的SQL构建实时数仓怎么实现

  • ads_sku_index

首先在MySQL中创建对应的ADS目标表:ads_sku_index

CREATE TABLE ads_sku_index(  sku_id BIGINT(10),  sku_name VARCHAR(100),  weight DOUBLE,  tm_id BIGINT(10),  price DOUBLE,  spu_id BIGINT(10),  c3_id BIGINT(10),  c3_name VARCHAR(100) ,  c2_id BIGINT(10),  c2_name VARCHAR(100),  c1_id BIGINT(10),  c1_name VARCHAR(100),  order_amount DOUBLE,  order_count BIGINT(10),  sku_count BIGINT(10),  dt varchar(100),  PRIMARY KEY (sku_id,dt));
 

向MySQL的ADS层目标装载数据:

-- ----------------------------------- 使用 DDL创建MySQL中的ADS层表-- 指标:1.每天每个商品对应的订单个数--      2.每天每个商品对应的订单金额--      3.每天每个商品对应的数量-- ---------------------------------CREATE TABLE ads_sku_index(  sku_id BIGINT,  sku_name VARCHAR,  weight DOUBLE,  tm_id BIGINT,  price DOUBLE,  spu_id BIGINT,  c3_id BIGINT,  c3_name VARCHAR ,  c2_id BIGINT,  c2_name VARCHAR,  c1_id BIGINT,  c1_name VARCHAR,  order_amount DOUBLE,  order_count BIGINT,  sku_count BIGINT,  dt varchar,  PRIMARY KEY (sku_id,dt) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/ads',    'table-name' = 'ads_sku_index',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe');-- ----------------------------------- dwd_paid_order_detail已支付订单明细宽表-- ---------------------------------CREATE TABLE dwd_paid_order_detail(  detail_id BIGINT,  order_id BIGINT,  user_id BIGINT,  province_id INT,  sku_id BIGINT,  sku_name STRING,  sku_num INT,  order_price DECIMAL(10,2),  create_time STRING,  pay_time STRING ) WITH (    'connector' = 'kafka',    'topic' = 'dwd_paid_order_detail',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- tmp_sku_index-- 商品指标统计-- ---------------------------------CREATE TABLE tmp_sku_index(    sku_id BIGINT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额 order_sku_num BIGINT,    pay_date DATE)WITH (    'connector' = 'kafka',    'topic' = 'tmp_sku_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- tmp_sku_index-- 数据装载-- ---------------------------------INSERT INTO tmp_sku_indexSELECT      sku_id,      count(distinct order_id) order_count,-- 订单数      sum(order_price * sku_num) order_amount, -- 订单金额   sum(sku_num) order_sku_num,      TO_DATE(pay_time,'yyyy-MM-dd') pay_dateFROM dwd_paid_order_detailGROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd');-- ----------------------------------- tmp_sku_index_source-- 使用该临时汇总表,作为数据源-- ---------------------------------CREATE TABLE tmp_sku_index_source(    sku_id BIGINT,    order_count BIGINT,-- 订单数    order_amount DECIMAL(10,2), -- 订单金额    order_sku_num BIGINT,    pay_date DATE,    proctime as PROCTIME()   -- 通过计算列产生一个处理时间列 ) WITH (    'connector' = 'kafka',    'topic' = 'tmp_sku_index',    'scan.startup.mode' = 'earliest-offset',    'properties.bootstrap.servers' = 'kms-3:9092',    'format' = 'changelog-json');-- ----------------------------------- DIM层,商品维表,-- 创建商品维表数据源-- ---------------------------------DROP TABLE IF EXISTS `dim_sku_info`;CREATE TABLE dim_sku_info (  id BIGINT,  sku_name STRING,  c3_id BIGINT,  weight DECIMAL(10,2),  tm_id BIGINT,  price DECIMAL(10,2),  spu_id BIGINT,  c3_name STRING,  c2_id BIGINT,  c2_name STRING,  c1_id BIGINT,  c1_name STRING,  PRIMARY KEY (id) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://kms-1:3306/dim',    'table-name' = 'dim_sku_info',     'driver' = 'com.mysql.jdbc.Driver',    'username' = 'root',    'password' = '123qwe',    'scan.fetch-size' = '100');-- ----------------------------------- 向ads_sku_index装载数据-- 维表JOIN-- ---------------------------------INSERT INTO ads_sku_indexSELECT  sku_id ,  sku_name ,  weight ,  tm_id ,  price ,  spu_id ,  c3_id ,  c3_name,  c2_id ,  c2_name ,  c1_id ,  c1_name ,  sc.order_amount,  sc.order_count ,  sc.order_sku_num ,  cast(sc.pay_date as VARCHAR)FROMtmp_sku_index_source sc   JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds  ON ds.id = sc.sku_id  ;
 

当提交任务之后:观察Flink WEB UI:

基于Flink1.11的SQL构建实时数仓怎么实现

查看ADS层的ads_sku_index表数据:

基于Flink1.11的SQL构建实时数仓怎么实现

 

FineBI结果展示

基于Flink1.11的SQL构建实时数仓怎么实现 

其他注意点 

Flink1.11.0存在的bug

当在代码中使用Flink1.11.0版本时,如果将一个change-log的数据源insert到一个upsert sink时,会报如下异常:

[ERROR] Could not execute SQL statement. Reason:org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])
 

该bug目前已被修复,修复可以在Flink1.11.1中使用。 

到此,相信大家对“基于Flink1.11的SQL构建实时数仓怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:https://my.oschina.net/maoxiang/blog/4498603

AI

开发者交流群×