本篇文章为大家展示了如何进行SpringCloud Gateway 全链路实现分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
随着微服务架构的流行,服务按照不同的维度进行拆分,一次请求往往需要涉及到多个服务。而诸多的服务可能分布在了几千台服务器,横跨多个不同的数据中心。为了快速定位和解决故障,应用性能进行分析,全链路监控组件就在这样的问题背景下产生了。最出名的是谷歌公开的论文提到的 Google Dapper。想要在这个上下文中理解分布式系统的行为,就需要监控那些横跨了不同的应用、不同的服务器之间的关联动作。
通过业务调用过程中添加并传递调用链ID,实现应用间生成链路数据,最终串联成一条完整的调用链。 其中整个调用过程中每个请求都要透传TxId、SpanId和pSpanId。
作为Spring Cloud官方推出的第二代网关框架,Spring cloud gateway是基于Spring 5.0、Spring Boot2.0和Reactor等技术开发的网关,采用了NIO模型进行通信。
1.2.2 Mono与Flux
Mono表示的是包含 0 或者 1 个元素的异步序列,即要么成功发布元素,要么错误
Flux和Mono之间可以相互转换,比如对一个 Flux 序列进行计数操作,得到的结果是一个 Mono对象,或者把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
Spring Cloud Gateway作为入口网关,主要负责服务的路由转发。如果网关没有进行监控,则全链路会缺失网关节点,直接展示为用户访问后续应用;不能有效定位用户请求慢是网关问题还是后续节点。
3.1.1 Spring Cloud Gateway的请求入口
org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#apply 先将接收到的HttpServerRequest或者最终需要返回的HttpServerResponse包装转换为ReactorServerHttpRequest和ReactorServerHttpResponse,再处理请求。
public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) { NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.alloc()); ServerHttpRequest adaptedRequest; ServerHttpResponse adaptedResponse; try { adaptedRequest = new ReactorServerHttpRequest(request, bufferFactory); adaptedResponse = new ReactorServerHttpResponse(response, bufferFactory); } catch (URISyntaxException ex) { logger.error("Invalid URL ">
3.1.2 构造网关上下文
org.springframework.web.server.adapter.HttpWebHandlerAdapter#handle
createExchange()构造网关上下文ServerWebExchange
getDelegate()通过委托的方式获取一系列需要处理的WebHandler
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { ServerWebExchange exchange = createExchange(request, response); return getDelegate().handle(exchange) .onErrorResume(ex -> handleFailure(request, response, ex)) .then(Mono.defer(response::setComplete)); } protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) { return new DefaultServerWebExchange(request, response, this.sessionManager, getCodecConfigurer(), getLocaleContextResolver(), this.applicationContext); }
3.1.3 进入Filter链
org.springframework.cloud.gateway.handler.FilteringWebHandler#handle 获得 GatewayFilter 数组,并根据获得的 GatewayFilter 数组创建DefaultGatewayFilterChain,过滤处理请求。
public Mono<Void> handle(ServerWebExchange exchange) { Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); List<GatewayFilter> gatewayFilters = route.getFilters(); List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); combined.addAll(gatewayFilters); AnnotationAwareOrderComparator.sort(combined); logger.debug("Sorted gatewayFilterFactories: "+ combined); return new DefaultGatewayFilterChain(combined).filter(exchange); }
3.1.4 执行Filter链
org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain#filter 过滤器的链式调用
public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < filters.size()) { GatewayFilter filter = filters.get(this.index); DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); // complete } }); }
3.1.5 Gateway Filter适配器
org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter#filter GatewayFilterAdapter是GlobalFilter过滤器的包装类,最终委托Global Filter进行执行。
private static class GatewayFilterAdapter implements GatewayFilter { private final GlobalFilter delegate; public GatewayFilterAdapter(GlobalFilter delegate) { this.delegate = delegate; } public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return this.delegate.filter(exchange, chain); } }
3.1.6 Netty路由网关过滤器
org.springframework.cloud.gateway.filter.NettyRoutingFilter#filter
GlobalFilter实现有很多,此处只分析NettyRoutingFIlter和NettyWriteResponseFilter。而NettyRoutingFilter负责使用 Netty HttpClient 代理对下游的请求。
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 获得 requestUrl URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); // 判断是否能够处理,http或https前缀 String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } // 设置已经路由 setAlreadyRouted(exchange); ServerHttpRequest request = exchange.getRequest(); // 创建Netty Request Method对象 final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString()); final String url = requestUrl.toString(); HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(), exchange); final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING); boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding); boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); // 请求后端服务 return this.httpClient.request(method, url, req -> { final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach) .headers(httpHeaders) .chunkedTransfer(chunkedTransfer) .failOnServerError(false) .failOnClientError(false); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); proxyRequest.header(HttpHeaders.HOST, host); } return proxyRequest.sendHeaders() //发送请求头 .send(request.getBody().map(dataBuffer -> // 发送请求Body ((NettyDataBuffer)dataBuffer).getNativeBuffer())); }).doOnNext(res -> { ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); exchange.getAttributes().put("original_response_content_type", headers.getContentType()); HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE); response.getHeaders().putAll(filteredResponseHeaders); HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { // https://jira.spring.io/browse/SPR-16748 ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code()); } else { throw new IllegalStateException("Unable to set status code on response: ">
3.1.7 Netty 回写响应网关过滤器
org.springframework.cloud.gateway.filter.NettyWriteResponseFilter#filter NettyWriteResponseFilter 与NettyRoutingFilter成对出现,负责将代理响应写回网关客户端响应。
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // then方法实现After Filter逻辑 return chain.filter(exchange).then(Mono.defer(() -> { // 获得Netty Response HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); if (clientResponse == null) { return Mono.empty(); } log.trace("NettyWriteResponseFilter start"); ServerHttpResponse response = exchange.getResponse(); // 将Netty Response回写给客户端 NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); //TODO: what if it's not netty final Flux<NettyDataBuffer> body = clientResponse.receive() .retain() // ByteBufFlux => ByteBufFlux .map(factory::wrap); // ByteBufFlux => Flux<NettyDataBuffer> MediaType contentType = response.getHeaders().getContentType(); return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })); }
当最终将网关也进行监控,可以全局的看到应用请求流转,网关的请求业务分流,各应用的调用负载情况。
上述内容就是如何进行SpringCloud Gateway 全链路实现分析,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。