本篇内容介绍了“Zookeeper Queue队列怎么实现 ”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1: Barries: 栅栏,见面知意。
2:Queue:Queue也就是我们所说的队列
1:Barries:
1.1: 是指所有的现场都达到 barrier后才能进行后续的计算
1.2:所有的线程都完成自己的计算以后才能离开barrier
进入栅栏: 1,新建一个根节点 "/root" 2, 想进入barrier的线程在 “/root”下建立一个字节点"/root/c-i" 3,循环监听"/root"孩子节点数的变化,每当其达到Size的时候就说明有Size个线程都已经达到了Barrier的 要求。
2:Queue:就是指一个生产者或消费者的模型
离开Barrier 1: 想离开Barrier的现场删除掉在"/root" 下建立的子节点 2: 循环监听"/root" 孩子节点数目的变化,当Size减少到0的时候它就可以离开了。
3 :Queue 队列的实现
1 : 建立一个根节点"/root" 2 : 生产线程在"/root" 下建立一个SEQUENTAIL的节点 3 : 消费线程检查"/root" 如果没有就循环的监听"/root" 节点的变化,直到它有自己的子节点,删除序号 最小子字节点。
package sync; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.List; import java.util.Random; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; public class SyncPrimitive implements Watcher { static ZooKeeper zk = null; static Integer mutex; String root; //同步原语 SyncPrimitive(String address) { if (zk == null) { try { System.out.println("Starting ZK:"); //建立Zookeeper连接,并且指定watcher zk = new ZooKeeper(address, 3000, this); //初始化锁对象 mutex = new Integer(-1); System.out.println("Finished starting ZK:" + zk); } catch (IOException e) { System.out.println(e.toString()); zk = null; } } } @Override synchronized public void process(WatchedEvent event) { synchronized (mutex) { //有事件发生时,调用notify,使其他wait()点得以继续 mutex.notify(); } } static public class Barrier extends SyncPrimitive { int size; String name; Barrier(String address, String root, int size) { super(address); this.root = root; this.size = size; if (zk != null) { try { //一个barrier建立一个根目录 Stat s = zk.exists(root, false); //不注册watcher if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } try { //获取自己的主机名 name = new String(InetAddress.getLocalHost() .getCanonicalHostName().toString()); } catch (UnknownHostException e) { System.out.println(e.toString()); } } boolean enter() throws KeeperException, InterruptedException { //在根目录下创建一个子节点.create和delete都会触发children wathes,这样getChildren就会收到通知,process()就会被调用 zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); //一直等,直到根目录下的子节点数目达到size时,函数退出 while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); //释放mutex上的锁 } else { return true; } } } } boolean leave() throws KeeperException, InterruptedException { //删除自己创建的节点 zk.delete(root + "/" + name, 0); //一直等,直到根目录下有子节点时,函数退出 while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } } static public class Queue extends SyncPrimitive { Queue(String address, String name) { super(address); this.root = name; if (zk != null) { try { //一个queue建立一个根目录 Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { System.out .println("keeper exception when instantiating queue:" + e.toString()); } catch (InterruptedException e) { System.out.println("Interrupted exception."); } } } //参数i是要创建节点的data boolean produce(int i) throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(i); value = b.array(); //根目录下创建一个子节点,因为是SEQUENTIAL的,所以先创建的节点具有较小的序号 zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); return true; } int consume() throws KeeperException, InterruptedException { int retvalue = -1; Stat stat = null; while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); //并不能保证list[0]就是序号最小的 //如果根目录下没有子节点就一直等 if (list.size() == 0) { System.out.println("Going to wait"); mutex.wait(); } //找到序号最小的节点将其删除 else { Integer min = new Integer(list.get(0).substring(7)); for (String s : list) { Integer tmp = new Integer(s.substring(7)); if (tmp < min) min = tmp; } System.out.println("Temporary value:" + root + "/element" + min); byte[] b = zk.getData(root + "/element" + min, false, stat); zk.delete(root + "/element" + min, 0); ByteBuffer buffer = ByteBuffer.wrap(b); retvalue = buffer.getInt(); return retvalue; } } } } } public static void main(String[] args) { if (args[0].equals("qTest")) queueTest(args); else barrierTest(args); } private static void barrierTest(String[] args) { Barrier b = new Barrier(args[1], "/b1", new Integer(args[2])); try { boolean flag = b.enter(); System.out.println("Enter barrier:" + args[2]); if (!flag) System.out.println("Error when entering the barrier"); } catch (KeeperException e) { } catch (InterruptedException e) { } Random rand = new Random(); int r = rand.nextInt(100); for (int i = 0; i < r; i++) { try { Thread.sleep(100); } catch (InterruptedException e) { } } try { b.leave(); } catch (KeeperException e) { } catch (InterruptedException e) { } System.out.println("Left barrier"); } private static void queueTest(String[] args) { Queue q = new Queue(args[1], "/app1"); System.out.println("Input:" + args[1]); int i; Integer max = new Integer(args[2]); if (args[3].equals("p")) { System.out.println("Producer"); for (i = 0; i < max; i++) try { q.produce(10 + 1); } catch (KeeperException e) { } catch (InterruptedException e) { } } else { System.out.println("Consumer"); for (i = 0; i < max; i++) try { int r = q.consume(); System.out.println("Item:" + r); } catch (KeeperException e) { i--; } catch (InterruptedException e) { } } } }
“Zookeeper Queue队列怎么实现 ”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。