如何使用spark-core实现广度优先搜索,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。
数据源是一批网络日志数据,每条数据都有两个字段srcip和dstip,字段之间以逗号分隔,问题的需求是给定一个srcip和dstip,在给定的搜索深度下检索这两个ip之间所有的通联路径。这个问题是网络日志处理中的一个实际需求,之前在单机的程序中实现过,但是需要将所有的ip对加载到内存中。考虑到如果数据量太大的情况,可能单节点的内存无法支撑这样的操作,但是如果不将ip对全加载内存中,使用深度优先遍历的方法,搜索过程又会很慢。最近在学习spark框架,刚接触RDD,就是这用RDD来解决这个问题。以下是scala代码
package com.pxu.spark.core
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
/**
* pxu
* 2021-01-29 16:57
*/
object FindIpRel {
def main(args: Array[String]): Unit = {
val srcIp = args(0) // 源ip
val dstIp = args(1) // 目标ip
val depth = args(2).toInt //搜索深度
val resPath = args(3) //搜索结果的输出位置
val conf = new SparkConf().setAppName("findIpRel")
val sc = new SparkContext(conf)
/**
* 从数据源中构建原始rdd,每一行的数据形式为a,b
*/
val ori = sc.textFile("hdfs://master:9000/submitTest/input/ipconn/srcdst.csv")
/**
* 对原始Rdd进行元组形式转化,现在每一行的数据形式为(a,b)
* 除此之外还对数据进行了去重处理,并显示使用hash分区器对RDD中的数据进行分区
* 为后面的join操作,做一些优化
*/
val base = ori.map(a => {
val tmpArr = a.split(",")
(tmpArr(0), tmpArr(1))
}).distinct().partitionBy(new HashPartitioner(10))
/**
* 这是一个用于保存结果的RDD,其中每一行的形式为(dstIp,List(ip on path))
* 在查找过程中,发现了搜索结果后,就会将其并入到res中
*/
var res = sc.makeRDD[(String,List[String])](List())
/**
* 这是一个用于迭代的RDD,其初始化的内容是,首先从baseRdd中过滤出元组第一个元素a是参数SrcIp的,
* 然后将其转化成(b,List(a))的格式,其中b总是代表当前搜索路径上的尾ip,list中的其他内容代表搜索
* 路径上其他的ip
*/
var iteration = base.filter(_._1.equals(srcIp)).map(a => (a._2,List(a._1)))
for(i <- 2 to depth){
/**
* 1.首先iteration和base按照key进行join,这个操作的意义就是更深一层的搜索,结果RDD的格式是(b,(List(ip on path),c))
* 2.对数据进行一次过滤,过去掉那些路径已经形成环的元素,成环的判据就是List(ip on path)中的数据已经包含c了
* 3.进行map操作,b并入到List(ip on path),将c作为新的key,因此此时更深一层的搜索,导致c成为了当前搜索路径中的尾节点,
* 此时RDD中的每一个元素的格式应该是(c,(List(ip on path))
*/
val tmp = iteration.join(base).filter(a => !a._2._1.contains(a._2._2)).map(a => (a._2._2,a._2._1:+a._1))
/**
* 将tmp中已经成功搜索的路径筛选出来,成功搜索的判据是(c,(List(ip on path)),c与dstIp相等
*/
val success = tmp.filter(a => a._1.equals(dstIp))
/**
* 将成功搜索的数据合并到res中
*/
res = res.union(success)
/**
* 更新iteration
*/
iteration = tmp.subtract(success)
}
/**
* 将成功搜索的路径并入到res中
*/
res.union(iteration.filter(a => a._1.equals(dstIp)))
/**
* 执行一次转换操作,将res中的元素从(c,(List(ip on path))格式转换成List(all ip on path)
*/
val finalResult = res.map(a => a._2 :+ a._1)
finalResult.saveAsTextFile(resPath)
}
}
关于如何使用spark-core实现广度优先搜索问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4489002/blog/4940341