本篇内容主要讲解“Dubbo服务导出到本地的方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Dubbo服务导出到本地的方法”吧!
context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); // 删除了一些步骤 public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { try { // 1. 里面的核心代码就是初始化了applicationEventMulticaster,用于后面发布事件使用 // this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); initApplicationEventMulticaster(); // 2. 初始化非延迟加载的bean,这里就会初始化dubbo配置的一些bean,包括ServiceBean,用于服务导出 finishBeanFactoryInitialization(beanFactory); // 3. 发布容器刷新事件,这里面是服务导出的入口 finishRefresh(); } } }
// 步骤2分析 // 这里Spring容器会初始化非延迟加载的bean,包括<dubbo:service/>表示的bean // <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> finishBeanFactoryInitialization(beanFactory);
// Spring容器初始化<dubbo:service/>表示的ServiceBean时会创建ServiceBean对象,由于ServiceBean实现了 // ApplicationContextAware接口,所以Spring容器会先调用setApplicationContext给其注入Spring容器 class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; // 给SpringExtensionFactory注入Spring容器 SpringExtensionFactory.addApplicationContext(applicationContext); if (applicationContext != null) { SPRING_CONTEXT = applicationContext; try { // method是addListener方法,调用该方法用于给applicationEventMulticaster // 添加listener method.invoke(applicationContext, new Object[]{this}); supportedApplicationListener = true; } } } }
// 步骤3分析,发布相关事件,这里会发布容器刷新事件 finishRefresh(); protected void finishRefresh() { initLifecycleProcessor(); getLifecycleProcessor().onRefresh(); // 1). 发布容器刷新事件,ServiceBean监听的就是该事件 // ServiceBean implements ApplicationListener<ContextRefreshedEvent> publishEvent(new ContextRefreshedEvent(this)); LiveBeansView.registerApplicationContext(this); } // 2). 步骤1会走到这里,这里会获取之前的applicationEventMulticaster,用于发布事件 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); // 3). 走到这里,listener就是之前调用 method.invoke(applicationContext, new Object[]{this}); // 加进去的ServiceBean,this表示ServiceBean,也是listener,event就是容器刷新事件 doInvokeListener(listener, event); // 4) 走到这里,最终调用ServiceBean实现的onApplicationEvent方法 listener.onApplicationEvent(event);
这样,就走到了Dubbo暴露服务的入口
的方法。这也是Dubbo官方文档中提及的入口方法,参考 服务导出
public void onApplicationEvent(ContextRefreshedEvent event) { // 如果服务没有被暴露并且服务没有被取消暴露,则打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 导出服务 export(); } }
接下来研究一下 Dubbo 导出服务的过程。Dubbo 服务导出过程始于Spring容器发布刷新事件,Dubbo在接收到事件后,会立即执行服务导出逻辑。整个逻辑大致可分为三个部分,第一部分是前置工作,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。下面将会对这三个部分代码进行详细的分析。
服务导出的入口方法是ServiceBean的onApplicationEvent
。onApplicationEvent 是一个事件响应方法,该方法会在收到Spring上下文刷新事件后执行服务导出操作。方法代码如下
public void onApplicationEvent(ContextRefreshedEvent event) { // 如果服务没有被暴露并且服务没有被取消暴露,则打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 导出服务 export(); } }
这个方法首先会根据条件决定是否导出服务,比如有些服务设置了延时导出,那么此时就不应该在此处导出。还有一些服务已经被导出了,或者当前服务被取消导出了,此时也不能再次导出相关服务。注意这里的 isDelay 方法,这个方法字面意思是“是否延迟导出服务”,返回 true 表示延迟导出,false 表示不延迟导出。但是该方法真实意思却并非如此,当方法返回 true 时,表示无需延迟导出。返回 false 时,表示需要延迟导出。与字面意思恰恰相反,这个需要大家注意一下。 前置工作主要包含两个部分,分别是配置检查,以及 URL 装配。在导出服务之前,Dubbo 需要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来需要根据这些配置组装 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作为配置载体,所有的拓展点都是通过 URL 获取配置。这一点,官方文档中有所说明。下面的export方法会走到doExport()
方法。
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
以下是配置检查的相关分析,代码比较多,需要大家耐心看一下。下面对配置检查的逻辑进行简单的总结,如下:
检测 dubbo:service 标签的 interface 属性合法性,不合法则抛出异常
检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其他配置类对象中获取相应的实例。
检测并处理泛化服务和普通服务类
检测本地存根配置,并进行相应的处理
对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试创建,若无法创建则抛出异常配置检查并非本文重点,因此这里不打算对 doExport 方法所调用的方法进行分析(doExportUrls 方法除外)。在这些方法中,除了appendProperties方法稍微复杂一些,其他方法逻辑不是很复杂。因此,大家可自行分析。
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { // 抛异常 } checkDefault(); if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 检测ref是否为泛化服务类型 if (ref instanceof GenericService) { // 设置interfaceClass为GenericService interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 设置generic = true generic = Boolean.TRUE.toString(); } } else { try { // 获得接口类型 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 对interfaceClass,以及<dubbo:method>标签中的必要字段进行检查 checkInterfaceAndMethods(interfaceClass, methods); // 对ref合法性进行检测 checkRef(); generic = Boolean.FALSE.toString(); } // stub local一样都是配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } } checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); // 本地存根、mock合法性校验 checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 核心代码,暴露服务、注册逻辑就在其中 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
Dubbo 允许我们使用不同的协议导出服务,也允许我们向多个注册中心注册服务。Dubbo在doExportUrls方法中对多协议,多注册中心进行了支持。相关代码如下
/** * 多协议多注册中心暴露服务进行支持 */ private void doExportUrls() { // 加载注册中心链接 List<URL> registryURLs = loadRegistries(true); // 遍历protocols,并在每个协议下暴露 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代码首先是通过loadRegistries加载注册中心链接,然后再遍历ProtocolConfig集合导出每个服务。并在导出服务的过程中,将服务注册到注册中心。我们先来看一下loadRegistries方法的逻辑。先可以打开看下该方法可以得到什么。
<!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use multicast registry center to export service --> <dubbo:registry address="zookeeper://10.101.99.127:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <dubbo:provider server="netty4"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); // 如果registries为空,直接返回空集合 if (registries != null && !registries.isEmpty()) { // 遍历注册中心配置集合registries for (RegistryConfig config : registries) { // 获得地址 String address = config.getAddress(); // 若地址为空,则设置为0.0.0.0 if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } // 如果地址为N/A,则跳过 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加ApplicationConfig中的字段信息到map中 appendParameters(map, application); // 添加RegistryConfig字段信息到map中 appendParameters(map, config); // 添加path、协议版本 map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 如果map中没有protocol,则默认为使用dubbo协议 if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 解析得到URL列表,address可能包含多个注册中心ip,因此解析得到的是一个URL列表 List<URL> urls = UrlUtils.parseURLs(address, map); // 遍历URL 列表 for (URL url : urls) { // 将URL协议头设置为registry url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 这里将协议设置为了registry,这也是后面调用的是RegistryProtocol的export()方法原因 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 通过判断条件,决定是否添加url到registryList中,条件如下: // 如果是服务提供者,并且是注册中心服务或者是消费者端,并且是订阅服务,则加入到registryList if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
ProtocolConfig主要封装了<dubbo:protocol name="dubbo" port="20880"/>标签的信息,意思是使用Dubbo协议暴露服务。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 获取协议名 String name = protocolConfig.getName(); // 如果为空,则是默认的dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 设置服务提供者侧 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 这段代码其实完成了子节点配置信息对父节点的覆盖 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // 如果method的配置列表不为空 if (methods != null && !methods.isEmpty()) { // 遍历method配置列表 for (MethodConfig method : methods) { // 把方法名加入map appendParameters(map, method, method.getName()); // 添加 MethodConfig对象的字段信息到map中,键=方法名.属性名 // 比如存储<dubbo:method name="sayHello" retries="2">对应的MethodConfig, // 键=sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 如果retryValue为false,则不重试,设置值为0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } }
if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } }
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // 暴露到远程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // 后面分析 } } this.urls.add(url); }
前置工作做完,接下来就可以进行服务导出了。服务导出分为导出到本地(JVM)和导出到远程。
// 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } private void exportLocal(URL url) { // 如果协议不是injvm if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 生成本地的url,分别把协议改为injvm,设置host和port URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST).setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 通过代理工程创建invoker // 再调用export方法进行暴露服务,生成Exporter // 这里的protocol是生成的拓展代理对象,具体可看https://segmentfault.com/a/1190000020384210 // 它是在运行时才根据URL中的protocol参数去决定运行哪个Protocol实例的export方法,这里由于前面 // setProtocol(Constants.LOCAL_PROTOCOL),所以调用的是InjvmProtocol的export方法 Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 把生成的暴露者加入集合 exporters.add(exporter); } }
下面两个是url和local的具体值,因为Dubbo采用自适应拓展机制,exportLocal(URL url)中用到的protocol是自适应拓展,protocol的export方法会用到URL中protocol参数从而决定具体生成protocol的哪个实例,所以URL的protocol值可以关注下。
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
下面分析下面这句代码。它是核心方法,分为两步。
Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 1) proxyFactory.getInvoker(ref, (Class) interfaceClass, local) -> 返回invoker 2) protocol.export(invoker)
// 步骤1)分析 // proxyFactory也是自适应拓展代理带,它默认使用JavassistProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 这里调用的就是JavassistProxyFactory的getInvoker方法 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 创建Wrapper对象 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 创建匿名Invoker类对象,并实现doInvoke方法 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable { // 调用Wrapper的invokeMethod方法,invokeMethod最终会调用目标方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
在 Dubbo 中,Invoker是一个非常重要的模型
。在服务提供端,以及服务引用端均会出现Invoker。Dubbo 官方文档中对Invoker进行了说明,这里引用一下。Invoker是实体域,它是Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用
,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。这里面getInvoker方法创建了一个匿名Invoker对象,我理解是通过invoke实行远程调用时,会走wrapper.invokeMethod方法,而wrapper实际上是一个代理类,调用wrapper.invokeMethod最终会走proxy,也就是DemoService的sayHello方法。Wrapper创建比较复杂,可以参考 Dubbo中JavaAssist的Wrapper.getWrapper生成代理分析。
// 步骤2分析,调用的是InjvmProtocol的export方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 该方法只是创建了一个,因为暴露到本地,所以在同一个jvm中,所以不需要其他操作 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
到此,相信大家对“Dubbo服务导出到本地的方法”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。