温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark常用算子有哪些

发布时间:2021-12-17 13:46:40 阅读:183 作者:柒染 栏目:大数据
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章将为大家详细讲解有关spark常用算子有哪些,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

一些经常用到的RDD算子
map:将rdd的值输入,并返回一个自定义的类型,如下输入原始类型,输出一个tuple类型的数组
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd1.map((_,1)).collect
res1Array[(StringInt)] = Array((a,1), (b,1), (c,1), (d,1))                  
-----------------------------------------------------------------------------------------------------------------
mapPartitionsWithIndex:输出数据对应的分区以及分区的值
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala>     val func = (xpar:Int,y:Iterator[String])=>{
     |       y.toList.map(x=>"partition:"+xpar+" value:"+x).iterator
     |     }
func(Int, Iterator[String]) => Iterator[String] = <function2>
scala> rdd1.mapPartitionsWithIndex(func).collect
res2Array[String] = Array(partition:0 value:a, partition:0 value:b, partition:1 value:c, partition:1 value:d)
----------------------------------------------------------------------------------------------------------------------
aggregate(zeroValue)(seqOp, combOp):对rdd的数据先按照分区汇总然后将分区的数据在汇总(迭代汇总,seqOp或者combOp的值会和下一个值进行比较)
scala> val rdd1 = sc.parallelize(List("a","b","c","d"),2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd1.aggregate("")(_+_,_+_)
res3String = abcd
-----------------------------------------------------------------------------------------------------------------------
aggregateByKey:适用于那种键值对类型的RDD,会根据key进行对value的操作,类似aggregate
scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,2),(2,3)), 2)
rdd: org.apache.spark.rdd.RDD[(IntInt)] = ParallelCollectionRDD[12] at parallelize at <console>:24
scala> rdd.aggregateByKey(0)((x,y)=>x+y, (x,y)=>(x+y)).collect
res36Array[(IntInt)] = Array((2,5), (1,3))
-------------------------------------------------------------------------------------------------------------------------
coalesce, repartition:repartition与coalesce相似,只不过repartition内部调用了coalesce,coalesce传入的参数比repartition传入的参数多一个,repartition有该参数的默认值,即:是否进行shuffule
scala> val rdd = sc.parallelize(List(1,2,3,4,5), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
scala> rdd.repartition(3)
res42: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at repartition at <console>:27
scala> res42.partitions.length
res43Int = 3
-----------------------------------------------------------------------------------------------------------------------
collectAsMap:将结果一map方式展示
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22)), 2)
rdd: org.apache.spark.rdd.RDD[(StringInt)] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> rdd.collectAsMap
res44: scala.collection.Map[String,Int] = Map(b -> 10, a -> 2, x -> 22)
-----------------------------------------------------------------------------------------------------------------------
combineByKey : 和reduceByKey是相同的效果。需要三个参数 第一个每个key对应的value 第二个,局部的value操作, 第三个:全局value操作
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(StringInt)] = ParallelCollectionRDD[30] at parallelize at <console>:24
scala> rdd.combineByKey(x=>x, (a:Int,b:Int)=>a+b, (a:Int,b:Int)=>a+b)
res45: org.apache.spark.rdd.RDD[(StringInt)] = ShuffledRDD[31] at combineByKey at <console>:27
scala> res45.collect
res46Array[(StringInt)] = Array((x,111), (b,10), (a,202))
---------------------------------------------------------------------------------------------------------------------------
countByKey:通过Key统计条数
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(StringInt)] = ParallelCollectionRDD[33] at parallelize at <console>:24
scala> rdd.countByKey
res49: scala.collection.Map[String,Long] = Map(x -> 2, b -> 1, a -> 2)
------------------------------------------------------------------------------------------------------------------------
filterByRange:返回符合过滤返回的数据
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(StringInt)] = ParallelCollectionRDD[36] at parallelize at <console>:24
scala> rdd.filterByRange("a","b")
res51: org.apache.spark.rdd.RDD[(StringInt)] = MapPartitionsRDD[37] at filterByRange at <console>:27
scala> res51.collect
res52Array[(StringInt)] = Array((a,2), (b,10), (a,200))
------------------------------------------------------------------------------------------------------------
flatMapValues
scala>  val rdd = sc.parallelize(List(("a"->"1 2 3 "),("b"->"1 2 3 "),("x"->"1 2 3 "),("a"->"1 2 3 "),("x"->"1 2 3 ")), 2)
rdd: org.apache.spark.rdd.RDD[(StringString)] = ParallelCollectionRDD[39] at parallelize at <console>:24
scala>  rdd.flatMapValues(x=>x.split(" ")).collect
res53Array[(StringString)] = Array((a,1), (a,2), (a,3), (b,1), (b,2), (b,3), (x,1), (x,2), (x,3), (a,1), (a,2), (a,3), (x,1), (x,2), (x,3))
----------------------------------------------------------------------------------------------------------------
foldByKey:通过key聚集数据然后做操作
scala> val rdd = sc.parallelize(List(("a",2),("b",10),("x",22),("a",200),("x",89)), 2)
rdd: org.apache.spark.rdd.RDD[(StringInt)] = ParallelCollectionRDD[41] at parallelize at <console>:24
scala> rdd.foldByKey(0)(_+_).collect
res55Array[(StringInt)] = Array((x,111), (b,10), (a,202))
----------------------------------------------------------------------------------------------------------------
keyBy : 以传入的参数做key
scala> val rdd1 = sc.parallelize(List("dog""salmon""salmon""rat""elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[43] at parallelize at <console>:24
scala> val rdd2 = rdd1.keyBy(_.length).collect
rdd2Array[(IntString)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
----------------------------------------------------------------------------------------------------------------
keys values
scala> val rdd1 = sc.parallelize(List("dog""salmon""salmon""rat""elephant"), 3)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[45] at parallelize at <console>:24
scala> val rdd2 = rdd1.map(x=>(x.length,x))
rdd2: org.apache.spark.rdd.RDD[(IntString)] = MapPartitionsRDD[47] at map at <console>:26
scala> rdd2.keys
res63: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[48] at keys at <console>:29
scala> rdd2.keys.collect
res64Array[Int] = Array(36638)
scala> rdd2.values.collect
res65Array[String] = Array(dog, salmon, salmon, rat, elephant)

关于spark常用算子有哪些就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:http://blog.itpub.net/31506529/viewspace-2215929/

AI

开发者交流群×