如何将RDD或者MLLib矩阵进行转置操作,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
如何将Spark Mllib的矩阵或者将一个RDD进行转置操作。Spark Mllib的矩阵有多种形式,分布式和非分布式,非分布式在这里浪尖就不讲了,很简单,因为他是基于数组的。而分布式存储是基于RDD的,那么问题就又变成了如何将一个RDD进行转置。
首先我们来介绍一下什么是转置操作:
百科上的定义,将一个矩阵的行列互换得到的矩阵就是该矩阵的转置。
要想把一个RDD的行列互换的话,主要思路如下:
1,先转化RDD,给每一行带上唯一的行号(row, rowIndex)。
2,针对RDD的每一行,转化为(value, colIndex),并整理的到(colIndex.toLong, (rowIndex, value))
3,进行flatmap
4,步骤3完成后,我们只需要按照3key进行分组,并按照其key进行排序就可以得到转化后列式有序。
5,完成步骤4后,我们就可以按照每一行的(rowIndex, value),使用下标和其值构建新的行,保证每一行转换后的顺序。
到此转换完成。
具体步骤如下:
def transposeRowMatrix(m: RowMatrix): RowMatrix = { val transposedRowsRDD = m.rows.zipWithIndex.map{case (row, rowIndex) => rowToTransposedTriplet(row, rowIndex)} .flatMap(x => x) // (newRowIndex, (newColIndex, value)) .groupByKey .sortByKey().map(_._2) // 对row进行排序,去除掉索引 .map(buildRow) // 利用索引和值,重新构建每一行,去掉索引 new RowMatrix(transposedRowsRDD)}//转换每一行 def rowToTransposedTriplet(row: Vector, rowIndex: Long): Array[(Long, (Long, Double))] = { val indexedRow = row.toArray.zipWithIndex indexedRow.map{case (value, colIndex) => (colIndex.toLong, (rowIndex, value))} }//构建新的行 def buildRow(rowWithIndexes: Iterable[(Long, Double)]): Vector = { val resArr = new Array[Double](rowWithIndexes.size) rowWithIndexes.foreach{case (index, value) => resArr(index.toInt) = value } Vectors.dense(resArr) }
测试
准备数据
val observations = sc.parallelize( Seq( Vectors.dense(1.0, 10.0, 100.0,2.0), Vectors.dense(2.0, 20.0, 200.0,2.0), Vectors.dense(3.0, 30.0, 300.0,2.0) ))
生成矩阵
val mat: RowMatrix = new RowMatrix(observations)
会发现行列已经互换。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/u/4590259/blog/4600978