温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java中怎么用nacos配置中心

发布时间:2021-11-01 14:37:26 阅读:376 作者:iii 栏目:编程语言

这篇文章主要介绍“Java中怎么用nacos配置中心”,在日常操作中,相信很多人在Java中怎么用nacos配置中心问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java中怎么用nacos配置中心”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

配置的发布与订阅

我们先来看看如何使用nacos提供的api来实现配置的发布与订阅
发布配置:
public class ConfigPub {    public static void main(String[] args) throws NacosException {        final String dataId="test";        final String group="DEFAULT_GROUP";        ConfigService configService= NacosFactory.createConfigService("localhost:8848");        configService.publishConfig(dataId,group,"test config body");    }}
订阅配置:
   public static void main(String[] args) throws NacosExceptionInterruptedException {        final String dataId="test";        final String group="DEFAULT_GROUP";        ConfigService configService= NacosFactory.createConfigService("localhost:8848");        configService.addListener(dataId, group, new Listener() {            @Override            public Executor getExecutor() {                return null;            }            @Override            public void receiveConfigInfo(String configInfo) {                System.out.println("receiveConfigInfo:"+configInfo);            }        });        Thread.sleep(Integer.MAX_VALUE);    }}
根据上面的demo可以看到通过dataId和group可以定位一个配置文件。

Java中怎么用nacos配置中心

深入了解配置发布

1-发布的配置信息会通过http请求调用具体的服务

agent.httpPost(url, headers, params, encode, POST_TIMEOUT);
服务类为 ConfigController:处理配置相关的http请求
persistService      .insertOrUpdateTag(configInfo, tag, srcIp, srcUser, timefalse);EventDispatcher.fireEvent(      new ConfigDataChangeEvent(false, dataId, group, tenant, tag,            time.getTime()));

可以看到发布的配置首先会进行持久化,然后会触发变更通知。

持久化这里就不做分析,我们来看看fireEvent这个方法:

EventDispatcher.fireEvent:static public void fireEvent(Event event) {    if (null == event) {        throw new IllegalArgumentException("event is null");    }    for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {        try {            listener.onEvent(event);        } catch (Exception e) {            log.error(e.toString(), e);        }    }}这里可以看到具体调用了listener.onEvent(event);这里只要找到AbstractEventListener 具体的实现类是哪个就可以。AbstractEventListener主要有两个实现类:AsyncNotifyServiceLongPollingService我们可以通过event的类型去判断,因为这里onEvent的参数类型为ConfigDataChangeEvent,所以我们可以清楚的知道我们要找的实现类是AsyncNotifyService。每个AbstractEventListener初始化的时候都会先将自己加入到listeners中final CopyOnWriteArrayList<AbstractEventListener> listeners;public AbstractEventListener(
) {    /**     * automatic register     */    EventDispatcher.addEventListener(this);}我们可以直接看看AsyncNotifyService的onEvent方法:public void onEvent(Event event) {   // 并发产生 ConfigDataChangeEvent   if (event instanceof ConfigDataChangeEvent) {      ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;      long dumpTs = evt.lastModifiedTs;      String dataId = evt.dataId;      String group = evt.group;      String tenant = evt.tenant;      String tag = evt.tag;      //Member{address='192.168.31.192:8848'}      Collection<Member> ipList = memberManager.allMembers();      // 其实这里任何类型队列都可以      Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();      for (Member member : ipList) {         queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs,               member.getAddress(), evt.isBeta));      }      EXECUTOR.execute(new AsyncTask(httpclient, queue));   }}上面的方法主要实现的是:获取所有的nacos服务节点,然后对其执行异步任务AsyncTask。AsyncTask中会从队列中获取每个节点的NotifySingleTask信息,然后进行http请求,调用通知配置信息改变的服务。具体服务在CommunicationController中实现。/** * 通知配置信息改变 */@GetMapping("/dataChange")这个方法放在后面分析。

深入了解配置订阅

初始化:

NacosConfigService初始化的时候构造了ClientWorker,并且通过ClientWorker启动了两个线程池。worker = new ClientWorker(agent, configFilterChainManager, properties);第一个线程池每10ms执行一次checkConfigInfo();executor.scheduleWithFixedDelay(new Runnable() {    @Override    public void run() {        try {            checkConfigInfo();        } catch (Throwable e) {           LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check            error", e);        }    }}, 1L, 10L, TimeUnit.MILLISECONDS);我们来看看checkConfigInfo具体是做什么的public void checkConfigInfo() {    // 分任务    int listenerSize = cacheMap.get().size();    // 向上取整为批数,限制LongPollingRunnable处理配置的个数。    int longingTaskCount =(int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());    if (longingTaskCount > currentLongingTaskCount) {        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {            // 要判断任务是否在执行 这块需要好好想想。             //任务列表现在是无序的。变化过程可能有问题            executorService.execute(new LongPollingRunnable(i));            //这里的i就代表taskId        }        currentLongingTaskCount = longingTaskCount;    }}这里主要的作用是提交LongPollingRunnable任务到第二个线程池中去运行。并且每个LongPollingRunnable只会处理3000个配置。我们来看看LongPollingRunnable的实现List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {    // check failover config    for (CacheData cacheData : cacheMap.get().values()) {        if (cacheData.getTaskId() == taskId) {            cacheDatas.add(cacheData);            ...        }    }cacheMap中保存了配置信息,从磁盘中加载获取。通过taskId从 cacheMap中获取需要被当前LongPollingRunnable任务处理的配置,放入到cacheDatas集合。我们来看看是在哪里设置的taskIdint taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();cache.setTaskId(taskId);可以看到这里和上面相对应,每3000个配置的taskId是相同的。因为每个LongPollingRunnable线程会处理3000个配置。
// check server config  向服务端请求变化的配置List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);//从Server获取值变化了的DataID列表。返回的对象里只有dataId和group是有效的。 保证不返回NULL。return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);这里订阅配置的客户端会向服务端发送http长轮询请求,来获取变化的配置信息长轮询请求不会立刻返回结果,而是当有配置发生变化时返回,设置了超时时间30s,如果超过了设置的超时时间没有配置更新,则会默认返回。然后重新发起一次长轮询的请求。HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,    agent.getEncode(), readTimeoutMs);长轮询的周期默认为30s:timeout=Math.max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),    Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);具体服务实现类在ConfigController中:@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response)      throws ServletException, IOException {   ....   // do long-polling   inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}doPollingConfig方法:// 服务端处理长轮询请求if (LongPollingService.isSupportLongPolling(request)) {    longPollingService.addLongPollingClient(request, response, clientMd5Map,     probeRequestSize);    return HttpServletResponse.SC_OK + "";}使用线程池处理请求:scheduler.execute(    new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout,     appName, tag));接着来看ClientLongPolling是一个线程实现类首先会触发一个延时任务,然后将自己加入到队列:allSubs.add(this);allSubs中维护了所有长轮训请求。那么肯定会有一个地方去消费allSubs队列中的请求.这个消费的地方就是onEvent方法:LongPollingService其实就是我们上面提到的AbstractEventListener,因此也实现了onEvent方法。@Overridepublic void onEvent(Event event) {    if (isFixedPolling()) {        // ignore    } else {        if (event instanceof LocalDataChangeEvent) {            LocalDataChangeEvent evt = (LocalDataChangeEvent)event;            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta,             evt.betaIps));        }    }}这个event方法就是去处理配置变化的情况,主要逻辑在DataChangeTask中:从allSubs获取维护的请求中相同dataId+group的请求,比如:(test+DEFAULT_GROUP)然后进行这个对长轮询的请求进行返回。for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {    ClientLongPolling clientSub = iter.next();    //groupKey test+DEFAULT_GROUP    if (clientSub.clientMd5Map.containsKey(groupKey)) {        ......        iter.remove(); // 删除订阅关系        LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",        (System.currentTimeMillis() - changeTime),        "in-advance",        RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),            "polling",            clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);        clientSub.sendResponse(Arrays.asList(groupKey));    }}
那是哪里触发了LongPollingService里面的onEvent 方法呢?当然是在配置发布后进行触发的,还记得CommunicationController中的dataChange服务吗?配置发布后会通过http请求调用nacos服务中的dataChange服务。通过dataChange服务就可以通知nacos服务中保存的长轮训的请求了。并且这个方法是获取所有nacos服务节点去遍历执行的,因此不管变更配置对应的长轮询保存在哪个节点,都会可以被获取到。/** * 通知配置信息改变 */@GetMapping("/dataChange")此处会调用DumpService中的方法保存配置文件到磁盘,并缓存md5.DiskUtil.saveToDisk(dataId, group, tenant, content);public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {    CacheItem cache = makeSure(groupKey);    if (cache.md5 == null || !cache.md5.equals(md5)) {        cache.md5 = md5;        cache.lastModifiedTs = lastModifiedTs;        EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));    }}

可以看到当配置变更,就会触发fireEvent的LocalDataChangeEvent事件。

到此,关于“Java中怎么用nacos配置中心”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI