这期内容当中小编将会给大家带来有关Guava中RateLimiter的实现原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
// RateLimiter属性
/**
* The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
* object to facilitate testing.
* 用来计时, RateLimiter将实例化的时间设置为0值, 后续都是相对时间
*/
private final SleepingStopwatch stopwatch;
// Can't be initialized in the constructor because mocks don't call the constructor.
// 锁, RateLimiter依赖于synchronized来控制并发, 限流器里面属性都没有加volatile修饰
private volatile @Nullable Object mutexDoNotUseDirectly;
// SmoothRateLimiter属性
/**
* The currently stored permits.
* 当前还有多少permits没有被使用, 被存下来的permits数量
*/
double storedPermits;
/**
* The maximum number of stored permits.
* 最大允许缓存的permits数量, 也就是storedPermits能达到的最大值
*/
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* 每隔多少时间产生一个permit
*/
double stableIntervalMicros;
/**
* 下一次可以获取permits的时间, 这个时间也是相对于RateLimiter的构造时间的
*
* nextFreeTicketMicros是一个很关键的属性.我们每次获取permits的时候,先拿storedPermits的值,
* 如果够,storedPermits减去相应的值就可以了,如果不够,那么还需要将nextFreeTicketMicros往前推,
* 表示我预占了接下来多少时间的量了.那么下一个请求来的时候,如果还没到nextFreeTicketMicros这个时间点,
* 需要sleep到这个点再返回,当然也要将这个值再往前推
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
// SmoothBursty默认缓存最多1s的permits, 不可修改, 也就是说最多会缓存1 * permitsPerSecond这么多个permits到池中
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
public final void setRate(double permitsPerSecond) {
// 加锁
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 在关键节点, 会先更新下storedPermits到正确的值
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
// 每隔多少时间产生一个permit
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
// 如果nextFreeTicketMicros已经过掉了, 想象一下很长时间没有再次调用limiter.acquire()的场景
// 需要将nextFreeTicketMicros设置为当前时间, 重新计算下storedPermits
if (nowMicros > nextFreeTicketMicros) {
// 新生成的permits, 构造函数中进来时生成的newPermits为无限大
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 构造函数进来时maxPermits为0, 所以这里的storedPermits也是0
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
@Override
double coolDownIntervalMicros() {
// 构造函数进来时, 此值为0.0
return stableIntervalMicros;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// maxPermits为1秒产生的permits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 等比例缩放storedPermits
storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
// 常用api
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
// 预约, 如果当前不能直接获取到permits, 需要等待, 返回值表示需要sleep多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回sleep的时长
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 返回当前线程为了拿这些许可需要睡眠多久, 如果立即可以拿到就不需要睡眠, 否则需要睡到nextFreeTicketMicros
return max(momentAvailable - nowMicros, 0);
}
/**
* 我们可以看到,获取permits的时候,其实是获取了两部分,一部分来自于存量storedPermits,存量不够的话,
* 另一部分来自于预占未来的freshPermits.这里提一个关键点吧,我们看到,返回值是nextFreeTicketMicros
* 的旧值,因为只要到这个时间点,就说明当次acquire可以成功返回了,而不管storedPermits够不够.
* 如果storedPermits不够,会将nextFreeTicketMicros往前推一定的时间,预占了一定的量.
*/
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里做一次同步, 更新storePermits和nextFreeTicketMicros(如果需要)
resync(nowMicros);
// 刚刚已经更新过了
long returnValue = nextFreeTicketMicros;
// storedPermits可以使用多少个
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storePermits不够的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 为了这不够的部分, 需要等待多久.
// SmoothBursty中storedPermitsToWaitTime返回0, 直接就可以取
// SmoothWarmingUp的实现中,由于需要预热,所以从storedPermits中取permits需要花费一定的时间
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 将nextFreeTicketMicros往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 减去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
// SmoothBursty中对于已经存储下来的storedPermits可以直接获取到, 不需要等待
return 0L;
}
public boolean tryAcquire(Duration timeout) {
return tryAcquire(1, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
// 判断一下超时时间内能不能获得到锁, 不能获得直接返回false
if (!canAcquire(nowMicros, timeoutMicros)) {
return false;
} else {
// 可以获得的话再算下为了获得这个许可需要等待多长时间
// 这边上面已经分析过了
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
// 就是看下nowMicros + timeoutMicros >= nextFreeTicketMicros
// 意思就是看下超时时间内有没有达到可以获取令牌的时间
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
public final void setRate(double permitsPerSecond) {
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
// SmoothWamingUp实现
public static RateLimiter create(double permitsPerSecond, Duration warmupPeriod) {
return create(permitsPerSecond, toNanosSaturated(warmupPeriod), TimeUnit.NANOSECONDS);
}
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {
return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());
}
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit,
double coldFactor, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothWarmingUp( SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
super(stopwatch);
// 为了达到从0到maxPermits花费warmupPeriodMicros的时间
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
this.coldFactor = coldFactor;
}
public final void setRate(double permitsPerSecond) {
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 实现和SmoothBursty一样
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
/**
* 含义是storedPermits中每个permit的增长速度
* 为了达到从 0 到 maxPermits 花费 warmupPeriodMicros 的时间
*/
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor是固定的3
// 当达到maxPermits时, 此时系统处于最冷却的时候, 获取一个permit需要coldIntervalMicros
// 而如果storedPermits < thresholPermits的时候, 只需要stableIntervalMicros
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// https://www.fangzhipeng.com/springcloud/2019/08/20/ratelimit-guava-sentinel.html
// https://blog.csdn.net/forezp/article/details/100060686
// 梯形的面积等于2倍的长方形的面积
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 计算斜线的斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
// 等比例缩放
storedPermits = (oldMaxPermits == 0.0) ? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
// storedPermits是上面两个图中最右侧那条绿色线, 表示RateLimiter已经存储了多少许可证, 那么获取storedPermits许可证
// 时应当也是从最右侧开始拿, 从右往左减许可证. 因此就会出现3种情况
// 1) storedPermits已经大于thresholdPermits, 而且所需的许可证permitsToTake右侧部分已经足够提供, 对应上图2
// 2) storedPermits已经大于thresholdPermits, 而且所需的许可证右侧不够, 还需要从左侧拿, 对应上图1
// 3) storedPermits小于thresholdPermits, 此时获取每个许可证所需的时间是固定的, 对应下面if逻辑返回false的情况
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 计算梯形部分的面积
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
double length = permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 计算长方形部分的面积
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
上述就是小编为大家分享的Guava中RateLimiter的实现原理是什么了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/liwanghong/blog/4536144