在学习任何 Spark 技术之前,请先正确理解 Spark,可以参考:正确理解 Spark
以下示例使用 Spark RDD 的 Java API(JdbcRDD)从关系型数据库中读取数据。这里使用的是 Derby 本地嵌入式数据库,当然也可以换成 MySQL 或 Oracle 等其他关系型数据库:
package com.twq.javaapi.java7;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;
import java.io.Serializable;
import java.sql.*;
/**
 * Example showing how to read rows from a relational database into a Spark RDD
 * through {@link JdbcRDD}, using the Java API.
 *
 * <p>An embedded Derby database under {@code target/JavaJdbcRDDSuiteDb} is used so the
 * example is self-contained; any other JDBC-accessible database could be substituted by
 * changing the driver class and the connection URL.
 */
public class JavaJdbcRDDSuite implements Serializable {

    /** Fully-qualified class name of the embedded Derby JDBC driver. */
    private static final String DERBY_DRIVER = "org.apache.derby.jdbc.EmbeddedDriver";

    /** Base JDBC URL of the example database (connection attributes are appended to it). */
    private static final String DB_URL = "jdbc:derby:target/JavaJdbcRDDSuiteDb";

    /** Derby SQLState raised when CREATE TABLE targets a table that already exists. */
    private static final String SQLSTATE_TABLE_ALREADY_EXISTS = "X0Y32";

    /** Derby SQLState raised on a normal shutdown of a single database. */
    private static final String SQLSTATE_DATABASE_SHUTDOWN = "08006";

    /**
     * Creates the example database (if needed), the table {@code FOO} and five rows whose
     * {@code DATA} values are 2, 4, 6, 8 and 10.
     *
     * <p>If the table already exists (for example from a previous run) it is left
     * untouched and no rows are inserted.
     *
     * @throws ClassNotFoundException if the Derby driver is not on the classpath
     * @throws SQLException           on any database error other than "table already exists"
     */
    public static void prepareData() throws ClassNotFoundException, SQLException {
        // Use the local embedded Derby database; MySQL etc. would work the same way.
        Class.forName(DERBY_DRIVER);
        try (Connection connection = DriverManager.getConnection(DB_URL + ";create=true")) {
            // Create table FOO: ID is an auto-incremented identity column, DATA is an INTEGER.
            try (Statement create = connection.createStatement()) {
                create.execute(
                    "CREATE TABLE FOO("
                        + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),"
                        + "DATA INTEGER)");
            } catch (SQLException e) {
                // Constant-first equals: getSQLState() may legitimately return null.
                if (!SQLSTATE_TABLE_ALREADY_EXISTS.equals(e.getSQLState())) {
                    throw e;
                }
                // The table is already there, so skip the inserts to avoid duplicating rows.
                return;
            }

            // Insert the sample data.
            try (PreparedStatement insert =
                     connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")) {
                for (int i = 1; i <= 5; i++) {
                    insert.setInt(1, i * 2);
                    insert.executeUpdate();
                }
            }
        }
    }

    /**
     * Shuts down the example Derby database.
     *
     * <p>Derby reports a successful shutdown by throwing an {@link SQLException}; the one
     * with SQLState {@code 08006} (normal single-database shutdown) is therefore swallowed.
     * See https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
     *
     * @throws SQLException if the shutdown attempt fails for any other reason
     */
    public static void shutdownDB() throws SQLException {
        try {
            // The returned connection (if any) is closed immediately; a successful
            // shutdown is signalled by an exception rather than a return value.
            try (Connection ignored = DriverManager.getConnection(DB_URL + ";shutdown=true")) {
                // Nothing to do.
            }
        } catch (SQLException e) {
            // Rethrow anything that is not the normal single-database shutdown signal.
            if (!SQLSTATE_DATABASE_SHUTDOWN.equals(e.getSQLState())) {
                throw e;
            }
        }
    }

    /**
     * Prepares the sample data, reads it back through a {@link JdbcRDD} and prints it.
     *
     * @param args unused
     * @throws Exception if preparing the data, running the Spark job or shutting down fails
     */
    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");
        try {
            // Prepare the data.
            prepareData();

            // Build the JdbcRDD: the two '?' placeholders receive the lower/upper ID bound
            // of each partition; here IDs 1..5 are read in a single partition.
            JavaRDD<Integer> rdd = JdbcRDD.create(
                sc,
                new JdbcRDD.ConnectionFactory() {
                    @Override
                    public Connection getConnection() throws SQLException {
                        return DriverManager.getConnection(DB_URL);
                    }
                },
                "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
                1, 5, 1,
                new Function<ResultSet, Integer>() {
                    @Override
                    public Integer call(ResultSet r) throws Exception {
                        return r.getInt(1);
                    }
                }
            );

            // Expected output: [2, 4, 6, 8, 10]
            System.out.println(rdd.collect());
        } finally {
            // Always release the database and the Spark context, even when the job fails.
            try {
                shutdownDB();
            } finally {
                sc.stop();
            }
        }
    }
}
如果想详细了解 RDD 的 API,可以参考:Spark Core RDD API 原理详解
亿速云「云数据库 MySQL」免部署即开即用,比自行安装部署数据库高出1倍以上的性能,双节点冗余防止单节点故障,数据自动定期备份随时恢复。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。