本篇内容主要讲解“nacos client中PushReceiver的原理和应用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“nacos client中PushReceiver的原理和应用”吧!
本文主要研究一下nacos client的PushReceiver
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/PushReceiver.java
public class PushReceiver implements Runnable { private ScheduledExecutorService executorService; private static final int UDP_MSS = 64 * 1024; private DatagramSocket udpSocket; private HostReactor hostReactor; public PushReceiver(HostReactor hostReactor) { try { this.hostReactor = hostReactor; udpSocket = new DatagramSocket(); executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.push.receiver"); return thread; } }); executorService.execute(this); } catch (Exception e) { NAMING_LOGGER.error("[NA] init udp socket failed", e); } } @Override public void run() { while (true) { try { // byte[] is initialized with 0 full filled by default byte[] buffer = new byte[UDP_MSS]; DatagramPacket packet = new DatagramPacket(buffer, buffer.length); udpSocket.receive(packet); String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim(); NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString()); PushPacket pushPacket = JSON.parseObject(json, PushPacket.class); String ack; if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) { hostReactor.processServiceJSON(pushPacket.data); // send ack to server ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } else if ("dump".equals(pushPacket.type)) { // dump data to server ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\", \"data\":" + "\"" + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap())) + "\"}"; } else { // do nothing send ack only ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\", \"data\":" + "\"\"}"; } udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")), ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress())); } catch (Exception e) { NAMING_LOGGER.error("[NA] error while receiving push data", e); } } } public static class PushPacket { public String type; public long lastRefTime; public String data; } public int getUDPPort() { return udpSocket.getLocalPort(); } }
PushReceiver实现了Runnable接口,其构造器会创建udpSocket以及ScheduledThreadPoolExecutor,然后往ScheduledThreadPoolExecutor注册自己
其run方法使用while true循环来执行udpSocket.receive(packet),之后将接收到的数据解析为PushPacket,然后根据不同pushPacket.type做不同处理
当pushPacket.type为dom或者service的时候会调用hostReactor.processServiceJSON(pushPacket.data);当pushPacket.type为dump的时候会将hostReactor.getServiceInfoMap()序列化到ack中,最后将ack返回去
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java
public class HostReactor { private static final long DEFAULT_DELAY = 1000L; private static final long UPDATE_HOLD_INTERVAL = 5000L; private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>(); private Map<String, ServiceInfo> serviceInfoMap; private Map<String, Object> updatingMap; private PushReceiver pushReceiver; private EventDispatcher eventDispatcher; private NamingProxy serverProxy; private FailoverReactor failoverReactor; private String cacheDir; private ScheduledExecutorService executor; public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) { this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT); } public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) { executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.client.naming.updater"); return thread; } }); this.eventDispatcher = eventDispatcher; this.serverProxy = serverProxy; this.cacheDir = cacheDir; if (loadCacheAtStart) { this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir)); } else { this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16); } this.updatingMap = new ConcurrentHashMap<String, Object>(); this.failoverReactor = new FailoverReactor(this, cacheDir); this.pushReceiver = new PushReceiver(this); } //...... public ServiceInfo processServiceJSON(String json) { ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class); ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey()); if (serviceInfo.getHosts() == null || !serviceInfo.validate()) { //empty or error push, just ignore return oldService; } boolean changed = false; if (oldService != null) { if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) { NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: " + serviceInfo.getLastRefTime()); } serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size()); for (Instance host : oldService.getHosts()) { oldHostMap.put(host.toInetAddr(), host); } Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size()); for (Instance host : serviceInfo.getHosts()) { newHostMap.put(host.toInetAddr(), host); } Set<Instance> modHosts = new HashSet<Instance>(); Set<Instance> newHosts = new HashSet<Instance>(); Set<Instance> remvHosts = new HashSet<Instance>(); List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>( newHostMap.entrySet()); for (Map.Entry<String, Instance> entry : newServiceHosts) { Instance host = entry.getValue(); String key = entry.getKey(); if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(), oldHostMap.get(key).toString())) { modHosts.add(host); continue; } if (!oldHostMap.containsKey(key)) { newHosts.add(host); } } for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) { Instance host = entry.getValue(); String key = entry.getKey(); if (newHostMap.containsKey(key)) { continue; } if (!newHostMap.containsKey(key)) { remvHosts.add(host); } } if (newHosts.size() > 0) { changed = true; NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts)); } if (remvHosts.size() > 0) { changed = true; NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts)); } if (modHosts.size() > 0) { changed = true; NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts)); } serviceInfo.setJsonFromServer(json); if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) { eventDispatcher.serviceChanged(serviceInfo); DiskCache.write(serviceInfo, cacheDir); } } else { changed = true; NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON .toJSONString(serviceInfo.getHosts())); serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); eventDispatcher.serviceChanged(serviceInfo); serviceInfo.setJsonFromServer(json); DiskCache.write(serviceInfo, cacheDir); } MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size()); if (changed) { NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(serviceInfo.getHosts())); } return serviceInfo; } public Map<String, ServiceInfo> getServiceInfoMap() { return serviceInfoMap; } //...... }
processServiceJSON方法会对比将接收到serviceInfo与本地对比,然后判断是否变更,并在需要的时候更新本地的serviceInfo并回调eventDispatcher.serviceChanged(serviceInfo)以及DiskCache.write(serviceInfo, cacheDir);HostReactor的构造器有个loadCacheAtStart参数(默认为false
),如果为true则会使用DiskCache.read(this.cacheDir)从本地文件读取serviceInfo信息来初始化serviceInfoMap
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/cache/DiskCache.java
public class DiskCache { public static void write(ServiceInfo dom, String dir) { try { makeSureCacheDirExists(dir); File file = new File(dir, dom.getKeyEncoded()); if (!file.exists()) { // add another !file.exists() to avoid conflicted creating-new-file from multi-instances if (!file.createNewFile() && !file.exists()) { throw new IllegalStateException("failed to create cache file"); } } StringBuilder keyContentBuffer = new StringBuilder(""); String json = dom.getJsonFromServer(); if (StringUtils.isEmpty(json)) { json = JSON.toJSONString(dom); } keyContentBuffer.append(json); //Use the concurrent API to ensure the consistency. ConcurrentDiskUtil.writeFileContent(file, keyContentBuffer.toString(), Charset.defaultCharset().toString()); } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to write cache for dom:" + dom.getName(), e); } } public static String getLineSeparator() { return System.getProperty("line.separator"); } public static Map<String, ServiceInfo> read(String cacheDir) { Map<String, ServiceInfo> domMap = new HashMap<String, ServiceInfo>(16); BufferedReader reader = null; try { File[] files = makeSureCacheDirExists(cacheDir).listFiles(); if (files == null || files.length == 0) { return domMap; } for (File file : files) { if (!file.isFile()) { continue; } String fileName = URLDecoder.decode(file.getName(), "UTF-8"); if (!(fileName.endsWith(Constants.SERVICE_INFO_SPLITER + "meta") || fileName.endsWith( Constants.SERVICE_INFO_SPLITER + "special-url"))) { ServiceInfo dom = new ServiceInfo(fileName); List<Instance> ips = new ArrayList<Instance>(); dom.setHosts(ips); ServiceInfo newFormat = null; try { String dataString = ConcurrentDiskUtil.getFileContent(file, Charset.defaultCharset().toString()); reader = new BufferedReader(new StringReader(dataString)); String json; while ((json = reader.readLine()) != null) { try { if (!json.startsWith("{")) { continue; } newFormat = JSON.parseObject(json, ServiceInfo.class); if (StringUtils.isEmpty(newFormat.getName())) { ips.add(JSON.parseObject(json, Instance.class)); } } catch (Throwable e) { NAMING_LOGGER.error("[NA] error while parsing cache file: " + json, e); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to read cache for dom: " + file.getName(), e); } finally { try { if (reader != null) { reader.close(); } } catch (Exception e) { //ignore } } if (newFormat != null && !StringUtils.isEmpty(newFormat.getName()) && !CollectionUtils.isEmpty( newFormat.getHosts())) { domMap.put(dom.getKey(), newFormat); } else if (!CollectionUtils.isEmpty(dom.getHosts())) { domMap.put(dom.getKey(), dom); } } } } catch (Throwable e) { NAMING_LOGGER.error("[NA] failed to read cache file", e); } return domMap; } private static File makeSureCacheDirExists(String dir) { File cacheDir = new File(dir); if (!cacheDir.exists() && !cacheDir.mkdirs()) { throw new IllegalStateException("failed to create cache dir: " + dir); } return cacheDir; } }
DiskCache的write方法会将serviceInfo写入到dir文件夹下面,文件名为serviceInfo.getKeyEncoded();read方法则是读取dir文件夹下面的文件然后解析为一个个ServiceInfo然后放到domMap,最后返回domMap
PushReceiver实现了Runnable接口,其构造器会创建udpSocket以及ScheduledThreadPoolExecutor,然后往ScheduledThreadPoolExecutor注册自己
其run方法使用while true循环来执行udpSocket.receive(packet),之后将接收到的数据解析为PushPacket,然后根据不同pushPacket.type做不同处理
当pushPacket.type为dom或者service的时候会调用hostReactor.processServiceJSON(pushPacket.data);当pushPacket.type为dump的时候会将hostReactor.getServiceInfoMap()序列化到ack中,最后将ack返回去
到此,相信大家对“nacos client中PushReceiver的原理和应用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。