如何使用Apache Flink实现自定义Sink,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
socket发送过来的数据,把String类型转成对象,然后把Java对象保存到Mysql数据库中。
创建数据库和表
create database imooc_flink;
create table student(
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(25),
age int(10),
primary key(id)
)
导入mysql依赖:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
创建POJO Student
package com.vincent.course05;
public class Student {
private int id;
private String name;
private int age;
@Override
public String toString() {
return "Student{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
'}';
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
然后创建连接,SinkToMySQL.java
package com.vincent.course05;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class SinkToMySQL extends RichSinkFunction<Student> {
PreparedStatement ps;
private Connection connection;
/**
* open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
*
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = getConnection();
String sql = "insert into student(id, name, age) values(?, ?, ?);";
ps = this.connection.prepareStatement(sql);
}
@Override
public void close() throws Exception {
super.close();
//关闭连接和释放资源
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
/**
* 每条数据的插入都要调用一次 invoke() 方法
*
* @param value
* @param context
* @throws Exception
*/
@Override
public void invoke(Student value, Context context) throws Exception {
//组装数据,执行插入操作
ps.setInt(1, value.getId());
ps.setString(2, value.getName());
ps.setInt(3, value.getAge());
ps.executeUpdate();
}
private static Connection getConnection() {
Connection con = null;
try {
Class.forName("com.mysql.cj.jdbc.Driver");
con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
} catch (Exception e) {
e.printStackTrace();
System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
}
return con;
}
}
main方法:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999);
SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {
@Override
public Student map(String value) throws Exception {
String[] splits = value.split(",");
Student student = new Student();
student.setId(Integer.parseInt(splits[0]));
student.setName(splits[1]);
student.setAge(Integer.parseInt(splits[2]));
return student;
}
});
studentStream.addSink(new SinkToMySQL());
environment.execute("JavaCustomSinkToMysql");
}
从socket中获取数据,数据格式使用逗号分割,在控制台中输入:
nc -lk 9999
1,tom,23
检查数据库,数据库中多了一条数据
mysql> select * from student;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | tom | 23 |
+----+------+------+
1 row in set (0.00 sec)
这样就很方便的使用自定义的sink,写入到MySQL中去。
总结:
第一步:继承RichSinkFunction<T> T就是想要写入的对象类型
第二步:重写方法 open/close生命周期方法,invoke每条记录执行一次
默认情况下open方法的并行度不是1,跟具体的电脑有关系。
看完上述内容,你们掌握如何使用Apache Flink实现自定义Sink的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/duanvincent/blog/3104979