本篇文章为大家展示了Spark RDD的collect action 不适用于单个element size过大的示例分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
collect是Spark RDD一个非常易用的action,通过collect可以轻易获得一个RDD当中所有的elements。当这些elements是String类型的时候,可以轻易将整个RDD转化成一个List<String>,简直不要太好用。
不过等一等,这么好用的action有一个弱点,它不适合size比较的element。举个例子来说吧。请看下面这段代码:
... ...
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.foreachRDD(new Function<JavaRDD<String>, Void>(){
@Override
public Void call(JavaRDD<String> strJavaRDD) throws Exception {
List<String> messages = strJavaRDD.collect();
List<String> sizeStrs = new ArrayList<String>();
for (String message: messages) {
if (message== null)
continue;
String logStr = "message size is " + message.length();
strs.add(logStr);
}
saveToLog(outputLogPath, strs);
return null;
}
});
... ...
上述这段代码当Kafka中单个message(也就是)的size很小(比如200Bytes)的时候,运行得很好。可是当单个message size变大到一定程度(例如10MB),就会抛出以下异常:
sparkDriver-akka.actor.default-dispatcher-18 2015-10-15 21:52:28,606 ERROR JobSc
heduler - Error running job streaming job 1444971120000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 238.0 failed 4 times, most recent failure: Lost task 0.3 in stage 238.0 (TID421, 127.0.0.1): ExecutorLostFailure (executor 123 lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1215)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1204)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1203)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1404)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1365)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
原因很简单,collect()无法handle“大数据”。对于10MB size这样的单条message。我们可以用下面这段代码替代上面最后一部分:
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> strJavaRDD) throws Exception {
JavaRDD<String> sizeRDD = strJavaRDD.map(new Function<String, String>() {
@Override
public String call(String message) throws Exception {
if (message == null)
return null;
String logStr = "Message size is " + message.length();
return logStr;
}
});
List<String> sizeStrs = sizeRDD.collect();
saveToLog(outputLogPat, sizeStrs);
return null;
}
});
上述内容就是Spark RDD的collect action 不适用于单个element size过大的示例分析,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。