小编给大家分享一下Spring Cloud中Zuul重试机制的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
具体内容如下:
开启Zuul功能
通过源码了解Zuul的一次转发
怎么开启zuul的重试机制
Edgware.RC1版本的优化
开启Zuul的功能
首先如何使用spring cloud zuul完成路由转发的功能,这个问题很简单,只需要进行如下准备工作即可:
注册中心(Eureka Server)
zuul(同时也是Eureka Client)
应用服务(同时也是Eureka Client)
我们希望zuul和后端的应用服务同时都注册到Eureka Server上,当我们访问Zuul的某一个地址时,对应其实访问的是后端应用的某个地址,从而从这个地址返回一段内容,并展现到浏览器上。
注册中心(Eureka Server)
创建一个Eureka Server只需要在主函数上添加@EnableEurekaServer,并在properties文件进行简单配置即可,具体内容如下:
@EnableEurekaServer @RestController @SpringBootApplication public class EurekaServerApplication { public static void main(String[] args) { SpringApplication.run(EurekaServerApplication.class, args); } }
server.port=8761 eureka.client.register-with-eureka=false eureka.client.fetch-registry=false
Zuul
主函数添加@EnableZuulProxy注解(因为集成Eureka,需要另外添加@EnableDiscoveryClient注解)。并配置properties文件,具体内容如下所示:
@EnableZuulProxy @EnableDiscoveryClient @SpringBootApplication public class ZuulDemoApplication { /** * 省略代码... */ }
server.port=8081 spring.application.name=ZUUL-CLIENT zuul.routes.api-a.serviceId=EUREKA-CLIENT zuul.routes.api-a.path=/api-a/** eureka.client.service-url.defaultZone=http://localhost:8761/eureka
应用服务
@RestController @EnableEurekaClient @SpringBootApplication public class EurekaClientApplication { public static void main(String[] args) { SpringApplication.run(EurekaClientApplication.class, args); } @RequestMapping(value = "/hello") public String index() { return "hello spring..."; } }
spring.application.name=EUREKA-CLIENT eureka.client.service-url.defaultZone=http://localhost:8761/eureka
三个工程全部启动,这时当我们访问localhost:8081/api-a/hello时,你会看到浏览器输出的内容是hello spring...
通过源码了解Zuul的一次转发
接下来我们通过源码层面来了解下,一次转发内部都做了哪些事情。
首先我们查看Zuul的配置类ZuulProxyAutoConfiguration在这个类中有一项工作是初始化Zuul默认自带的Filter,其中有一个Filter很重要,它就是RibbonRoutingFilter。它主要是完成请求的路由转发。接下来我们看下他的run方法
@Override public Object run() { RequestContext context = RequestContext.getCurrentContext(); try { RibbonCommandContext commandContext = buildCommandContext(context); ClientHttpResponse response = forward(commandContext); setResponse(response); return response; } catch (ZuulException ex) { throw new ZuulRuntimeException(ex); } catch (Exception ex) { throw new ZuulRuntimeException(ex); } }
可以看到进行转发的方法是forward,我们进一步查看这个方法,具体内容如下:
省略部分代码
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception { RibbonCommand command = this.ribbonCommandFactory.create(context); try { ClientHttpResponse response = command.execute(); return response; } catch (HystrixRuntimeException ex) { return handleException(info, ex); } }
ribbonCommandFactory指的是HttpClientRibbonCommandFactory这个类是在RibbonCommandFactoryConfiguration完成初始化的(触发RibbonCommandFactoryConfiguration的加载动作是利用ZuulProxyAutoConfiguration类上面的@Import标签),具体代码如下:
@Configuration @ConditionalOnRibbonHttpClient protected static class HttpClientRibbonConfiguration { @Autowired(required = false) private Set<ZuulFallbackProvider> zuulFallbackProviders = Collections.emptySet(); @Bean @ConditionalOnMissingBean public RibbonCommandFactory<?> ribbonCommandFactory( SpringClientFactory clientFactory, ZuulProperties zuulProperties) { return new HttpClientRibbonCommandFactory(clientFactory, zuulProperties, zuulFallbackProviders); } }
知道了这个ribbonCommandFactory具体的实现类(HttpClientRibbonCommandFactory),接下来我们看看它的create方法具体做了那些事情
@Override public HttpClientRibbonCommand create(final RibbonCommandContext context) { ZuulFallbackProvider zuulFallbackProvider = getFallbackProvider(context.getServiceId()); final String serviceId = context.getServiceId(); final RibbonLoadBalancingHttpClient client = this.clientFactory.getClient( serviceId, RibbonLoadBalancingHttpClient.class); client.setLoadBalancer(this.clientFactory.getLoadBalancer(serviceId)); return new HttpClientRibbonCommand(serviceId, client, context, zuulProperties, zuulFallbackProvider, clientFactory.getClientConfig(serviceId)); }
这个方法按照我的理解主要做了以下几件事情:
@Override public HttpClientRibbonCommand create(final RibbonCommandContext context) { /** *获取所有ZuulFallbackProvider,即当Zuul *调用失败后的降级方法 */ ZuulFallbackProvider = xxxxx /** *创建处理请求转发类,该类会利用 *Apache的Http client进行请求的转发 */ RibbonLoadBalancingHttpClient = xxxxx /** *将降级方法、处理请求转发类、以及其他一些内容 *包装成HttpClientRibbonCommand(这个类继承了HystrixCommand) */ return new HttpClientRibbonCommand(xxxxx); }
到这里我们很清楚的知道了RibbonRoutingFilter类的forward方法中RibbonCommand command = this.ribbonCommandFactory.create(context);这一行代码都做了哪些内容.
接下来调用的是command.execute();方法,通过刚刚的分析我们知道了command其实指的是HttpClientRibbonCommand,同时我们也知道HttpClientRibbonCommand继承了HystrixCommand所以当执行command.execute();时其实执行的是HttpClientRibbonCommand的run方法。查看源码我们并没有发现run方法,但是我们发现HttpClientRibbonCommand直接继承了AbstractRibbonCommand。所以其实执行的是AbstractRibbonCommand的run方法,接下来我们看看run方法里面都做了哪些事情:
@Override protected ClientHttpResponse run() throws Exception { final RequestContext context = RequestContext.getCurrentContext(); RQ request = createRequest(); RS response = this.client.executeWithLoadBalancer(request, config); context.set("ribbonResponse", response); if (this.isResponseTimedOut()) { if (response != null) { response.close(); } } return new RibbonHttpResponse(response); }
可以看到在run方法中会调用client的executeWithLoadBalancer方法,通过上面介绍我们知道client指的是RibbonLoadBalancingHttpClient,而RibbonLoadBalancingHttpClient里面并没有executeWithLoadBalancer方法。(这里面会最终调用它的父类AbstractLoadBalancerAwareClient的executeWithLoadBalancer方法。)
具体代码如下:
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { /** * 创建一个RetryHandler,这个很重要它是用来 * 决定利用RxJava的Observable是否进行重试的标准。 */ RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig); /** * 创建一个LoadBalancerCommand,这个类用来创建Observable * 以及根据RetryHandler来判断是否进行重试操作。 */ LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder() .withLoadBalancerContext(this) .withRetryHandler(handler) .withLoadBalancerURI(request.getUri()) .build(); try { /** *command.submit()方法主要是创建了一个Observable(RxJava) *并且为这个Observable设置了重试次数,这个Observable最终 *会回调AbstractLoadBalancerAwareClient.this.execute() *方法。 */ return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }
下面针对于每一块内容做详细说明:
首先getRequestSpecificRetryHandler(request, requestConfig);这个方法其实调用的是RibbonLoadBalancingHttpClient的getRequestSpecificRetryHandler方法,这个方法主要是返回一个RequestSpecificRetryHandler
@Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler(RibbonApacheHttpRequest request, IClientConfig requestConfig) { /** *这个很关键,请注意该类构造器中的前两个参数的值 *正因为一开始我也忽略了这两个值,所以后续给我造 *成一定的干扰。 */ return new RequestSpecificRetryHandler(false, false, RetryHandler.DEFAULT, requestConfig); }
接下来创建LoadBalancerCommand并将上一步获得的RequestSpecificRetryHandler作为参数内容。
最后调用LoadBalancerCommand的submit方法。该方法内容太长具体代码细节就不在这里贴出了,按照我个人的理解,只贴出相应的伪代码:
public Observable<T> submit(final ServerOperation<T> operation) { //相同server的重试次数(去除首次请求) final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); //集群内其他Server的重试个数 final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); /** *创建一个Observable(RxJava),selectServer()方法是 *利用Ribbon选择一个Server,并将其包装成Observable */ Observable<T> o = selectServer().concatMap(new Func1<Server, Observable<T>>() { @Override public Observable<T> call(final Server server) { /** *这里会回调submit方法入参ServerOperation类的call方法, */ return operation.call(server).doOnEach(new Observer<T>() {} } } if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() { @Override public Observable<T> call(Throwable e) { /** *转发请求失败时,会进入此方法。通过此方法进行判断 *是否超过重试次数maxRetrysSame、maxRetrysNext。 */ } }); }
operation.call()方法最终会调用RibbonLoadBalancingHttpClient的execute方法,该方法内容如下:
@Override public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { /** * 组装参数(RequestConfig) */ final RequestConfig.Builder builder = RequestConfig.custom(); IClientConfig config = configOverride != null ? configOverride : this.config; builder.setConnectTimeout(config.get( CommonClientConfigKey.ConnectTimeout, this.connectTimeout)); builder.setSocketTimeout(config.get( CommonClientConfigKey.ReadTimeout, this.readTimeout)); builder.setRedirectsEnabled(config.get( CommonClientConfigKey.FollowRedirects, this.followRedirects)); final RequestConfig requestConfig = builder.build(); if (isSecure(configOverride)) { final URI secureUri = UriComponentsBuilder.fromUri(request.getUri()) .scheme("https").build().toUri(); request = request.withNewUri(secureUri); } final HttpUriRequest httpUriRequest = request.toRequest(requestConfig); /** * 发送转发请求 */ final HttpResponse httpResponse = this.delegate.execute(httpUriRequest); /** * 返回结果 */ return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); }
可以看到上面方法主要做的就是组装请求参数(包括各种超时时间),然后发起转发请求,最终获取相应结果。
说到这里,zuul转发一次请求的基本原理就说完了。让我们再回顾下整个流程。
zuul的转发是通过RibbonRoutingFilter这个Filter进行操作的。
在转发之前,zuul利用Hystrix将此次转发请求包装成一个HystrixCommand,正应为这样才使得zuul具有了降级(Fallback)的功能,同时HystrixCommand是具备超时时间的(默认是1s)。而且Zuul默认采用的隔离级别是信号量模式。
在HystrixCommand内部zuul再次将请求包装成一个Observable,(有关RxJava的知识请参照其官方文档)。并且为Observable设置了重试次数。
事实真的是这样吗?当我看到源码中为Observable设置重试次数的时候,我以为这就是zuul的重试逻辑。遗憾的是我的想法是错误的。还记得上面我说的getRequestSpecificRetryHandler(request, requestConfig);这个方法吗?(不记得的同学可以回过头来再看下),这个方法返回的是RequestSpecificRetryHandler这个类,而且在创建该类时,构造器的前两个参数都为false。(这一点非常重要)。这两个参数分别是okToRetryOnConnectErrors和okToRetryOnAllErrors。
我原本的想法是这个请求被包装成Observable,如果这次请求因为超时出现异常或者其他异常,这样就会触发Observable的重试机制(RxJava),但是事实并非如此,为什么呢?原因就是上面的那两个参数,当出现了超时异常的时候,在触发重试机制之前会调用RequestSpecificRetryHandler的isRetriableException()方法,该方法的作用是用来判断是否执行重试动作,具体代码如下:
@Override public boolean isRetriableException(Throwable e, boolean sameServer) { //此时该值为false if (okToRetryOnAllErrors) { return true; } else if (e instanceof ClientException) { ClientException ce = (ClientException) e; if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) { return !sameServer; } else { return false; } } else { //此时该值为false return okToRetryOnConnectErrors && isConnectionException(e); } }
说道这里zuul转发一次请求的基本原理大概了解了,同时也验证了一个事实就是实现zuul进行重试的逻辑并不是Observable的重试机制。那么问题来了?是什么使zuul具有重试功能的呢?
怎么开启zuul的重试机制
开启Zuul重试的功能在原有的配置基础上需要额外进行以下设置:
在pom中添加spring-retry的依赖(maven工程)
设置zuul.retryable=true(该参数默认为false)
具体properties文件内容如下:
server.port=8081 spring.application.name=ZUUL-CLIENT #路由信息 zuul.routes.api-a.serviceId=EUREKA-CLIENT zuul.routes.api-a.path=/api-a/** #是否开启重试功能 zuul.retryable=true #同一个Server重试的次数(除去首次) ribbon.MaxAutoRetries=3 #切换相同Server的次数 ribbon.MaxAutoRetriesNextServer=0 eureka.client.service-url.defaultZone=http://localhost:8761/eureka
为了模拟出Zuul重试的功能,需要对后端应用服务进行改造,改造后的内容如下:
@RequestMapping(value = "/hello") public String index() { System.out.println("request is coming..."); try { Thread.sleep(100000); } catch (InterruptedException e) { System.out.println("线程被打断... " + e.getMessage()); } return "hello spring ..."; }
通过使用Thread.sleep(100000)达到Zuul转发超时情况(Zuul默认连接超时未2s、read超时时间为5s),从而触发Zuul的重试功能。这时候在此访问localhost:8081/api-a/hello时,查看应用服务后台,会发现最终打印三次"request is coming..."
通过现象看本质,接下来简单介绍下Zuul重试的原理。首先如果你工程classpath中存在spring-retry,那么zuul在初始化的时候就不会创建RibbonLoadBalancingHttpClient而是创建RetryableRibbonLoadBalancingHttpClient具体源代码如下:
@ConditionalOnClass(name = "org.apache.http.client.HttpClient") @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true) public class HttpClientRibbonConfiguration { @Value("${ribbon.client.name}") private String name = "client"; @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate") public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler) { RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient( config, serverIntrospector); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; } @Bean @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class) @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate") public RetryableRibbonLoadBalancingHttpClient retryableRibbonLoadBalancingHttpClient( IClientConfig config, ServerIntrospector serverIntrospector, ILoadBalancer loadBalancer, RetryHandler retryHandler, LoadBalancedRetryPolicyFactory loadBalancedRetryPolicyFactory) { RetryableRibbonLoadBalancingHttpClient client = new RetryableRibbonLoadBalancingHttpClient( config, serverIntrospector, loadBalancedRetryPolicyFactory); client.setLoadBalancer(loadBalancer); client.setRetryHandler(retryHandler); Monitors.registerObject("Client_" + this.name, client); return client; } }
所以请求到来需要转发的时候(AbstractLoadBalancerAwareClient类中executeWithLoadBalancer方法会调用AbstractLoadBalancerAwareClient.this.execute())其实调用的是RetryableRibbonLoadBalancingHttpClient的execute方法(而不是没有重试时候RibbonLoadBalancingHttpClient的execute方法),源码内容如下:
@Override public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { final RequestConfig.Builder builder = RequestConfig.custom(); IClientConfig config = configOverride != null ? configOverride : this.config; builder.setConnectTimeout(config.get( CommonClientConfigKey.ConnectTimeout, this.connectTimeout)); builder.setSocketTimeout(config.get( CommonClientConfigKey.ReadTimeout, this.readTimeout)); builder.setRedirectsEnabled(config.get( CommonClientConfigKey.FollowRedirects, this.followRedirects)); final RequestConfig requestConfig = builder.build(); final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this); RetryCallback retryCallback = new RetryCallback() { @Override public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception { //on retries the policy will choose the server and set it in the context //extract the server and update the request being made RibbonApacheHttpRequest newRequest = request; if(context instanceof LoadBalancedRetryContext) { ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance(); if(service != null) { //Reconstruct the request URI using the host and port set in the retry context newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(), newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(), newRequest.getURI().getPath(), newRequest.getURI().getQuery(), newRequest.getURI().getFragment())); } } if (isSecure(configOverride)) { final URI secureUri = UriComponentsBuilder.fromUri(newRequest.getUri()) .scheme("https").build().toUri(); newRequest = newRequest.withNewUri(secureUri); } HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig); final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest); if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) { if(CloseableHttpResponse.class.isInstance(httpResponse)) { ((CloseableHttpResponse)httpResponse).close(); } throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName, httpResponse.getStatusLine().getStatusCode()); } return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI()); } }; return this.executeWithRetry(request, retryPolicy, retryCallback); }
executeWithRetry方法内容如下:
private RibbonApacheHttpResponse executeWithRetry(RibbonApacheHttpRequest request, LoadBalancedRetryPolicy retryPolicy, RetryCallback<RibbonApacheHttpResponse, IOException> callback) throws Exception { RetryTemplate retryTemplate = new RetryTemplate(); boolean retryable = request.getContext() == null ? true : BooleanUtils.toBooleanDefaultIfNull(request.getContext().getRetryable(), true); retryTemplate.setRetryPolicy(retryPolicy == null || !retryable ? new NeverRetryPolicy() : new RetryPolicy(request, retryPolicy, this, this.getClientName())); return retryTemplate.execute(callback); }
按照我的理解,主要逻辑如下:
@Override public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception { /** *创建RequestConfig(请求信息) */ final RequestConfig requestConfig = builder.build(); final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this); /** * 创建RetryCallbck的实现类,用来完成重试逻辑 */ RetryCallback retryCallback = new RetryCallback() {}; //创建Spring-retry的模板类,RetryTemplate。 RetryTemplate retryTemplate = new RetryTemplate(); /** *设置重试规则,即在什么情况下进行重试 *什么情况下停止重试。源码中这部分存在 *一个判断,判断的根据就是在zuul工程 *的propertris中配置的zuul.retryable *该参数内容为true才可以具有重试功能。 */ retryTemplate.setRetryPolicy(xxx); /** *发起请求 */ return retryTemplate.execute(callback); }
以上是“Spring Cloud中Zuul重试机制的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。