This article walks through example code for reading HBase data from Spark. It's a task that trips up a lot of people, so the example below, pieced together from several sources, aims to be a simple, working reference. Let's dig in!
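Before the code itself: the example assumes Spark and the HBase client/MapReduce classes are on the classpath. Here is a minimal build.sbt sketch; the version numbers are my assumptions, so match them to your own cluster (the APIs used below, such as Base64 and setStartRow, correspond to the HBase 1.x line):

// build.sbt (a sketch; version numbers are assumptions, match your cluster)
name := "hbase-to-spark"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "2.3.2" % "provided",
  "org.apache.hbase"  % "hbase-client" % "1.2.6",
  "org.apache.hbase"  % "hbase-server" % "1.2.6"  // provides TableInputFormat
)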
package hgs.spark.hbase
//https://blog.csdn.net/mlljava1111/article/details/52675901
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter.RowFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.filter.LongComparator
object HbaseToSpark {
  def main(args: Array[String]): Unit = {
    // Use Kryo serialization; run locally for this example
    val conf = new SparkConf
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.setMaster("local").setAppName("hbasedata")
    val context = new SparkContext(conf)
    // HBase configuration: ZooKeeper quorum and the table to read
    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.quorum", "bigdata00:2181,bigdata01:2181,bigdata02:2181")
    hconf.set("hbase.zookeeper.property.clientPort", "2181")
    hconf.set(TableInputFormat.INPUT_TABLE, "test")
    // Scan bounded by start (inclusive) and stop (exclusive) rowkeys
    val scan = new Scan
    scan.setStartRow("1991".getBytes)
    scan.setStopRow("3000".getBytes)
    //val list = new FilterList(Operator.MUST_PASS_ALL)
    //val filter1 = new RowFilter(CompareOp.GREATER_OR_EQUAL, new LongComparator(1991))
    //val filter2 = new RowFilter(CompareOp.LESS_OR_EQUAL, new RegexStringComparator("3000*"))
    //list.addFilter(filter1)
    //list.addFilter(filter2)
    //scan.setFilter(list)
    // Serialize the Scan and hand it to TableInputFormat
    hconf.set(TableInputFormat.SCAN, convertScanToString(scan))
    // Read the table as an RDD of (rowkey, Result) pairs
    val hrdd = context.newAPIHadoopRDD(hconf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    val resultrdd = hrdd.repartition(2)
    // Print the results; println runs on the executors, which in local mode
    // means the driver console
    resultrdd.foreach { case (_, value) =>
      val key = Bytes.toString(value.getRow)
      val name = Bytes.toString(value.getValue("cf1".getBytes, "name".getBytes))
      val age = Bytes.toString(value.getValue("cf1".getBytes, "age".getBytes))
      println("rowkey:" + key + " " + "name:" + name + " " + "age:" + age)
    }
    context.stop()
  }
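  // A sketch (not from the original post): the same extraction as a reusable
  // method that returns an RDD of (rowkey, name, age) tuples instead of
  // printing on the executors.
  def toRows(hrdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.hbase.io.ImmutableBytesWritable,
                                             org.apache.hadoop.hbase.client.Result)]) =
    hrdd.map { case (_, result) =>
      (Bytes.toString(result.getRow),
       Bytes.toString(result.getValue("cf1".getBytes, "name".getBytes)),
       Bytes.toString(result.getValue("cf1".getBytes, "age".getBytes)))
    }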
  // Serialize a Scan into the Base64 string that TableInputFormat expects
  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
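A note on the commented-out filter block: as written it pairs a LongComparator with string rowkeys, which will not match rowkeys stored as string bytes, and RegexStringComparator is only meaningful with EQUAL/NOT_EQUAL. Below is a corrected sketch (my adaptation, assuming rowkeys are stored as plain strings) that achieves the same bounds with BinaryComparator; it would replace the setStartRow/setStopRow calls:

import org.apache.hadoop.hbase.filter.BinaryComparator

val list = new FilterList(Operator.MUST_PASS_ALL)
// Keep rows whose rowkey falls between "1991" and "3000", compared as raw bytes
list.addFilter(new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator("1991".getBytes)))
list.addFilter(new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator("3000".getBytes)))
scan.setFilter(list)

Either way works, but start/stop rows let HBase skip whole regions, while a RowFilter is evaluated row by row on the region servers, so the range in the main example is generally the cheaper choice.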
That wraps up this look at example code for reading HBase data from Spark; hopefully it resolves the questions you came with. Theory sticks best when paired with practice, so go give it a try!