温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark2.x由浅入深深到底系列六之RDD java api用JdbcRDD读取关系型数据库

发布时间:2020-07-22 00:30:35 来源:网络 阅读:2629 作者:tangweiqun 栏目:大数据

学习任何的spark技术之前,请先正确理解spark,可以参考:正确理解spark



以下是用spark RDD java api实现从关系型数据库中读取数据,这里使用的是derby本地数据库,当然可以是mysql或者oracle等关系型数据库:

package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.*;

public class JavaJdbcRDDSuite implements Serializable {

    public static void prepareData() throws ClassNotFoundException, SQLException {
        //使用本地数据库derby,当然可以使用mysql等关系型数据库
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection connection =
                DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");

        try {
            //创建一张表FOO,ID是一个自增的主键,DATA是一个INTEGER列
            Statement create = connection.createStatement();
            create.execute(
                    "CREATE TABLE FOO(" +
                            "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
                            "DATA INTEGER)");
            create.close();

            //插入数据
            PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
            for (int i = 1; i <= 5; i++) {
                insert.setInt(1, i * 2);
                insert.executeUpdate();
            }
            insert.close();
        } catch (SQLException e) {
            // If table doesn't exist...
            if (e.getSQLState().compareTo("X0Y32") != 0) {
                throw e;
            }
        } finally {
            connection.close();
        }
    }

    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // Throw if not normal single database shutdown
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (e.getSQLState().compareTo("08006") != 0) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");

        //准备数据
        prepareData();

        //构建JdbcRDD
        JavaRDD<Integer> rdd = JdbcRDD.create(
                sc,
                new JdbcRDD.ConnectionFactory() {
                    @Override
                    public Connection getConnection() throws SQLException {
                        return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                    }
                },
                "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
                1, 5, 1,
                new Function<ResultSet, Integer>() {
                    @Override
                    public Integer call(ResultSet r) throws Exception {
                        return r.getInt(1);
                    }
                }
        );
        //结果: [2, 4, 6, 8, 10]
        System.out.println(rdd.collect());

        shutdownDB();

        sc.stop();
    }
}


详细了解RDD的api的话,可以参考: spark core RDD api原理详解

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI