这篇文章给大家分享的是有关如何使用java写spark的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
package hgs.spark; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; public class JavaRDDWC { public static void main(String[] args) { //System.setProperty("HADOOP_USER_NAME","administrator"); //需要hadoop windows的winutils.exe System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.1"); SparkConf conf = new SparkConf().setAppName("javawc").setMaster("local[2]"); @SuppressWarnings("resource") JavaSparkContext context = new JavaSparkContext(conf); JavaRDD<String> rdd = context.textFile("D:\\test.txt"); //split成数组 JavaRDD<String[]> rdd1 = rdd.map(s -> s.split(",")); //只有pairrdd才可以reducebykey JavaPairRDD<String, Integer> rdd2 = rdd1.flatMapToPair(new flatMapFunc()); JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey(new reducefunc()); rdd3.saveAsTextFile("D:\\fff"); context.stop(); } } class reducefunc implements Function2<Integer, Integer, Integer>{ /** * */ private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } } class flatmf implements FlatMapFunction<String[], String>{ /** * */ private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String[] t) throws Exception { List<String> list = new ArrayList<>(); for(String str : t) { list.add(str); } return list.iterator(); } } class flatMapFunc implements PairFlatMapFunction<String[], String, Integer>{ /** * */ private static final long serialVersionUID = 1L; @Override public Iterator<Tuple2<String, Integer>> call(String[] t) throws Exception { List<Tuple2<String,Integer>> list = new ArrayList<>(); for(String str : t) { list.add(new Tuple2<String, Integer>(str, 1)); } return list.iterator(); } }
感谢各位的阅读!关于“如何使用java写spark”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。