这篇文章主要介绍“Sentinel如何拦截异常流量”,在日常操作中,相信很多人在Sentinel如何拦截异常流量问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Sentinel如何拦截异常流量”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
各位在家里用电的过程中,一定也经历过「跳闸」。这个「闸」就是在电量超过负荷的时候用来保护我们用电安全的,也被称为「断路器」,还有个响亮的英文名 -- CircuitBreaker。
和用电安全一样,对于「限流」、「降级」、「熔断」...,你我应该也都耳熟能详。我们开发的各类软件、系统、互联网应用等为了不被异常流量压垮,也需要一个断路器。
在 Spring 应用中,使用断路器很方便,我们可以使用 Spring Cloud CircuitBreaker。
Spring Cloud Circuit Breaker 是啥?如果你熟悉 Spring 是什么人的话,你能猜个八九不离十。和Spring Data JPA 这些类似,Spring 他又搞了个抽象的,标准的API 出来。这次他抽象的是关于降级熔断的「断路器」。有了这一层,具体实现是谁可以方便的更换,我们使用的代码里改动基本为0。
我们先来从官方Demo有个初步印象:
@RestControllerpublic class DemoController { private CircuitBreakerFactory circuitBreakerFactory; private HttpBinService httpBin; public DemoController(CircuitBreakerFactory circuitBreakerFactory, HttpBinService httpBinService) { this.circuitBreakerFactory = circuitBreakerFactory; this.httpBin = httpBinService; } @GetMapping("/delay/{seconds}") public Map delay(@PathVariable int seconds) { return circuitBreakerFactory.create("delay").run(httpBin.delaySuppplier(seconds), t -> { Map<String, String> fallback = new HashMap<>(); fallback.put("hello", "world"); return fallback; }); }}
千言万语,总结出来这样一句circuitBreakerFactory.create("delay").run()
因为是抽象,对应的实现就有好多种啦。
目前支持的实现有:
Hystrix
Resilience4j
Sentinel
Spring Retry
而抽象相当于定了个标准,像JDBC一样,无论我们把数据库换成了MySQL,Oracle 还是SQLite,接口等非特定类型的代码都不需要改变。断路器也一样。
这里的断路器工厂,创建方法都是标准的。具体这里执行业务逻辑的时候断路器实现要怎样进行拦截降级,就可以交给具体的实现来完成。
这次,我们以开源的 Sentinel 为例,来看看他们是怎样拦住异常流量的。
首先,因为是Spring Cloud,所以还会基于 Spring Boot 的 Autoconfiguration。以下是配置类,我们看到生成了一个工厂。
public class SentinelCircuitBreakerAutoConfiguration { @Bean @ConditionalOnMissingBean(CircuitBreakerFactory.class) public CircuitBreakerFactory sentinelCircuitBreakerFactory() { return new SentinelCircuitBreakerFactory(); } }
在我们实际代码执行逻辑的时候,create
出来的是什么呢?
是个断路器 CircuitBreaker
,用来执行代码。
public interface CircuitBreaker {
default <T> T run(Supplier<T> toRun) {
return run(toRun, throwable -> {
throw new NoFallbackAvailableException("No fallback available.", throwable);
});
};
<T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);
}
包含两个执行的方法,需要在的时候可以指定fallback
逻辑。具体到 Sentinel 是这样的:
public CircuitBreaker create(String id) { SentinelConfigBuilder.SentinelCircuitBreakerConfiguration conf = getConfigurations() .computeIfAbsent(id, defaultConfiguration); return new SentinelCircuitBreaker(id, conf.getEntryType(), conf.getRules()); }
你会看到创建了一个SentinelCircuitBreaker
。我们的业务逻辑,就会在这个断路器里执行,run
方法就是各个具体实现的舞台。
@Override public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) { Entry entry = null; try { entry = SphU.entry(resourceName, entryType); // If the SphU.entry() does not throw `BlockException`, it means that the // request can pass. return toRun.get(); } catch (BlockException ex) { // SphU.entry() may throw BlockException which indicates that // the request was rejected (flow control or circuit breaking triggered). // So it should not be counted as the business exception. return fallback.apply(ex); } catch (Exception ex) { // For other kinds of exceptions, we'll trace the exception count via // Tracer.trace(ex). Tracer.trace(ex); return fallback.apply(ex); } finally { // Guarantee the invocation has been completed. if (entry != null) { entry.exit(); } } }
OK,到此为止, Spring Cloud CircuitBreaker 已经展现完了。其它的细节都放到了具体实现的「盒子」里。下面我们把这个盒子打开。
Sentinel 是个熔断降级框架,官方这样自我介绍:
面向分布式服务架构的高可用流量控制组件,主要以流量为切入点,从流量控制、熔断降级、系统自适应保护等多个维度来帮助用户保障微服务的稳定性。
官网的这张代码截图简洁的说明了他是怎样工作的
挡在业务代码的前面,有事儿先冲它来,能通过之后才走业务逻辑,和各类闯关还真类似。
在上面 CircuitBreaker 的 run 方法里,咱们一定都注意到了这句
entry = SphU.entry(resourceName, entryType);
这就是一切拦截的秘密。
无论我们是通过前面的CircuitBreaker的方式,还是 @SentinelResource 这种注解形式,还是通过 Interceptor 的方式,没什么本质区别。只是触发点不一样。最后都是通过SphU来搞定。
既然是拦截,那一定要拦下来做这样或那样的检查。
实际检查的时候,entry 里核心代码有这些:
Entry entryWithPriority(ResourceWrapper resourceWrapper, ...) throws BlockException { ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper,...); } catch (BlockException e1) { e.exit(count, args); throw e1; } return e; }
注意这里的ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
会在请求过来处理的时候,如果未初始化处理链,则进行初始化,将各种first,next设置好,后面的请求都会按这个来处理。所有需要拦截的Slot,都会加到这个 chain 里面,再逐个执行 chain 里的 slot。和Servlet Filter 类似。
chain里都加了些啥呢?
public class HotParamSlotChainBuilder implements SlotChainBuilder { public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new ParamFlowSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; }
初始的时候,first 指向一个匿名内部类,这些加进来的slot,会在每次addLast的时候,做为链的next,
AbstractLinkedProcessorSlot<?> end = first;
@Override
public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
protocolProcessor.setNext(first.getNext());
first.setNext(protocolProcessor);
if (end == first) {
end = protocolProcessor;
}
}
@Override
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
end.setNext(protocolProcessor);
end = protocolProcessor;
}
而每个 slot,有自己的特定用处,处理完自己的逻辑之后,会通过 fireEntry 来触发下一个 slot的执行。
给你一张长长的线程调用栈就会过分的明显了:
java.lang.Thread.State: RUNNABLE at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.checkFlow(FlowSlot.java:168) at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:161) at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:139) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:39) at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:33) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:36) at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:30) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:39) at com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot.entry(ParamFlowSlot.java:33) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:57) at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:50) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:35) at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:29) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:101) at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:47) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot.entry(NodeSelectorSlot.java:171) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32) at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain$1.entry(DefaultProcessorSlotChain.java:31) at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40) at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain.entry(DefaultProcessorSlotChain.java:75) at com.alibaba.csp.sentinel.CtSph.entryWithPriority(CtSph.java:148) at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:347) at com.alibaba.csp.sentinel.CtSph.entryWithType(CtSph.java:340) at com.alibaba.csp.sentinel.SphU.entry(SphU.java:285)
降级有三种类型
每种类型,都会根据对应的配置项数据比对,不符合就中断,中断之后也不能一直断着,啥时候再恢复呢?就根据配置的时间窗口,会启动一个恢复线程,到时间就会调度,把中断标识恢复。
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) { if (cut.get()) { return false; } ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource()); if (clusterNode == null) { return true; } if (grade == RuleConstant.DEGRADE_GRADE_RT) { double rt = clusterNode.avgRt(); if (rt < this.count) { passCount.set(0); return true; } // Sentinel will degrade the service only if count exceeds. if (passCount.incrementAndGet() < rtSlowRequestAmount) { return true; } } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { double exception = clusterNode.exceptionQps(); double success = clusterNode.successQps(); double total = clusterNode.totalQps(); // If total amount is less than minRequestAmount, the request will pass. if (total < minRequestAmount) { return true; } // In the same aligned statistic time window, // "success" (aka. completed count) = exception count + non-exception count (realSuccess) double realSuccess = success - exception; if (realSuccess <= 0 && exception < minRequestAmount) { return true; } if (exception / success < count) { return true; } } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) { double exception = clusterNode.totalException(); if (exception < count) { return true; } } if (cut.compareAndSet(false, true)) { ResetTask resetTask = new ResetTask(this); pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS); } return false; }
恢复做了两件事:一、把passCount设置成0,二、中断标识还原
上面介绍了对请求的拦截处理,这其中最核心的,也就是我们最主要配置的,一个是「流控」,一个是「降级」。这两个对应的Slot,会在处理请求的时候,根据配置好的 「规则」rule 来判断。比如我们上面看到的时间窗口、熔断时间等,以及流控的线程数,QPS数这些。
这些规则默认的配置在内存里,也可以通过不同的数据源加载进来。同时启用了Sentinel 控制台的话,在控制台 也可以配置规则。这些规则,会通过 HTTP 发送给对应使用了 sentinel 的应用实例节点。
收到更新内容后,实例里的「规则管理器」会重新加载一次 rule,下次请求处理就直接生效了。
到此,关于“Sentinel如何拦截异常流量”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。