这期内容当中小编将会给大家带来有关spring cloud中Hystrix指标收集原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
上一篇介绍了hystrix熔断降级的基本实现原理,着重点是从hystrix自身的能力方面着手,结合代码,做了整体介绍,那么触发熔断的指标是如何计算的,可能前面会笼统的提到metrics,至于它的metrics实现原理是怎么样的,在本章做重点介绍
官方图示:
对于使用者先构造一个HystrixCommand对象或者HystrixObservalbeCommand
选择queue或者execute,调用者决定是使用异步还是同步方式
根据commandKey看缓存中是否存在Observalbe,开启缓存是为了提升性能,直接返回输出
没有缓存,那就开始走熔断器的逻辑,先判断熔断器是不是开启状态
熔断器开启,触发快速失败,触发降级,去执行用户提供的fallback()逻辑
判断是不是并发超限,超限,触发降级,则发出执行拒绝的异常,去执行用户提供的fallback逻辑
执行用户实现的具体业务逻辑,是否出现执行异常或者超时,异常或超时,则触发降级去执行用户提供的fallback逻辑
执行结束
无论是正常结束还是执行异常,都会触发metrics的收集,收集的结果经过计算后,提供给熔断器,做开启和关闭的决策
这部分我们需要从以下几个方面做分析:指标上报、指标计算、指标使用,这期间会涉及多线程的并发写入、消息的顺序到达、滑动窗口的实现等等
指标上报
每一个请求线程,都会创建一个ExecutionResult实例,这个实例会关联一些基础事件比如开始时间、执行延迟、事件统计等基础信息,也就是在整个hystrix的生命周期里面,会通过指标上报的方式做数据的收集,下面看下数据上报的几个事件:
1.1、executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());//判断断路器未开启,并发未超限,记录执行的开始时间
1.2、executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);//执行成功会增加success的事件和耗时
1.3、HystrixEventType.SHORT_CIRCUITED//断路器打开,会收集快速熔断的事件和耗时
1.4、HystrixEventType.SEMAPHORE_REJECTED//信号量方式并发数超限,会记录该事件和耗时
1.5、HystrixEventType.THREAD_POOL_REJECTED//线程池不可用(并发超限),会记录该事件和耗时
1.6、HystrixEventType.TIMEOUT//执行超时,会收集该事件和耗时
1.7、HystrixEventType.BAD_REQUEST//参数或状态异常,会收集该事件和耗时
以上整体的事件分为两大类,成功和失败,根据用户逻辑代码的执行结果,如果是有异常,收集异常事件和耗时,执行circuitBreaker.markNonSuccess(),否则执行circuitBreaker.markNonSuccess()
另外触发熔断器开启和关闭,有且只有两个途径,如下图:
指标计算
这里简单对各个步骤中涉及到多线程并发的情况以及滑动窗口的计算做一个简单介绍:
2.1:并发(threadLocal&SerializedSubject)
同一个接口收到多个请求时候,也就是这些请求命中的都是同一个commandKey时(统计指标是按照KEY为维度),每个请求都是一个独立的线程,每个线程内会产生多个各种各样的事件,首先同一个线程内的event拼接封装成HystrixCommandCompletion,上报的是一个HystrixCommandCompletion,流计算操作的也是一个个的HystrixCommandCompletion,不存在计算时候把各线程的事件混杂在一起的可能,如何保证的在下面会讲到
2.1.1:上报者是通过threadLocal线程隔离
首先hystrix启动后会创建一个threadLocal,当一个客户端请求不管是正常结束还是异常结束,都要上报上报状态,也就是执行handleCommandEnd,都会从threadLocal中返回一个当前线程的HystrixThreadEventStream,代码如下:
private void handleCommandEnd(boolean commandExecutionStarted) { //省略部分代码 if (executionResultAtTimeOfCancellation == null) { //上报metrics metrics.markCommandDone(executionResult, commandKey, threadPoolKey, commandExecutionStarted); } else { metrics.markCommandDone(executionResultAtTimeOfCancellation, commandKey, threadPoolKey, commandExecutionStarted); } }
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) { //threadLocal中放置的是HystrixThreadEventStream,因为改写了init方法,所以无需set,直接可以获取 HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey); if (executionStarted) { concurrentExecutionCount.decrementAndGet(); } } //从threadLocal中获取事件流 public static HystrixThreadEventStream getInstance() { return threadLocalStreams.get(); } //threadLocal的定义,改写了init方法,所以不用单独调用set private static final ThreadLocal<HystrixThreadEventStream> threadLocalStreams = new ThreadLocal<HystrixThreadEventStream>() { @Override protected HystrixThreadEventStream initialValue() { return new HystrixThreadEventStream(Thread.currentThread()); } }
2.1.2:限流队列
每个线程会有唯一的HystrixThreadEventStream,因为是从theadLocal获取,每个HystrixThreadEventStream都会关联一个由Subject实现的队列,也就是每一个线程都有一个私有的队列,这里说它提供限流是因为采用了‘背压’的原理,所谓的‘背压’是指按需提供,根据消费者的能力去往队列生产,代码如下:
public void executionDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey) { //把executionResult封装成HystrixCommandCompletion,HystrixCommandCompletion是流计算操作的基本单位 HystrixCommandCompletion event = HystrixCommandCompletion.from(executionResult, commandKey, threadPoolKey); //writeOnlyCommandCompletionSubject就是一个通过RXjava实现的限流队列 writeOnlyCommandCompletionSubject.onNext(event); } //省略代码 writeOnlyCommandCompletionSubject .onBackpressureBuffer()//开启'背压功能' .doOnNext(writeCommandCompletionsToShardedStreams)//核心是这个action的call方法 .unsafeSubscribe(Subscribers.empty());
2.2:数据流串行化
每个放入队列的HystrixCommandCompletion,都会执利doOnNext的Action,通过他的call方法去调用HystrixCommandCompletionStream的write方法,相同的commandKey具有同一个HystrixCommandCompletionStream实例,具体是通过currentHashMap做的实例隔离,HystrixCommandCompletionStream内部是通过一个SerializedSubject实现多个HystrixCommandCompletion并行写入的串行化,具体代码逻辑如下:
//限流队列收到数据后会执行call方法,是通过观察者注册了doOnnext事件 private static final Action1<HystrixCommandCompletion> writeCommandCompletionsToShardedStreams = new Action1<HystrixCommandCompletion>() { @Override public void call(HystrixCommandCompletion commandCompletion) { //同一个commandkey对应同一个串行队列的实例,因为同一个commandKey必须要收集该key下所有线程的metrix事件做统计,才能准确 HystrixCommandCompletionStream commandStream = HystrixCommandCompletionStream.getInstance(commandCompletion.getCommandKey()); commandStream.write(commandCompletion);//写入串行队列,这里是核心 if (commandCompletion.isExecutedInThread() || commandCompletion.isResponseThreadPoolRejected()) { HystrixThreadPoolCompletionStream threadPoolStream = HystrixThreadPoolCompletionStream.getInstance(commandCompletion.getThreadPoolKey()); threadPoolStream.write(commandCompletion); } } }; //具体的write方法如下,需要重点关注writeOnlySubject的定义 public void write(HystrixCommandCompletion event) { writeOnlySubject.onNext(event); } //下面是writeOnlySubject的定义,是通过SerializedSubject将并行的写入变为串行化 HystrixCommandCompletionStream(final HystrixCommandKey commandKey) { this.commandKey = commandKey; this.writeOnlySubject = new SerializedSubject<HystrixCommandCompletion, HystrixCommandCompletion>(PublishSubject.<HystrixCommandCompletion>create()); this.readOnlyStream = writeOnlySubject.share(); }
2.3:消费订阅
在hystrixCommand创建的时候,会对HystrixCommandCompletionStream进行订阅,目前有:
healthCountsStream
rollingCommandEventCounterStream
cumulativeCommandEventCounterStream
rollingCommandLatencyDistributionStream
rollingCommandUserLatencyDistributionStream
rollingCommandMaxConcurrencyStream
这几个消费者通过滚动窗口的形式,对数据做统计和指标计算,下面选取具有代表意义的healthCountsStream做讲解:
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) { //统计计算指标的时间间隔-metricsHealthSnapshotIntervalInMilliseconds final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get(); if (healthCountBucketSizeInMs == 0) { throw new RuntimeException("You have set the bucket size to 0ms. Please set a positive number, so that the metric stream can be properly consumed"); } //熔断窗口滑动周期,默认10秒,保留10秒内的统计数据,指定窗口期内,有效进行指标计算的次数=metricsRollingStatisticalWindowInMilliseconds/metricsHealthSnapshotIntervalInMilliseconds final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs; return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs); } //继承关系HealthCountStream-》BucketedRollingCounterStream-》BucketedCounterStream //把各事件聚合成桶...省略代码,在BucketedCounterStream完成 this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext .flatMap(reduceBucketToSummary) //for a given bucket, turn it into a long array containing counts of event types .startWith(emptyEventCountsToStart); //start it with empty arrays to make consumer logic as generic as possible (windows are always full) } } //聚合成桶的逻辑代码 public static final Func2<long[], HystrixCommandCompletion, long[]> appendEventToBucket = new Func2<long[], HystrixCommandCompletion, long[]>() { @Override public long[] call(long[] initialCountArray, HystrixCommandCompletion execution) { ExecutionResult.EventCounts eventCounts = execution.getEventCounts(); for (HystrixEventType eventType: ALL_EVENT_TYPES) { switch (eventType) { case EXCEPTION_THROWN: break; //this is just a sum of other anyway - don't do the work here default: initialCountArray[eventType.ordinal()] += eventCounts.getCount(eventType);//对各类型的event做,分类汇总 break; } } return initialCountArray; } }; //生成计算指标,在BucketedRollingCounterStream完成,省略部分代码 this.sourceStream = bucketedStream //stream broken up into buckets .window(numBuckets, 1) //emit overlapping windows of buckets .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary .doOnSubscribe(new Action0() { @Override public void call() { isSourceCurrentlySubscribed.set(true); } }) //计算指标聚合实现,reduceWindowToSummary private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() { @Override public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) { return healthCounts.plus(bucketEventCounts);//重点看该方法 } }; public HealthCounts plus(long[] eventTypeCounts) { long updatedTotalCount = totalCount; long updatedErrorCount = errorCount; long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()]; long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()]; long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()]; long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()]; long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()]; //多个线程的事件,被汇总计算以后,所有的事件相加得到总和 updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); //失败的事件总和,注意只有FAIL+timeoutCount+THREAD_POOL_REJECTED+SEMAPHORE_REJECTED updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount); return new HealthCounts(updatedTotalCount, updatedErrorCount); }
指标使用
指标使用比较简单,用于控制熔断器的关闭与开启,逻辑如下:
public void onNext(HealthCounts hc) { if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { } else { if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { } else { if (status.compareAndSet(Status.CLOSED, Status.OPEN)) { circuitOpened.set(System.currentTimeMillis()); } } } }
上述就是小编为大家分享的spring cloud中Hystrix指标收集原理是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。