val acc = sc.accumulator(0, “Error Accumulator”)
val data = sc.parallelize(1 to 10)
val newData = data.map(x => {
if (x % 2 == 0) {
accum += 1
}
})
newData.count
acc.value
newData.foreach(println)
acc.value
上述现象,会造成acc.value的最终值变为10
Spark中的一系列transform操作都会构造成一长串的任务链,此时就需要通过一个action操作来触发(lazy的特性),accumulator也是如此。
原因就在于第二次action操作的时候,又执行了一次累加器的操作,同个累加器,在原有的基础上又加了5,从而变成了10
通过上述的现象描述,我们可以很快知道解决的方法:只进行一次action操作。基于此,我们只要切断任务之间的依赖关系就可以了,即使用cache、persist。这样操作之后,那么后续的累加器操作就不会受前面的transform操作影响了
需求
使用Accumulators统计emp表中NULL出现的次数以及正常数据的条数 & 打印正常数据的信息
数据
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
遇到的坑 & 解决方法
现象描述 & 原因分析:
我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时就需要通过一个action操作来触发; accumulator也是一样的,只有当action操作执行时,才会触发accumulator的执行; 因此在一个action操作之前,我们调用accumulator的value方法是无法查看其数值的,肯定是没有任何变化的; 所以在对normalData进行foreach操作之后,即action操作之后,我们会发现累加器的数值就变成了11; 之后,我们对normalData再进行一次count操作之后,即又一次的action操作之后,其实这时候,又去执行了一次前面的transform操作; 因此累加器的值又增加了11,变成了22
解决办法:
经过上面的分析,我们可以知道,使用累加器的时候,我们只有使用一次action操作才能够保证结果的准确性 因此,我们面对这种情况,是有办法的,做法就是切断它们相互之间的依赖关系即可 因此对normalData使用cache方法,当RDD第一次被计算出来时,就会被直接缓存起来 再调用时,相同的计算操作就不会再重新计算一遍
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark Accumulators完成Job的数据量处理
* 统计emp表中NULL出现的次数以及正常数据的条数 & 打印正常数据的信息
*/
object AccumulatorsApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorsApp")
val sc = new SparkContext(conf)
val lines = sc.textFile("E:/emp.txt")
// long类型的累加器值
val nullNum = sc.longAccumulator("NullNumber")
val normalData = lines.filter(line => {
var flag = true
val splitLines = line.split("\t")
for (splitLine <- splitLines){
if ("".equals(splitLine)){
flag = false
nullNum.add(1)
}
}
flag
})
// 使用cache方法,将RDD的第一次计算结果进行缓存;防止后面RDD进行重复计算,导致累加器的值不准确
normalData.cache()
// 打印每一条正常数据
normalData.foreach(println)
// 打印正常数据的条数
println("NORMAL DATA NUMBER: " + normalData.count())
// 打印emp表中NULL出现的次数
println("NULL: " + nullNum.value)
sc.stop()
}
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。