这期内容当中小编将会给大家带来有关Guava中RateLimiter的实现原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
// RateLimiter属性 /** * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate * object to facilitate testing. * 用来计时, RateLimiter将实例化的时间设置为0值, 后续都是相对时间 */ private final SleepingStopwatch stopwatch; // Can't be initialized in the constructor because mocks don't call the constructor. // 锁, RateLimiter依赖于synchronized来控制并发, 限流器里面属性都没有加volatile修饰 private volatile @Nullable Object mutexDoNotUseDirectly;
// SmoothRateLimiter属性 /** * The currently stored permits. * 当前还有多少permits没有被使用, 被存下来的permits数量 */ double storedPermits; /** * The maximum number of stored permits. * 最大允许缓存的permits数量, 也就是storedPermits能达到的最大值 */ double maxPermits; /** * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits * per second has a stable interval of 200ms. * 每隔多少时间产生一个permit */ double stableIntervalMicros; /** * 下一次可以获取permits的时间, 这个时间也是相对于RateLimiter的构造时间的 * * nextFreeTicketMicros是一个很关键的属性.我们每次获取permits的时候,先拿storedPermits的值, * 如果够,storedPermits减去相应的值就可以了,如果不够,那么还需要将nextFreeTicketMicros往前推, * 表示我预占了接下来多少时间的量了.那么下一个请求来的时候,如果还没到nextFreeTicketMicros这个时间点, * 需要sleep到这个点再返回,当然也要将这个值再往前推 */ private long nextFreeTicketMicros = 0L; // could be either in the past or future
public static RateLimiter create(double permitsPerSecond) { return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { // SmoothBursty默认缓存最多1s的permits, 不可修改, 也就是说最多会缓存1 * permitsPerSecond这么多个permits到池中 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) { super(stopwatch); this.maxBurstSeconds = maxBurstSeconds; } public final void setRate(double permitsPerSecond) { // 加锁 synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } @Override final void doSetRate(double permitsPerSecond, long nowMicros) { // 在关键节点, 会先更新下storedPermits到正确的值 resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; // 每隔多少时间产生一个permit this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } void resync(long nowMicros) { // if nextFreeTicket is in the past, resync to now // 如果nextFreeTicketMicros已经过掉了, 想象一下很长时间没有再次调用limiter.acquire()的场景 // 需要将nextFreeTicketMicros设置为当前时间, 重新计算下storedPermits if (nowMicros > nextFreeTicketMicros) { // 新生成的permits, 构造函数中进来时生成的newPermits为无限大 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 构造函数进来时maxPermits为0, 所以这里的storedPermits也是0 storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } } @Override double coolDownIntervalMicros() { // 构造函数进来时, 此值为0.0 return stableIntervalMicros; }
@Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; // maxPermits为1秒产生的permits maxPermits = maxBurstSeconds * permitsPerSecond; if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = maxPermits; } else { // 等比例缩放storedPermits storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } }
// 常用api @CanIgnoreReturnValue public double acquire() { return acquire(1); } @CanIgnoreReturnValue public double acquire(int permits) { // 预约, 如果当前不能直接获取到permits, 需要等待, 返回值表示需要sleep多久 long microsToWait = reserve(permits); // sleep stopwatch.sleepMicrosUninterruptibly(microsToWait); // 返回sleep的时长 return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); synchronized (mutex()) { return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength(int permits, long nowMicros) { long momentAvailable = reserveEarliestAvailable(permits, nowMicros); // 返回当前线程为了拿这些许可需要睡眠多久, 如果立即可以拿到就不需要睡眠, 否则需要睡到nextFreeTicketMicros return max(momentAvailable - nowMicros, 0); } /** * 我们可以看到,获取permits的时候,其实是获取了两部分,一部分来自于存量storedPermits,存量不够的话, * 另一部分来自于预占未来的freshPermits.这里提一个关键点吧,我们看到,返回值是nextFreeTicketMicros * 的旧值,因为只要到这个时间点,就说明当次acquire可以成功返回了,而不管storedPermits够不够. * 如果storedPermits不够,会将nextFreeTicketMicros往前推一定的时间,预占了一定的量. */ @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { // 这里做一次同步, 更新storePermits和nextFreeTicketMicros(如果需要) resync(nowMicros); // 刚刚已经更新过了 long returnValue = nextFreeTicketMicros; // storedPermits可以使用多少个 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // storePermits不够的部分 double freshPermits = requiredPermits - storedPermitsToSpend; // 为了这不够的部分, 需要等待多久. // SmoothBursty中storedPermitsToWaitTime返回0, 直接就可以取 // SmoothWarmingUp的实现中,由于需要预热,所以从storedPermits中取permits需要花费一定的时间 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); // 将nextFreeTicketMicros往前推 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 减去被拿走的部分 this.storedPermits -= storedPermitsToSpend; return returnValue; } @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { // SmoothBursty中对于已经存储下来的storedPermits可以直接获取到, 不需要等待 return 0L; }
public boolean tryAcquire(Duration timeout) { return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS); } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) { long timeoutMicros = max(unit.toMicros(timeout), 0); checkPermits(permits); long microsToWait; synchronized (mutex()) { long nowMicros = stopwatch.readMicros(); // 判断一下超时时间内能不能获得到锁, 不能获得直接返回false if (!canAcquire(nowMicros, timeoutMicros)) { return false; } else { // 可以获得的话再算下为了获得这个许可需要等待多长时间 // 这边上面已经分析过了 microsToWait = reserveAndGetWaitLength(permits, nowMicros); } } stopwatch.sleepMicrosUninterruptibly(microsToWait); return true; } private boolean canAcquire(long nowMicros, long timeoutMicros) { // 就是看下nowMicros + timeoutMicros >= nextFreeTicketMicros // 意思就是看下超时时间内有没有达到可以获取令牌的时间 return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; } @Override final long queryEarliestAvailable(long nowMicros) { return nextFreeTicketMicros; }
public final void setRate(double permitsPerSecond) { synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } @Override final void doSetRate(double permitsPerSecond, long nowMicros) { resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); }
// SmoothWamingUp实现 public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) { return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS); } public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer()); } @VisibleForTesting static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor, SleepingStopwatch stopwatch) { RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) { super(stopwatch); // 为了达到从0到maxPermits花费warmupPeriodMicros的时间 this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod); this.coldFactor = coldFactor; }
public final void setRate(double permitsPerSecond) { synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } @Override final void doSetRate(double permitsPerSecond, long nowMicros) { // 实现和SmoothBursty一样 resync(nowMicros); double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } /** * 含义是storedPermits中每个permit的增长速度 * 为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间 */ @Override double coolDownIntervalMicros() { return warmupPeriodMicros / maxPermits; } @Override void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = maxPermits; // coldFactor是固定的3 // 当达到maxPermits时, 此时系统处于最冷却的时候, 获取一个permit需要coldIntervalMicros // 而如果storedPermits < thresholPermits的时候, 只需要stableIntervalMicros double coldIntervalMicros = stableIntervalMicros * coldFactor; // https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html // https://blog.csdn.net/forezp/article/details/100060686 // 梯形的面积等于2倍的长方形的面积 thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros; maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros); // 计算斜线的斜率 slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits); if (oldMaxPermits == Double.POSITIVE_INFINITY) { // if we don't special-case this, we would get storedPermits == NaN, below storedPermits = 0.0; } else { // 等比例缩放 storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold : storedPermits * maxPermits / oldMaxPermits; } }
// storedPermits是上面两个图中最右侧那条绿色线, 表示RateLimiter已经存储了多少许可证, 那么获取storedPermits许可证 // 时应当也是从最右侧开始拿, 从右往左减许可证. 因此就会出现3种情况 // 1) storedPermits已经大于thresholdPermits, 而且所需的许可证permitsToTake右侧部分已经足够提供, 对应上图2 // 2) storedPermits已经大于thresholdPermits, 而且所需的许可证右侧不够, 还需要从左侧拿, 对应上图1 // 3) storedPermits小于thresholdPermits, 此时获取每个许可证所需的时间是固定的, 对应下面if逻辑返回false的情况 @Override long storedPermitsToWaitTime(double storedPermits, double permitsToTake) { double availablePermitsAboveThreshold = storedPermits - thresholdPermits; long micros = 0; // 计算梯形部分的面积 if (availablePermitsAboveThreshold > 0.0) { double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake); double length = permitsToTime(availablePermitsAboveThreshold) + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake); micros = (long) (permitsAboveThresholdToTake * length / 2.0); permitsToTake -= permitsAboveThresholdToTake; } // 计算长方形部分的面积 micros += (long) (stableIntervalMicros * permitsToTake); return micros; }
上述就是小编为大家分享的Guava中RateLimiter的实现原理是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。