nacos中的notifyConfigInfo有什么用?很多刚接触nacos的读者对此并不清楚,为此本文结合nacos-1.1.3的源码梳理了notifyConfigInfo的调用流程,希望通过这篇文章你能弄清楚这个问题。
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/controller/CommunicationController.java
@Controller @RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH) public class CommunicationController { private final DumpService dumpService; private final LongPollingService longPollingService; private String trueStr = "true"; @Autowired public CommunicationController(DumpService dumpService, LongPollingService longPollingService) { this.dumpService = dumpService; this.longPollingService = longPollingService; } /** * 通知配置信息改变 */ @RequestMapping(value = "/dataChange", method = RequestMethod.GET) @ResponseBody public Boolean notifyConfigInfo(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam(value = "tag", required = false) String tag) { dataId = dataId.trim(); group = group.trim(); String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED); long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified); String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP); String isBetaStr = request.getHeader("isBeta"); if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) { dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true); } else { dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp); } return true; } //...... }
notifyConfigInfo方法主要是执行dumpService.dump方法,只是根据请求头isBeta是否为true来调用不同的dump重载方法
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpService.java
/**
 * Service that turns dump requests into {@code DumpTask}s queued on a {@link TaskManager}.
 *
 * <p>Only part of the class is shown in this excerpt; the {@code //......} markers stand for
 * omitted code.
 */
@Service
public class DumpService {

    @Autowired
    private Environment env;

    @Autowired
    PersistService persistService;

    /**
     * Runs after construction: logs a start message, creates the dump processors, and creates
     * the two task managers with their default processors.
     */
    @PostConstruct
    public void init() {
        LogUtil.defaultLog.warn("DumpService start");
        DumpProcessor processor = new DumpProcessor(this);
        DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this);
        DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
        DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);

        // Single-entry dumps go through dumpTaskMgr, handled by DumpProcessor by default.
        dumpTaskMgr = new TaskManager(
            "com.alibaba.nacos.server.DumpTaskManager");
        dumpTaskMgr.setDefaultTaskProcessor(processor);

        // A second manager is created with DumpAllProcessor as its default processor.
        dumpAllTaskMgr = new TaskManager(
            "com.alibaba.nacos.server.DumpAllTaskManager");
        dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);

        //......
    }

    /**
     * Interval between full dumps, in minutes.
     */
    static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60;

    /**
     * NOTE(review): the original comment here was a copy of the one above ("full dump
     * interval"). Going by the name this is presumably the initial delay, in minutes, before
     * the first full dump - confirm against its usage, which is not shown in this excerpt.
     */
    static final int INITIAL_DELAY_IN_MINUTE = 6 * 60;

    private TaskManager dumpTaskMgr;

    private TaskManager dumpAllTaskMgr;

    private static final Logger log = LoggerFactory.getLogger(DumpService.class);

    static final AtomicInteger FINISHED = new AtomicInteger();

    static final int INIT_THREAD_COUNT = 10;

    int total = 0;

    private final static String TRUE_STR = "true";

    private final static String BETA_TABLE_NAME = "config_info_beta";

    private final static String TAG_TABLE_NAME = "config_info_tag";

    Boolean isQuickStart = false;

    private int retentionDays = 30;

    //......

    /**
     * Queues a dump task without a tag.
     *
     * <p>The task is keyed by the group key built from {@code dataId}, {@code group} and
     * {@code tenant}.
     *
     * @param dataId       config data id
     * @param group        config group
     * @param tenant       tenant
     * @param lastModified last-modified timestamp carried by the task
     * @param handleIp     handler IP carried by the task
     * @param isBeta       beta flag carried by the task
     */
    public void dump(String dataId, String group, String tenant, long lastModified, String handleIp,
                     boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
    }

    /**
     * Queues a non-beta dump task carrying {@code tag}; delegates to the seven-argument
     * overload with {@code isBeta = false}.
     */
    public void dump(String dataId, String group, String tenant, String tag, long lastModified,
                     String handleIp) {
        dump(dataId, group, tenant, tag, lastModified, handleIp, false);
    }

    /**
     * Queues a dump task carrying {@code tag} and the given beta flag.
     *
     * <p>As in the untagged overload, the task key is the group key of
     * {@code dataId}/{@code group}/{@code tenant}; the tag is not part of the key.
     */
    public void dump(String dataId, String group, String tenant, String tag, long lastModified,
                     String handleIp, boolean isBeta) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    }

    //......
}
dump方法最后是往dumpTaskMgr添加DumpTask;dumpTaskMgr的defaultTaskProcessor为dumpProcessor
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/manager/TaskManager.java
public final class TaskManager implements TaskManagerMBean { private static final Logger log = LogUtil.defaultLog; private final ConcurrentHashMap<String, AbstractTask> tasks = new ConcurrentHashMap<String, AbstractTask>(); private final ConcurrentHashMap<String, TaskProcessor> taskProcessors = new ConcurrentHashMap<String, TaskProcessor>(); private TaskProcessor defaultTaskProcessor; Thread processingThread; private final AtomicBoolean closed = new AtomicBoolean(true); private String name; class ProcessRunnable implements Runnable { @Override public void run() { while (!TaskManager.this.closed.get()) { try { Thread.sleep(100); TaskManager.this.process(); } catch (Throwable e) { } } } } ReentrantLock lock = new ReentrantLock(); Condition notEmpty = this.lock.newCondition(); public TaskManager() { this(null); } public AbstractTask getTask(String type) { return this.tasks.get(type); } public TaskProcessor getTaskProcessor(String type) { return this.taskProcessors.get(type); } @SuppressWarnings("PMD.AvoidManuallyCreateThreadRule") public TaskManager(String name) { this.name = name; if (null != name && name.length() > 0) { this.processingThread = new Thread(new ProcessRunnable(), name); } else { this.processingThread = new Thread(new ProcessRunnable()); } this.processingThread.setDaemon(true); this.closed.set(false); this.processingThread.start(); } //...... 
/** * 将任务加入到任务Map中 * * @param type * @param task */ public void addTask(String type, AbstractTask task) { this.lock.lock(); try { AbstractTask oldTask = tasks.put(type, task); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); if (null != oldTask) { task.merge(oldTask); } } finally { this.lock.unlock(); } } protected void process() { for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) { AbstractTask task = null; this.lock.lock(); try { // 获取任务 task = entry.getValue(); if (null != task) { if (!task.shouldProcess()) { // 任务当前不需要被执行,直接跳过 continue; } // 先将任务从任务Map中删除 this.tasks.remove(entry.getKey()); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); } } finally { this.lock.unlock(); } if (null != task) { // 获取任务处理器 TaskProcessor processor = this.taskProcessors.get(entry.getKey()); if (null == processor) { // 如果没有根据任务类型设置的处理器,使用默认处理器 processor = this.getDefaultTaskProcessor(); } if (null != processor) { boolean result = false; try { // 处理任务 result = processor.process(entry.getKey(), task); } catch (Throwable t) { log.error("task_fail", "处理task失败", t); } if (!result) { // 任务处理失败,设置最后处理时间 task.setLastProcessTime(System.currentTimeMillis()); // 将任务重新加入到任务Map中 this.addTask(entry.getKey(), task); } } } } if (tasks.isEmpty()) { this.lock.lock(); try { this.notEmpty.signalAll(); } finally { this.lock.unlock(); } } } //...... }
TaskManager的addTask方法往tasks添加AbstractTask(同一type下已有旧任务时,会调用task.merge(oldTask)进行合并);其构造器创建并启动了一个执行ProcessRunnable的守护线程,其run方法在未关闭之前循环执行:每次休眠100毫秒后调用TaskManager.this.process()方法;该方法会遍历tasks,取出任务,然后通过TaskProcessor的process方法来执行任务,执行失败的任务会被重新加入tasks等待下次处理
nacos-1.1.3/config/src/main/java/com/alibaba/nacos/config/server/service/dump/DumpTask.java
class DumpProcessor implements TaskProcessor { DumpProcessor(DumpService dumpService) { this.dumpService = dumpService; } @Override public boolean process(String taskType, AbstractTask task) { DumpTask dumpTask = (DumpTask)task; String[] pair = GroupKey2.parseKey(dumpTask.groupKey); String dataId = pair[0]; String group = pair[1]; String tenant = pair[2]; long lastModified = dumpTask.lastModified; String handleIp = dumpTask.handleIp; boolean isBeta = dumpTask.isBeta; String tag = dumpTask.tag; if (isBeta) { // beta发布,则dump数据,更新beta缓存 ConfigInfo4Beta cf = dumpService.persistService.findConfigInfo4Beta(dataId, group, tenant); boolean result; if (null != cf) { result = ConfigService.dumpBeta(dataId, group, tenant, cf.getContent(), lastModified, cf.getBetaIps()); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.removeBeta(dataId, group, tenant); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } else { if (StringUtils.isBlank(tag)) { ConfigInfo cf = dumpService.persistService.findConfigInfo(dataId, group, tenant); if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) { if (null != cf) { AggrWhitelist.load(cf.getContent()); } else { AggrWhitelist.load(null); } } if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) { if (null != cf) { ClientIpWhiteList.load(cf.getContent()); } else { ClientIpWhiteList.load(null); } } if (dataId.equals(SwitchService.SWITCH_META_DATAID)) { if (null != cf) { SwitchService.load(cf.getContent()); } else { SwitchService.load(null); } } boolean result; if (null != cf) { result = ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified); if (result) { 
ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.remove(dataId, group, tenant); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } else { ConfigInfo4Tag cf = dumpService.persistService.findConfigInfo4Tag(dataId, group, tenant, tag); // boolean result; if (null != cf) { result = ConfigService.dumpTag(dataId, group, tenant, tag, cf.getContent(), lastModified); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, cf.getContent().length()); } } else { result = ConfigService.removeTag(dataId, group, tenant, tag); if (result) { ConfigTraceService.logDumpEvent(dataId, group, tenant, null, lastModified, handleIp, ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } } } final DumpService dumpService; }
DumpProcessor实现了TaskProcessor接口,其process方法根据isBeta以及tag是否为空,先从persistService查询对应的配置,查到则分别执行ConfigService的dumpBeta、dump或者dumpTag方法,查不到则分别执行removeBeta、remove或者removeTag方法
看完上述内容,你们掌握nacos中notifyConfigInfo有什么用的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。