本篇内容介绍了“Zookeeper Queue队列怎么实现 ”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1: Barries: 栅栏,见面知意。
2:Queue:Queue也就是我们所说的队列
1:Barries:
1.1: 是指所有的现场都达到 barrier后才能进行后续的计算
1.2:所有的线程都完成自己的计算以后才能离开barrier
进入栅栏:
1,新建一个根节点 "/root"
2, 想进入barrier的线程在 “/root”下建立一个字节点"/root/c-i"
3,循环监听"/root"孩子节点数的变化,每当其达到Size的时候就说明有Size个线程都已经达到了Barrier的
要求。
2:Queue:就是指一个生产者或消费者的模型
离开Barrier
1: 想离开Barrier的现场删除掉在"/root" 下建立的子节点
2: 循环监听"/root" 孩子节点数目的变化,当Size减少到0的时候它就可以离开了。
3 :Queue 队列的实现
1 : 建立一个根节点"/root"
2 : 生产线程在"/root" 下建立一个SEQUENTAIL的节点
3 : 消费线程检查"/root" 如果没有就循环的监听"/root" 节点的变化,直到它有自己的子节点,删除序号
最小子字节点。
package sync;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
public class SyncPrimitive implements Watcher {
static ZooKeeper zk = null;
static Integer mutex;
String root;
//同步原语
SyncPrimitive(String address) {
if (zk == null) {
try {
System.out.println("Starting ZK:");
//建立Zookeeper连接,并且指定watcher
zk = new ZooKeeper(address, 3000, this);
//初始化锁对象
mutex = new Integer(-1);
System.out.println("Finished starting ZK:" + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
}
@Override
synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
//有事件发生时,调用notify,使其他wait()点得以继续
mutex.notify();
}
}
static public class Barrier extends SyncPrimitive {
int size;
String name;
Barrier(String address, String root, int size) {
super(address);
this.root = root;
this.size = size;
if (zk != null) {
try {
//一个barrier建立一个根目录
Stat s = zk.exists(root, false); //不注册watcher
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("keeper exception when instantiating queue:"
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception.");
}
}
try {
//获取自己的主机名
name = new String(InetAddress.getLocalHost()
.getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}
}
boolean enter() throws KeeperException, InterruptedException {
//在根目录下创建一个子节点.create和delete都会触发children wathes,这样getChildren就会收到通知,process()就会被调用
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
//一直等,直到根目录下的子节点数目达到size时,函数退出
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() < size) {
mutex.wait(); //释放mutex上的锁
} else {
return true;
}
}
}
}
boolean leave() throws KeeperException, InterruptedException {
//删除自己创建的节点
zk.delete(root + "/" + name, 0);
//一直等,直到根目录下有子节点时,函数退出
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}
static public class Queue extends SyncPrimitive {
Queue(String address, String name) {
super(address);
this.root = name;
if (zk != null) {
try {
//一个queue建立一个根目录
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("keeper exception when instantiating queue:"
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception.");
}
}
}
//参数i是要创建节点的data
boolean produce(int i) throws KeeperException, InterruptedException {
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
//根目录下创建一个子节点,因为是SEQUENTIAL的,所以先创建的节点具有较小的序号
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
int consume() throws KeeperException, InterruptedException {
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true); //并不能保证list[0]就是序号最小的
//如果根目录下没有子节点就一直等
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
}
//找到序号最小的节点将其删除
else {
Integer min = new Integer(list.get(0).substring(7));
for (String s : list) {
Integer tmp = new Integer(s.substring(7));
if (tmp < min)
min = tmp;
}
System.out.println("Temporary value:" + root
+ "/element" + min);
byte[] b = zk.getData(root + "/element" + min, false,
stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
}
public static void main(String[] args) {
if (args[0].equals("qTest"))
queueTest(args);
else
barrierTest(args);
}
private static void barrierTest(String[] args) {
Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
try {
boolean flag = b.enter();
System.out.println("Enter barrier:" + args[2]);
if (!flag)
System.out.println("Error when entering the barrier");
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
Random rand = new Random();
int r = rand.nextInt(100);
for (int i = 0; i < r; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
try {
b.leave();
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
System.out.println("Left barrier");
}
private static void queueTest(String[] args) {
Queue q = new Queue(args[1], "/app1");
System.out.println("Input:" + args[1]);
int i;
Integer max = new Integer(args[2]);
if (args[3].equals("p")) {
System.out.println("Producer");
for (i = 0; i < max; i++)
try {
q.produce(10 + 1);
} catch (KeeperException e) {
} catch (InterruptedException e) {
}
} else {
System.out.println("Consumer");
for (i = 0; i < max; i++)
try {
int r = q.consume();
System.out.println("Item:" + r);
} catch (KeeperException e) {
i--;
} catch (InterruptedException e) {
}
}
}
}
“Zookeeper Queue队列怎么实现 ”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/infiniteSpace/blog/340690