温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么在Spark Core之上使用hbase-rdd扩建自己的模块

发布时间:2021-12-08 16:48:05 阅读:188 作者:小新 栏目:开发技术
开发者测试专用服务器限时活动,0元免费领,库存有限,领完即止! 点击查看>>

这篇文章主要为大家展示了“怎么在Spark Core之上使用hbase-rdd扩建自己的模块”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“怎么在Spark Core之上使用hbase-rdd扩建自己的模块”这篇文章吧。

hbase-rdd是一个构建在SparkContext基础之上的用于对Hbase进行增删改查的第三方开源模块,目前***版本为0.7.1。目前该rdd在操作hbase时,默认调用隐式方法。

implicitdef stringToBytes(s: String): Array[Byte] = {  Bytes.toBytes(s)  }

将RDD的key转换成字节b,然后调用Hbase的put(b)方法保存rowkey,之后将RDD的每一行存入hbase。

在轨迹图绘制项目数据计算中,我们考虑到hbase的rowkey的设计——尽量减少rowkey存储的开销。虽然hbase-rdd最终的rowkey默认都是采用字节数组,但这个地方我们希望按自己的方式组装rowkey。使用MD5(imei)+dateTime组成的字节数组作为rowkey。因此默认的hbase-rdd提供的方法是不满足我们存储需求的,需要对源代码进行修改。在toHbase方法中,有一个convert方法,该方法将对RDD中的每一行数据进行转化,使用RDD中的key生成Put(Bytes.toBytes(key))对象,该对象为之后存储Hbase提供rowkey。

在convert函数中,对其实现进行了改造,hbase-rdd默认使用stringToBytes隐式函数将RDD的String类型的key转换成字节数组,这里我们需要改造,不使stringToBytes隐式方法,而是直接生成字节数据。

protected def convert(id: String, values: Map[String, Map[String, A]], put: PutAdder[A]) = {  val strs = id.split(",")  val imei = strs {0}  val dateTime = strs {1}  val b1 = MD5Utils.computeMD5Hash(imei.getBytes())  val b2 = Bytes.toBytes(dateTime.toLong)  val key = b1.++(b2)  val p = new Put(key)//改造  var empty = true  for {  (family, content) <- values  (key, value) <- content  } {  empty = false  if (StrUtils.isNotEmpty(family) &&StrUtils.isNotEmpty(key)) {  put(p, family, key, value)  }  }  if (empty) None else Some(new ImmutableBytesWritable, p)  }

这样就实现了使用自己的方式构建rowkey,当然基于此思想我们可以使用任意的方式构建rowkey。

在使用hbase-rdd插件的过程中,我在思考,默认的RDD上是没有toHbase方法的,那为什么引入hbase-rdd包之后,RDD之上就有toHbase方法了?经过查看源码,发现hbase-rdd包中提供了两个隐式方法:

implicitdef toHBaseRDDSimple[A](rddRDD[(StringMap[String, A])])(implicit writerWrites[A]): HBaseWriteRDDSimple[A] =new HBaseWriteRDDSimple(rdd, pa[A]) implicit def toHBaseRDDSimpleTS[A](rddRDD[(StringMap[String, (A, Long)])])(implicit writerWrites[A]): HBaseWriteRDDSimple[(A, Long)] =new HBaseWriteRDDSimple(rdd, pa[A])

这两个方法在发现RDD上没有toHbase方法时会自动尝试调用,从隐式定义中尝试找到解决方案,尝试之后发现有定义toHBaseRDDSimple隐式方法,于是调用该隐式方法新建HBaseWriteRDDSimple类,返回hBaseWriteRDDSimple,而在hBaseWriteRDDSimple对象中是有toHbase方法的,因此在引入hbase-rdd之后,可以发现原本没有toHbase方法的RDD上有toHbase方法了。这一切都要归功于Scala强大的隐式转换功能。

那明白了原理,是否我们可以基于RDD写自己的模块,说干就干!

***步:新建Trait

traitHaha{ implicitdef gaga[A](rdd: RDD[String]): Hehe= newHehe(rdd) }

第二步:新建Hehe类

final  class Hehe(rdd:RDD[String]) { def wow(tableName:String,family:String): Unit ={ println("---------------------------------------------") println("tableName:"+tableName+" - family:"+family) println("size:"+rdd.count()) rdd.collect().foreach(data=>println(data)) println("---------------------------------------------")    } }

第三步:新建包对象

package object test extends Haha

第四步:新建test类

object Test{ def main(args: Array[String]) { valsparkConf = new SparkConf().setAppName("Test") valsc = new SparkContext(sparkConf) sc.makeRDD(Seq("one","two","three","four")).wow("taskDataPre","T")   } }

项目结构图:

怎么在Spark Core之上使用hbase-rdd扩建自己的模块

运行效果图:

怎么在Spark Core之上使用hbase-rdd扩建自己的模块

以上是“怎么在Spark Core之上使用hbase-rdd扩建自己的模块”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

原文链接:http://edu.51cto.com

AI

开发者交流群×