This article walks through how to create an external partitioned table with Spark-SQL: launching spark-sql, defining a Parquet-backed external table, converting CSV data to Parquet, and registering the converted data as a partition.
I. Creating an External Partitioned Table with Spark-SQL
1. Launch spark-sql:
spark-sql --queue spark --master yarn --deploy-mode client --num-executors 10 --executor-cores 2 --executor-memory 3G
2. In spark-sql, create a partitioned Parquet table:
create external table pgls.convert_parq(
  bill_num string,
  logis_id string,
  store_id string,
  store_code string,
  creater_id string,
  order_status INT,
  pay_status INT,
  order_require_varieties INT,
  order_require_amount decimal(19,4),
  order_rec_amount decimal(19,4),
  order_rec_gpf decimal(19,4),
  deli_fee FLOAT,
  order_type INT,
  last_modify_time timestamp,
  order_submit_time timestamp
)
partitioned by (order_submit_date date)
row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
stored as parquetfile
location '/test/spark/convert/parquet/bill_parq/';
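The table definition and its (initially empty) partition list can be checked from the same spark-sql session. A minimal sanity check:

```sql
-- Verify the table definition (location, SerDe, partition column)
describe formatted pgls.convert_parq;

-- An external partitioned table starts with no partitions registered;
-- this returns nothing until partitions are added in section III
show partitions pgls.convert_parq;
```

Because the table is external, dropping it later only removes the metastore entry; the Parquet files under the `location` directory are left untouched.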
II. Converting CSV to Parquet
Code: org.apache.spark.ConvertToParquet.scala
package org.apache.spark

import com.ecfront.fs.operation.HDFSOperation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

/**
 * Converts CSV to Parquet.
 * Arguments: input path, output path, number of partitions.
 * (Declared in package org.apache.spark, so SparkConf and SparkContext
 * are in scope without explicit imports.)
 */
object ConvertToParquet {
  def main(args: Array[String]) {
    if (args.length != 3) {
      println("jar args: inputFiles outPath numpartitions")
      System.exit(0)
    }
    val inputPath = args(0)
    val outPath = args(1)
    val numPartitions = args(2).toInt
    println("==========================================")
    println("=========input: " + inputPath)
    println("=========output: " + outPath)
    println("==numPartitions: " + numPartitions)
    println("==========================================")

    // If the output directory already exists, delete it first
    val fo = HDFSOperation(new Configuration())
    val existDir = fo.existDir(outPath)
    if (existDir) {
      println("HDFS exists outpath: " + outPath)
      println("start to delete ...")
      val isDelete = fo.deleteDir(outPath)
      if (isDelete) {
        println(outPath + " delete done. ")
      }
    }

    val conf = new SparkConf()
    val sc = new SparkContext(conf)     // create the SparkContext from the SparkConf
    val sqlContext = new SQLContext(sc) // create the SQLContext from the SparkContext
    val schema = StructType(Array(
      StructField("bill_num", DataTypes.StringType, false),
      StructField("logis_id", DataTypes.StringType, false),
      StructField("store_id", DataTypes.StringType, false),
      StructField("store_code", DataTypes.StringType, false),
      StructField("creater_id", DataTypes.StringType, false),
      StructField("order_status", DataTypes.IntegerType, false),
      StructField("pay_status", DataTypes.IntegerType, false),
      StructField("order_require_varieties", DataTypes.IntegerType, false),
      StructField("order_require_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_amount", DataTypes.createDecimalType(19, 4), false),
      StructField("order_rec_gpf", DataTypes.createDecimalType(19, 4), false),
      StructField("deli_fee", DataTypes.FloatType, false),
      StructField("order_type", DataTypes.IntegerType, false),
      StructField("last_modify_time", DataTypes.TimestampType, false),
      StructField("order_submit_time", DataTypes.TimestampType, false),
      StructField("order_submit_date", DataTypes.DateType, false)))
    convert(sqlContext, inputPath, schema, outPath, numPartitions)
  }

  // Converts CSV files under inputpath to Parquet files under outpath
  def convert(sqlContext: SQLContext, inputpath: String, schema: StructType,
              outpath: String, numPartitions: Int) {
    // Load the CSV text into a DataFrame using the supplied schema
    val df = sqlContext.read.format("com.databricks.spark.csv")
      .schema(schema)
      .option("delimiter", ",")
      .load(inputpath)
    // Write as Parquet
    // df.write.parquet(outpath)  // default: one output file per input block
    df.coalesce(numPartitions).write.parquet(outpath) // custom number of output files
  }
}
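The job above writes one date's data into a manually named `order_submit_date=...` directory. As an alternative, Spark 1.4+ can lay out the partition directories itself via `DataFrameWriter.partitionBy`; a sketch under the same schema, with illustrative paths (note the partition column must then be present in the DataFrame, as it is here):

```scala
// Hedged alternative sketch, not the article's method: let Spark create the
// order_submit_date=YYYY-MM-DD directories for all dates in one pass.
// Paths are illustrative; schema and sqlContext are as defined above.
val all = sqlContext.read.format("com.databricks.spark.csv")
  .schema(schema)
  .option("delimiter", ",")
  .load("/test/spark/convert/data/order/")          // CSVs for all dates

all.write
  .partitionBy("order_submit_date")                  // emits order_submit_date=.../ dirs
  .parquet("/test/spark/convert/parquet/bill_parq/")
```

Either way, the new directories are not visible to the external table until the corresponding partitions are registered in the metastore (section III).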
After packaging, upload the jar to a local directory: /soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar. Generate the CSV files on HDFS beforehand, under the HDFS directory /test/spark/convert/data/order/2016-05-01/. Then run:
spark-submit --queue spark --master yarn \
  --num-executors 10 --executor-cores 2 --executor-memory 3G \
  --class org.apache.spark.ConvertToParquet --name ConvertToParquet \
  file:/soft/sparkTest/convert/spark.mydemo-1.0-SNAPSHOT.jar \
  /test/spark/convert/data/order/2016-05-01/ \
  /test/spark/convert/parquet/bill_parq/order_submit_date=2016-05-01
Relevant pom.xml contents:
1. Dependencies:
<dependencies>
  <!-- HDFS operations -->
  <dependency>
    <groupId>com.ecfront</groupId>
    <artifactId>ez-fs</artifactId>
    <version>0.9</version>
  </dependency>
  <!-- Spark -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.1</version>
  </dependency>
  <!-- spark-csv: the Scala suffix must match the Spark artifacts (_2.10, not _2.11) -->
  <dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
  </dependency>
  <!-- Hadoop -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
  </dependency>
</dependencies>
2. Plugins (including shading the dependencies into the jar):
<build>
  <pluginManagement>
    <plugins>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.1</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.0.2</version>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.4</version>
        <configuration>
          <filters>
            <filter>
              <artifact>*:*</artifact>
              <excludes>
                <exclude>META-INF/*.SF</exclude>
                <exclude>META-INF/*.DSA</exclude>
                <exclude>META-INF/*.RSA</exclude>
              </excludes>
            </filter>
          </filters>
        </configuration>
      </plugin>
    </plugins>
  </pluginManagement>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>add-source</goal>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <executions>
        <execution>
          <phase>compile</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>1.4</version>
      <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
      </configuration>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>org.apache.spark.ConvertToParquet</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
III. Adding a Partition to the Table
Run in spark-sql:
alter table pgls.convert_parq add partition(order_submit_date='2016-05-01');
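When many date directories have already been written under the table's location in the `order_submit_date=YYYY-MM-DD` layout, they can be registered one by one as above, or (assuming the Hive metastore integration supports it in your deployment) discovered in a single step:

```sql
-- Hedged alternative: scan the table's HDFS location and register every
-- partition directory found, instead of one alter table per date
msck repair table pgls.convert_parq;
```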
The data can then be queried with SQL:
select * from pgls.convert_parq where order_submit_date='2016-05-01' limit 5;