ZooKeeper构建配置服务
* 配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信息中那些公共的部分。
* 简单的说,ZooKeeper可以作为一个具有高可用性的配置存储器,允许分布式应用的参与者检索和更新配置文件。
* 使用ZooKeeper中的观察机制,可以建立一个活跃的配置服务,使那些感兴趣的客户端能够获得配置信息修改的通知。
在每个znode上存储一个键值对,ActiveKeyValueStore 提供了从zookeeper服务上写和读取键值方法。
public class ActiveKeyValueStore extends ConnectionWatcher{ private static final Charset CHARSET =Charset.forName("GBk"); private static final int MAX_RETRIES = 5; private static final long RETRY_PERIOD_SECONDS = 60; public void write(String path, String value) throws Exception{ int retries = 0; while(true){ try { Stat stat = zk.exists(path, false); if(stat == null){ zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }else{ zk.setData(path, value.getBytes(CHARSET), -1); } } catch (KeeperException.SessionExpiredException e) { throw e; }catch(KeeperException e){ if(retries++ == MAX_RETRIES){ throw e; } TimeUnit.SECONDS.sleep(RETRY_PERIOD_SECONDS); } } } public String read(String path, Watcher watcher) throws Exception{ byte[] data = zk.getData(path, watcher, null); return new String(data, CHARSET); } }
与zookeeper服务创建连接
public class ConnectionWatcher implements Watcher{ private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws Exception{ //创建zookeeper实例的时候会启动一个线程连接到zookeeper服务。 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } //当客户端已经与zookeeper建立连接后,Watcher的process方法会被调用。 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ connectedSignal.countDown(); } } public void close() throws Exception{ zk.close(); } }
ResilientConfigUpdater类提供了管理更新配置信息方法。
public class ResilientConfigUpdater { public static final String PATH = "/config"; private ActiveKeyValueStore store; private Random random = new Random(); public ResilientConfigUpdater(String hosts) throws Exception{ store = new ActiveKeyValueStore(); store.connect(hosts); } public void run() throws Exception{ while(true){ String value = random.nextInt(100)+""; store.write(PATH, value); System.out.printf("Set %s to %s\n", PATH, value); TimeUnit.SECONDS.sleep(random.nextInt(10)); } } public static void main(String[] args) throws Exception { while(true){ try{ ResilientConfigUpdater updater = new ResilientConfigUpdater("192.168.44.231"); updater.run(); }catch(KeeperException.SessionExpiredException e){ //start a new session }catch(KeeperException e){ e.printStackTrace(); break; } } } }
ConfigWatcher类提供了配置信息变更观察器,在信息修改后会触发显示方法被调用。
public class ConfigWatcher implements Watcher{ private ActiveKeyValueStore store; public ConfigWatcher(String hosts) throws Exception{ store = new ActiveKeyValueStore(); store.connect(hosts); } public void displayConfig() throws Exception{ String value = store.read(ConfigUpdater.PATH, this); System.out.printf("Read %s as %s\n", ConfigUpdater.PATH, value); } public void process(WatchedEvent event) { // TODO Auto-generated method stub if(event.getType() == EventType.NodeDataChanged){ try { displayConfig(); } catch (Exception e) { System.out.println("Interrupted. Exiting."); Thread.currentThread().interrupt(); } } } public static void main(String[] args) throws Exception { ConfigWatcher watcher = new ConfigWatcher("192.168.44.231"); watcher.displayConfig(); Thread.sleep(Long.MAX_VALUE); } }
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。