本篇内容介绍了“Flink如何新增connectors模块”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
Flink中的API
Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。
Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性和容错保障的状态。此外,用户可以在此层抽象中注册事件时间(event time)和处理时间(processing time)回调方法,从而允许程序可以实现复杂计算。
Flink API 第二层抽象是 Core APIs。实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:其中包含 DataStream API(应用于有界/无界数据流场景)和 DataSet API(应用于有界数据集场景)两部分。Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。此层 API 中处理的数据类型在每种编程语言中都有其对应的类。
Process Function 这类底层抽象和 DataStream API 的相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。DataSet API 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。
Flink API 第三层抽象是 Table API。Table API 是以表(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。
表和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table API 与 DataStream/DataSet API 混合使用。
Flink API 最顶层抽象是 SQL。这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。
DataStream/DateSet API
Flink中的DataStream和DataSet程序是常规程序,可对数据流实施转换(例如,过滤,更新状态,定义窗口,聚合)。最初从各种来源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地JVM或许多计算机的群集中进行。
预定义的 Source 和 Sink
一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 预定义 data sources 支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。 预定义 data sinks 支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。
官方文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/
DataStream/DateSet API开发
从本篇开始,增加DataStream/DateSet API演示内容,在原有的工程基础上,扩展一个connectors模块;此模块会演示以下几个组件简单使用;
新增connectors模块
在当前工程中,创建名称为connectors的maven工程模块
pom.xml
<artifactId>connectors</artifactId> <dependencies> <!-- Flink jdbc依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.10.1</version> </dependency> <!-- mysql驱动包 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <!-- kafka依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- redis依赖 --> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency> <!-- rabbitMq依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- elasticsearch7依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
刷新工程maven,下载相关功能依赖组件包;
创建用户表(演示使用)
-- 数所据库 flink 下创建用户表 CREATE TABLE `t_user` ( `id` int(8) NOT NULL AUTO_INCREMENT, `name` varchar(40) DEFAULT NULL, `age` int(3) DEFAULT NULL, `sex` int(2) DEFAULT NULL, `address` varchar(40) DEFAULT NULL, `createTime` timestamp NULL DEFAULT NULL, `createTimeSeries` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
创建实体Bean(演示使用)
TUser.java
package com.flink.examples; /** * @Description t_user表数据封装类 */ public class TUser { private Integer id; private String name; private Integer age; private Integer sex; private String address; private Long createTimeSeries; public TUser(){} public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public Integer getSex() { return sex; } public void setSex(Integer sex) { this.sex = sex; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public Long getCreateTimeSeries() { return createTimeSeries; } public void setCreateTimeSeries(Long createTimeSeries) { this.createTimeSeries = createTimeSeries; } @Override public String toString() { return "TUser{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + ", sex=" + sex + ", address='" + address + '\'' + ", createTimeSeries=" + createTimeSeries + '}'; } }
TCount.java
package com.flink.examples; /** * @Description 统计表 */ public class TCount { /** * 性别 */ private Integer sex; /** * 数量 */ private Integer num; public TCount(){} public TCount(Integer sex, Integer num){ this.sex = sex; this.num = num; } public Integer getSex() { return sex; } public void setSex(Integer sex) { this.sex = sex; } public Integer getNum() { return num; } public void setNum(Integer num) { this.num = num; } }
工程模块
“Flink如何新增connectors模块”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。