This article walks through Spark custom partitioning, custom sorting, using Spark with JDBC, and broadcast variables in detail. The material is fairly practical, so it is shared here as a reference; hopefully you will have a solid grasp of these topics after reading it.
// Custom partitioning
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner

object PrimitivePartitionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("Partitioner")
    val context = new SparkContext(conf)
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    // Instantiate the custom partitioner and repartition the RDD with it
    val partitioner = new CustomPartitioner(2)
    val rdd1 = rdd.partitionBy(partitioner)
    rdd1.saveAsTextFile("c:\\partitioner")
    context.stop()
  }
}

// A custom partitioner extends Spark's Partitioner
class CustomPartitioner(val partitions: Int) extends Partitioner {
  override def numPartitions: Int = this.partitions
  // Keys whose string form has at most two characters go to partition 0, the rest to partition 1
  override def getPartition(key: Any): Int = {
    if (key.toString().length() <= 2) 0 else 1
  }
}
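To confirm which keys actually landed in which partition, one option is mapPartitionsWithIndex. This is only a minimal sketch, reusing the rdd1 built in PrimitivePartitionTest above (it would run before context.stop()):

// Sketch: tag every element with its partition index; keys of length <= 2
// should all show up under partition 0, the rest under partition 1
val withIndex = rdd1.mapPartitionsWithIndex((idx, iter) =>
  iter.map { case (k, v) => s"partition=$idx key=$k value=$v" })
withIndex.collect().foreach(println)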
// Custom sorting
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.math.Ordered

// First approach: have the key class extend Ordered
class Student(val name: String, var age: Int) extends Ordered[Student] with Serializable {
  def compare(that: Student): Int = {
    this.age - that.age
  }
}

class Boy(val name: String, var age: Int) extends Serializable {
}

// Second approach: supply an implicit Ordering for the key class
object MyPredef {
  implicit def toOrderBoy = new Ordering[Boy] {
    def compare(x: Boy, y: Boy): Int = {
      x.age - y.age
    }
  }
}

// Bring the implicit Ordering into scope
import MyPredef._

object CutstomOrder {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("CutstomOrder")
    val context = new SparkContext(conf)
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    // The second argument, false, sorts in descending order
    //val rdd_sorted = rdd.sortBy(f => new Student(f._1, f._2), false, 1)
    val rdd_sorted = rdd.sortBy(f => new Boy(f._1, f._2), false, 1)
    rdd_sorted.saveAsTextFile("d:\\ordered")
    context.stop()
  }
}
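For comparison, the same descending sort can also be written without any wrapper class by sorting on the tuple's age field directly. This is just a sketch of an alternative, continuing from the rdd built in CutstomOrder above (before context.stop()):

// Sketch: sort the (name, age) pairs by age in descending order, no Student or Boy needed
val byAgeDesc = rdd.sortBy(_._2, ascending = false, numPartitions = 1)
byAgeDesc.collect().foreach(println)   // (zz,100), (cm,99), (xzhh,67), (wd,44), (hgs,2)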
// Reading from JDBC into Spark
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import scala.collection.mutable.ListBuffer

object DataFromJdbcToSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("DataFromJdbcToSpark")
    val context = new SparkContext(conf)
    val sql = "select name,age from test where id>=? and id <=?"
    // This buffer is intentionally left unused: the mapping function below must not
    // append rows to a collection of your own, because the results are stored in the JdbcRDD itself
    var list = new ListBuffer[(String,Int)]()
    // The seventh argument is a user-defined function that Spark calls for every row;
    // its parameter y is a ResultSet, and whatever it returns becomes an element of the JdbcRDD
    val jdbcRDD = new JdbcRDD(context, getConnection, sql, 1, 8, 2, y => {
      (y.getString(1), y.getInt(2))
    })
    println(jdbcRDD.collect().toBuffer)
    context.stop()
  }

  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs", "root", "123456")
    conn
  }
}
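A note on the numeric arguments above: 1 and 8 are the lower and upper bounds substituted into the two ? placeholders, and 2 is the number of partitions, so JdbcRDD splits the id range across the partitions and runs roughly one bounded query per partition (here, id>=1 and id<=4 for the first partition and id>=5 and id<=8 for the second).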
//----------------------------------------------------------------------
// Writing Spark results back to a database
package hgs.spark.othertest

import java.sql.Connection
import java.sql.DriverManager
import org.apache.commons.dbutils.QueryRunner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object DataFromSparktoJdbc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")
    val context = new SparkContext(conf)
    val addressrdd = context.textFile("d:\\words")
    val words = addressrdd.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
    //println(words.partitions.size)
    var p: Int = 0
    words.foreachPartition(iter => {
      // One connection per partition
      val qr = new QueryRunner()
      val conn = getConnection
      println(conn)
      val sql = s"insert into words values(?,?)"
      // Switching to batch inserts would be more efficient; see the sketch after this class
      while (iter.hasNext) {
        val tpm = iter.next()
        val obj1: Object = tpm._1
        val obj2: Object = new Integer(tpm._2)
        // Appending conn.toString() to obj1 makes it visible in the inserted rows
        // that three different connections performed the inserts
        qr.update(conn, sql, obj1 + conn.toString(), obj2)
      }
      //println(conn)
      //println(p)
      conn.close()
    })
    words.saveAsTextFile("d:\\wordresult")
    context.stop()
  }

  def getConnection(): Connection = {
    Class.forName("com.mysql.jdbc.Driver")
    val conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs", "root", "123456")
    conn
  }
}
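The comment inside foreachPartition above notes that batching would be faster than calling update once per row. A minimal sketch of that change, assuming the same words RDD, the getConnection helper, and a two-column words table as in the code above:

// Sketch: one batch call per partition instead of one update per row.
// QueryRunner.batch takes an Object[][] with one inner array of parameters per row.
words.foreachPartition(iter => {
  val qr = new QueryRunner()
  val conn = getConnection
  val sql = "insert into words values(?,?)"
  val params: Array[Array[AnyRef]] = iter.map { case (word, count) =>
    Array[AnyRef](word, Integer.valueOf(count))
  }.toArray
  if (params.nonEmpty) qr.batch(conn, sql, params)
  conn.close()
})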
// Broadcast variables
package hgs.spark.othertest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object BroadCastTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    val addressrdd = context.textFile("d:\\address")
    val splitaddrdd = addressrdd.map(x => {
      val cs = x.split(",")
      (cs(0), cs(1))
    }).collect().toMap
    // A broadcast variable is cached on every node, which reduces data transfer
    // between nodes and improves efficiency; any data type can be broadcast
    val maprdd = context.broadcast(splitaddrdd)
    val namerdd = context.textFile("d:\\name")
    val result = namerdd.map(x => {
      // Look each name up in the broadcast map
      maprdd.value.getOrElse(x, "UnKnown")
    })
    println(result.collect().toBuffer)
    context.stop()
  }
}
A few other points
1. Broadcast variables: created with context.broadcast(data), as in the example above. The data is distributed once to each executor, which reduces data transfer while the job is processing.
2. The word-count program:
context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
A word count like this produces 5 RDDs:
sc.textFile() produces two RDDs: HadoopRDD -> MapPartitionsRDD
flatMap() produces a MapPartitionsRDD
map produces a MapPartitionsRDD
reduceByKey produces a ShuffledRDD
saveAsTextFile then writes the result out
3. Caching: rdd.cache caches the data in memory and rdd.unpersist(true) clears the cache; rdd.persist accepts an explicit storage level, and cache simply calls persist (see the sketch after this list).
4. Remote debugging against a cluster: point the job at the cluster and ship the application jar, e.g. conf.setMaster("spark://xx.xx.xx.xx:7077").setJars(Seq("d:/jars/xx.jar")) on the SparkConf.
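A minimal sketch of the caching calls from point 3, assuming an already-built RDD named rdd:

import org.apache.spark.storage.StorageLevel
// cache() is shorthand for persist(StorageLevel.MEMORY_ONLY)
rdd.cache()
rdd.count()                      // the first action actually materialises the cache
// unpersist(true) removes the cached blocks and blocks until they are gone
rdd.unpersist(blocking = true)
// persist() lets you pick an explicit storage level
rdd.persist(StorageLevel.MEMORY_AND_DISK)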
That covers how to understand Spark custom partitioning and sorting, and how Spark works with JDBC. Hopefully the material above is of some help and you have learned something new from it. If you found the article useful, feel free to share it so more people can see it.
Original post: http://blog.itpub.net/31506529/viewspace-2216316/