这篇文章主要讲解了“如何将数据按指定格式存入zookeeper”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何将数据按指定格式存入zookeeper”吧!
环境:
scala版本:2.11.8
zookeeper版本:3.4.5-cdh6.7.0
package com.ruozedata.zk
import java.util.concurrent.TimeUnit
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.locks.InterProcessMutex
import org.apache.curator.retry.ExponentialBackoffRetry
import org.slf4j.LoggerFactory
import scala.collection.JavaConversions._
import scala.collection.mutable
/**
* Created by ganwei on 2018/08/21
* 要求:
* 1 通过storeOffsets方法把数据存入zookeeper中。
* 存储格式:
* /consumers/G322/offsets/ruoze_offset_topic/partition/0
* /consumers/G322/offsets/ruoze_offset_topic/partition/1
* /consumers/G322/offsets/ruoze_offset_topic/partition/2
* 2 通过obtainOffsets方法把存入的数据读取出来
* 输出格式:
* topic:ruoze_offset_topic partition:0 offset:7
* topic:ruoze_offset_topic partition:1 offset:3
* topic:ruoze_offset_topic partition:2 offset:5
*/
object ZkConnectApp{
val LOG = LoggerFactory.getLogger(ZkConnectApp.getClass)
val client = {
val client = CuratorFrameworkFactory
.builder
.connectString("172.16.100.31:2181")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("consumers")
.build()
client.start()
client
}
def lock(path: String)(body: => Unit) {
val lock = new InterProcessMutex(client, path)
lock.acquire()
try {
body
} finally {
lock.release()
}
}
def tryDo(path: String)(body: => Unit): Boolean = {
val lock = new InterProcessMutex(client, path)
if (!lock.acquire(10, TimeUnit.SECONDS)) {
LOG.info(s"不能获得锁 {$path},已经有任务在运行,本次任务退出")
return false
}
try {
LOG.info("获准运行")
body
true
} finally {
lock.release()
LOG.info(s"释放锁 {$path}")
}
}
//zookeeper创建路径
def ensurePathExists(path: String): Unit = {
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded().forPath(path)
}
}
/**
* OffsetRange类定义(偏移量对象)
* 用于存储偏移量
*/
case class OffsetRange(
val topic:String, // 主题
val partition:Int, // 分区
val fromOffset:Long, // 起始偏移量
val utilOffset:Long // 终止偏移量
)
/**
* zookeeper存储offset的方法
* 写入格式:
* /consumers/G322/offsets/ruoze_offset_topic/partition/0
* /consumers/G322/offsets/ruoze_offset_topic/partition/1
* /consumers/G322/offsets/ruoze_offset_topic/partition/2
* @param OffsetsRanges
* @param groupName
*/
def storeOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={
val offsetRootPath = s"/"+groupName
if (client.checkExists().forPath(offsetRootPath) == null) {
client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
}
for(els <- OffsetsRanges ){
val data = String.valueOf(els.utilOffset).getBytes
val path = s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}"
// 创建路径
ensurePathExists(path)
// 写入数据
client.setData().forPath(path, data)
}
}
/**
* TopicAndPartition类定义(偏移量key对象)
* 用于提取偏移量
*/
case class TopicAndPartition(
topic:String, // 主题
partition:Int // 分区
)
/**
* zookeeper提取offset的方法
* @param topic
* @param groupName
* @return
*/
def obtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={
// 定义一个空的HashMap
val maps = mutable.HashMap[TopicAndPartition,Long]()
// offset的路径
val offsetRootPath = s"/"+groupName+"/offsets/"+topic+"/partition"
// 判断路径是否存在
val stat = client.checkExists().forPath(s"$offsetRootPath")
if (stat == null ){
println(stat) // 路径不存在 就将路径打印在控制台,检查路径
}else{
// 获取 offsetRootPath路径下一级的所有子目录
// 我们这里是获取的所有分区
val children = client.getChildren.forPath(s"$offsetRootPath")
// 遍历所有的分区
for ( lines <- children ){
// 获取分区的数据
val data = new String(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong
// 将 topic partition 和数据赋值给 maps
maps(TopicAndPartition(topic,lines.toInt)) = data
}
}
// 按partition排序后 返回map对象
maps.toList.sortBy(_._1.partition).toMap
}
def main(args: Array[String]) {
//定义初始化数据
val off1 = OffsetRange("ruoze_offset_topic",0,0,7)
val off2 = OffsetRange("ruoze_offset_topic",1,0,3)
val off3 = OffsetRange("ruoze_offset_topic",2,0,5)
val arr = Array(off1,off2,off3)
//获取到namespace
// println(client.getNamespace)
// 创建路径
// val offsetRootPath = "/G322"
// if (client.checkExists().forPath(offsetRootPath) == null) {
// client.create().creatingParentsIfNeeded().forPath(offsetRootPath)
// }
//存储值
storeOffsets(arr,"G322")
//获取值
/**
* 输出格式:
* topic:ruoze_offset_topic partition:0 offset:7
* topic:ruoze_offset_topic partition:1 offset:3
* topic:ruoze_offset_topic partition:2 offset:5
*/
val result = obtainOffsets("ruoze_offset_topic","G322")
for (map <- result){
println("topic:"+map._1.topic+"\t" +"partition:"+map._1.partition+"\t"+"offset:"+map._2)
}
}
}
感谢各位的阅读,以上就是“如何将数据按指定格式存入zookeeper”的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:http://blog.itpub.net/29609890/viewspace-2212606/