Before studying any specific Spark technique, make sure you first understand Spark correctly; see the earlier post 正确理解spark (Understanding Spark Correctly).
We already saw in http://7639240.blog.51cto.com/7629240/1966131 that a Scala function is essentially backed by a Java interface. The same idea applies to Java 8 lambdas: a lambda expression is simply an implementation of a functional interface. The minimal sketch right after this paragraph illustrates that, and then we will look at Spark's simplest example, word count, implemented in Java 8 both without and with lambdas.
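Here is a minimal standalone sketch (not part of the word count examples below; the class name is just for illustration) showing that Spark's org.apache.spark.api.java.function.Function is an ordinary functional interface with a single method call(T), so an anonymous inner class and a lambda produce the same kind of object:

import org.apache.spark.api.java.function.Function;

public class LambdaIsAnInterface {
    public static void main(String[] args) throws Exception {
        // anonymous inner class implementing the interface explicitly
        Function<String, Integer> lengthAnon = new Function<String, Integer>() {
            @Override
            public Integer call(String s) throws Exception {
                return s.length();
            }
        };
        // lambda expression: the compiler targets the same interface
        Function<String, Integer> lengthLambda = s -> s.length();

        System.out.println(lengthAnon.call("spark"));   // 5
        System.out.println(lengthLambda.call("spark")); // 5
    }
}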
1. Java Spark word count without lambdas:
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //JavaPairRDD<LongWritable, Text> inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",
        //        TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD<String> inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");

        // Split each line into words
        JavaRDD<String> wordsRDD = inputRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Turn each word into a (word, 1) pair
        JavaPairRDD<String, Integer> keyValueWordsRDD
                = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Sum the counts per word, shuffling into 2 partitions
        JavaPairRDD<String, Integer> wordCountRDD =
                keyValueWordsRDD.reduceByKey(new HashPartitioner(2),
                        new Function2<Integer, Integer, Integer>() {
                            @Override
                            public Integer call(Integer a, Integer b) throws Exception {
                                return a + b;
                            }
                        });

        // Delete the output directory if it already exists
        File outputFile = new File("/Users/tangweiqun/wordcount");
        if (outputFile.exists()) {
            File[] files = outputFile.listFiles();
            for (File file : files) {
                file.delete();
            }
            outputFile.delete();
        }

        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");
        System.out.println(wordCountRDD.collect());
    }
}
2. Word count with Java 8 lambdas:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.File;
import java.util.Arrays;

public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("appName").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //JavaPairRDD<LongWritable, Text> inputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt",
        //        TextInputFormat.class, LongWritable.class, Text.class);
        JavaRDD<String> inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt");

        // Split each line into words
        JavaRDD<String> wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator());

        // Turn each word into a (word, 1) pair
        JavaPairRDD<String, Integer> keyValueWordsRDD
                = wordsRDD.mapToPair(word -> new Tuple2<String, Integer>(word, 1));

        // Sum the counts per word
        JavaPairRDD<String, Integer> wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b);

        // Delete the output directory if it already exists
        File outputFile = new File("/Users/tangweiqun/wordcount");
        if (outputFile.exists()) {
            File[] files = outputFile.listFiles();
            for (File file : files) {
                file.delete();
            }
            outputFile.delete();
        }

        wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount");
        System.out.println(wordCountRDD.collect());
    }
}
Comparing the two versions, the lambda implementation is clearly more concise, and it also illustrates that a lambda expression is simply an implementation of a Java interface.
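To make that point concrete, here is a small sketch (the variable names splitter, pairer, adder and counts are just illustrative, and it reuses inputRDD from the word count above): each lambda can be stored in a variable typed as the Spark functional interface it implements and then passed to the RDD operators exactly like an anonymous-class instance.

// Each lambda is an instance of the corresponding Spark functional interface
FlatMapFunction<String, String> splitter = line -> Arrays.asList(line.split(" ")).iterator();
PairFunction<String, String, Integer> pairer = word -> new Tuple2<>(word, 1);
Function2<Integer, Integer, Integer> adder = (a, b) -> a + b;

// Passing the typed variables works exactly like passing the lambdas inline
JavaPairRDD<String, Integer> counts = inputRDD
        .flatMap(splitter)
        .mapToPair(pairer)
        .reduceByKey(adder);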
The combineByKey example we covered in http://7639240.blog.51cto.com/7629240/1966958 looks like this:
JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>("coffee", 1), new Tuple2<>("coffee", 2),
                new Tuple2<>("panda", 3), new Tuple2<>("coffee", 9)), 2);

// Applied to a key's value the first time that key is seen in a partition
Function<Integer, Tuple2<Integer, Integer>> createCombiner = new Function<Integer, Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> call(Integer value) throws Exception {
        return new Tuple2<>(value, 1);
    }
};

// Applied when a key that already went through createCombiner is seen again in the same partition
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer value) throws Exception {
                return new Tuple2<>(acc._1() + value, acc._2() + 1);
            }
        };

// Applied when combiners for the same key from different partitions are merged
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc1, Tuple2<Integer, Integer> acc2) throws Exception {
                return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());
            }
        };

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);

// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
It can be rewritten with lambdas as follows:
JavaPairRDD<String, Integer> javaPairRDD =
        sc.parallelizePairs(Arrays.asList(new Tuple2<>("coffee", 1), new Tuple2<>("coffee", 2),
                new Tuple2<>("panda", 3), new Tuple2<>("coffee", 9)), 2);

// Applied to a key's value the first time that key is seen in a partition
Function<Integer, Tuple2<Integer, Integer>> createCombiner = value -> new Tuple2<>(value, 1);

// Applied when a key that already went through createCombiner is seen again in the same partition
Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>> mergeValue =
        (acc, value) -> new Tuple2<>(acc._1() + value, acc._2() + 1);

// Applied when combiners for the same key from different partitions are merged
Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> mergeCombiners =
        (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2());

JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD =
        javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners);

// Result: [(coffee,(12,3)), (panda,(3,1))]
System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
For a deeper, systematic understanding of the Spark RDD API, see: spark core RDD api原理详解 (Spark Core RDD API Explained in Depth).