常见限流算法与开源实现

随着微服务的流行,服务之间的稳定性也变得越来越重要。在一个复杂的服务网络中,如果一个服务节点被流量打垮,那么很可能会造成其他多个服务都不可用的连锁事件。因此在微服务中,如何把流量控制在系统能够承受的范围内,不让过大的流量直接把服务打垮,是确保系统稳定性的重要部分。

限流是一种常见的控制流量的方法。当流量超过服务处理能力时,超过的部分会被限流组件拦截。被拦截的请求可能被丢弃,也可能做降级处理,总之,都会比直接打崩系统好。常见的限流算法有:漏桶法、令牌桶和滑动窗口,分别适用于不同的场景。除此之外,还有些开源组件中的限流实现,也会在下文一并讲解。

漏桶法

漏桶算法概念如下:

  • 请求到来时,将每个请求放入「漏桶」进行存储;
  • 「漏桶」以固定速率向外「漏」出请求来执行;
  • 如果请求进入的速率大于「漏」出的速率,则「漏桶」中的请求会越来越多;当「漏桶」满了的时候,多余的请求会被丢弃。

漏桶算法多使用队列实现,服务的请求会存到队列中,服务的提供方则按照固定的速率从队列中取出请求并执行,过多的请求则会被丢弃。

漏桶算法的特点是流量的消费速率取决于「漏桶」向外「漏」水的速率,这种方式的好处是能保证系统的压力处于一个比较平稳的水位,缺点是当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。

令牌桶

令牌桶算法概念如下:

  • 令牌以固定速率生成,生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃;
  • 当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行,取不到令牌的请求会被丢弃。

令牌桶算法的特点是会将用不到的令牌先存起来,这种方式的好处是能应对突发流量,缺点是流量进入的速率无法被控制,短期内系统的压力可能会有比较大的变化。

滑动窗口

漏桶法和令牌桶能够控制某个时间周期内进入系统的最大请求数,但会面临一个临界值的问题:假设我们将漏桶法和令牌桶的时间周期设置为 1s,那么这两者可以确保 [0s,1s) 和 [1s,2s) 这两个时间周期内通过的请求数,但无法确保 [0.2s,1.2s) 内的请求数。要控制任意一个连续时间内通过的请求数,就需要使用滑动窗口算法:

我们可以看上图,整个 1s 被划分成 5 个格子,每个格子代表 200ms。[0.8s,1s) 到达的 80 个请求落在蓝色的格子中,而 [1s,1.2s) 到达的请求会落在红色的格 子中。当时间到达 1s 时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是 150 个,超过了限定的 100 个,所以超出的 50 个请求会被丢弃。

从上面可以看出,当滑动窗口的格子划分的越多,那么滑动窗口的滑动就越平滑,限流的统计就会越精确。

Hystrix 中的限流

Hystrix 是 Netflix 开源的一款容错系统,可以帮助我们的应用系统提高稳定性。限流是 Hystrix 中非常重要的一个部分,Hystrix 中提供了两种限流方式,分别是线程池信号量。Hystrix 使用命令模式将被限流代码的业务逻辑和调用逻辑解耦,框架已经实现了使用线程池限流和使用信号量限流的调用逻辑,使用者只需要专注业务逻辑即可。

使用时先定义一个类继承HystrixCommand,实现其抽象方法run(),在该方法中写入需要被限流的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HelloCommand extends HystrixCommand<String> {
private String userId;

protected HelloCommand(Setter setter, String userId) {
super(setter);
// 通过这种方式传入业务参数,每次调用都要创建一个实例
this.userId = userId;
}

@Override
protected String run() throws Exception {
// 将需要被限流的代码写在这里
System.out.println("hello user:" + userId);
return "ok";
}
}

然后在主函数中配置限流的参数,调用HystrixCommandexecute()方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) {
// 使用线程池限流的配置
Setter threadSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group-1"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 使用线程池限流
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
// 设置核心线程数量为 5
.withCoreSize(5)
// 设置最大线程数量为 10
.withMaximumSize(10)
// 设置最大排队长度为 20,该值即 blockingQueue 的大小,创建后不能改动
.withMaxQueueSize(20)
// 设置排队长度,超过排队长度时会拒绝,该值可以动态调整
.withQueueSizeRejectionThreshold(15)
);
// 使用信号量限流的配置
Setter semaphoreSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group-2"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 使用信号量限流
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// 设置最大并发量为 10,同时请求超过 10 个会拒绝
.withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
);
// 传入配置和业务参数
HelloCommand threadHelloCommand = new HelloCommand(threadSetter, "user1");
HelloCommand semaphoreHelloCommand = new HelloCommand(semaphoreSetter, "user2");
// 使用线程池限流的方式执行
threadHelloCommand.execute();
// 使用信号量限流的方式执行
semaphoreHelloCommand.execute();

}

  • 使用线程池的方式进行限流时,会先在线程池中创建若干线程,被限流的代码会被放进线程池中执行,如果线程满了,就会触发限流;
  • 使用信号量的方式进行限流时,会维护一个信号量,当进入被限流的方法时,信号量加 1,出被限流的方法时,信号量减 1,信号量超过阈值就会触发限流。

下面来看 Hystrix 中两种限流方式的源码实现。

信号量限流

Hystrix 底层使用 Rxjava 编写,因此会有很多陌生的概念比如 Observable、Subscriber 等,我会在下文简单介绍一下这些概念,没弄懂也没关系,因为这些概念不影响我们阅读限流方面的代码。

这是 Hystrix 的一次完整调用的链路(译自官方,有所改动)。上述的 demo 中我们直接调用了 execute() 方法,所以调用的路径为.execute() -> .queue() -> .toObservable() -> .toBlocking() -> .toFuture() -> .get()。这里的.xxx()都是 HystrixCommand 的方法,含义如下:

  • execute()会对 HystrixCommand 的 run() 方法进行一次阻塞调用,返回调用结果或者抛出错误;
  • queue()会返回一个 Future 对象,hystrix 会把执行后的结果放在里面,本质上执行.execute()就是在执行.queue().get()
  • toObservable()会把一个 HystrixCommand 对象转化成一个 Cold Observable 对象。Observable 是一个可观察对象,可以理解为一个数据发射器。Observable 分为 Cold 和 Hot 两种,Cold Observable 只有在被订阅者(Subscriber)订阅时才会发射数据,且每次订阅都会发送全量数据;Hot Observable 不需要订阅者就可以发射数据,订阅之后只会收到后续的数据。打个比方,Cold Observable 相当于一张 CD,Hot Observable 相当于一个电台;
  • toBlocking()会把一个 Observable 对象转化成一个 BlockingObservable 对象。当 BlockingObservable 被订阅时,会通过 CountDownLatch 和 BlockingQueue 控制,将订阅者一直阻塞在主线程直到特定的数据到来;
  • toFuture()是 BlockingObservable 的一个方法,它会触发订阅,同时它是一个单数据订阅者,它只对发射 1 个数据的 BlockingObservable 生效,返回一个 Future 用于存放数据。当 BlockingObservable 发射多个或 0 个数据时,该方法会报错。本质上执行.queue().get()就是在执行.toObservable().toBlocking().toFuture().get()

本质上,hystrix 的执行流程就是一次订阅的过程,hystrix 在整个订阅的链路中添加了缓存、断路、限流、容错、降级的逻辑,其中限流的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
AbstractCommand
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//...
// 判断断路器是否打开
if (circuitBreaker.allowRequest()) {
// 获取信号量对象
final AbstractCommand.TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 尝试获取信号量
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// 执行用户代码
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 未获取信号量,执行降级逻辑
return handleSemaphoreRejectionViaFallback();
}
} else {
// 断路器打开,执行降级逻辑
return handleShortCircuitViaFallback();
}
}

AbstractCommand
protected AbstractCommand.TryableSemaphore getExecutionSemaphore() {
// 判断限流策略是否是信号量
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
// 从缓存中获取,这里缓存的 key 默认情况下和 Command 的类名相关,同一个类的所有实例共用一个信号量
AbstractCommand.TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// 创建信号量,并放入缓存
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new AbstractCommand.TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
// 限流策略非信号量时,返回 DEFAULT
return AbstractCommand.TryableSemaphoreNoOp.DEFAULT;
}
}

从源码可以看到,在一个请求要执行我们在 HystrixCommand 中自定义的代码前,会先请求信号量,默认情况下同一个 HystrixCommand 类的所有实例共用一个信号量。在创建信号量时,会根据限流策略是『信号量限流』还是『线程池限流』创建不同类型的信号量。当限流策略是『信号量』时,创建的信号量的核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
AbstractCommand
static class TryableSemaphoreActual implements TryableSemaphore {
// 并发请求的上限
protected final HystrixProperty<Integer> numberOfPermits;
// 当前请求的数量
private final AtomicInteger count = new AtomicInteger(0);
@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
// 判断当前请求数量是否大于请求上限
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
@Override
public void release() {
// 释放信号量
count.decrementAndGet();
}
}

从信号量的获取和释放可以看出,主要是使用 AtomicInteger 的原子操作来进行信号量的获取和释放。

线程池限流

上面是当限流策略是『信号量』时的情况,而当限流策略是『线程池』时,会返回一个 DEFAULT 信号量,这个信号量比较特殊,它的 tryAcquire() 方法返回值恒为 true, release() 方法是空方法:

1
2
3
4
5
6
7
8
9
10
11
AbstractCommand
static class TryableSemaphoreNoOp implements TryableSemaphore {
public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
@Override
public boolean tryAcquire() {
return true;
}
@Override
public void release() {}
}
}

这符合我们的预期,因为当限流策略为『线程池』时,不需要在信号量中安排实际的限流逻辑。那么线程池这种限流策略的实现又在哪里呢?答案在创建 Observable 的逻辑里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
AbstractCommand
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 隔离策略为线程池
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//...
}
})
// 当这个 Observable 被订阅时,会从 threadPool 中选择一个线程来执行订阅行为
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// 线程中断的条件
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT;
}
}));
} else {
// 隔离策略不为线程池时,在原线程执行
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//...
}
});
}
}

从源码中我们可以看到,当限流策略为线程池时,Hystrix 会从内部的线程池 threadPool 中选择一个线程来执行订阅行为。和信号量一样,线程池在默认情况下也是和类型关联,一个 HystrixCommand 类的所有实例共用一个线程池。当并发请求数超过线程数+等待队列大小的时候,线程池会抛异常,这个异常会在外层被捕捉到,触发降级:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
AbstractCommand
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
//...
// 执行降级的逻辑
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
// 接收到的内部抛出的异常
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
// 被抛异常为线程池拒绝异常时,触发降级逻辑
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
}
// 被抛异常为执行超时异常时,也会触发降级逻辑
else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
//...
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
// 接收内部抛出的异常,执行降级逻辑
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

源码中我们可以看到,当线程池池拒绝抛出异常时,这个异常会在外部被捕捉到,触发降级。另外,除了线程池拒绝这种情况外,执行超时也会触发降级(只有在线程池限流下才可以设置超时时间,信号量限流时不支持设置超时时间;且 Hystrix 中超时的实现比较特别,并不是 future.get(timeout) 这种方式,而是通过定时任务定时中断的方式,个人猜测可能是出于到在主线程挂掉后仍能中断超时任务的考虑)。

Guava 中的限流

Guava 是一个 Google 开发的基于 Java 的扩展项目,内部提供了各种实用的工具。RateLimiter 是 Guava 提供的用于限流的工具。RateLimiter 基于令牌桶算法,可以有效限定单个 JVM 实例上某个接口的流量。

RateLimiter 使用的一个例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RateLimiterExample {
public static void main(String[] args) throws Exception {
// QPS 设置为 5,代表一秒钟只允许处理五个请求
RateLimiter rateLimiter = RateLimiter.create(5);
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
long startTime = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger();
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
// 向令牌桶请求令牌
rateLimiter.acquire(1);
latch.countDown();
System.out.println("第 "+count.incrementAndGet()+" 个请求完成," +
"距离开始时间 "+(System.currentTimeMillis()-startTime)+" ms");
});
}
latch.await();
System.out.println("共计用时:" + (System.currentTimeMillis() - startTime)+"ms");
executorService.shutdown();
}
}

运行结果:

从运行结果可以看到,在 qps 设置为 5 的条件下,10 个请求共计花费 1800ms。另外一点值得注意的是,每个请求之间的间隔十分均匀,都在 200ms 左右。之所以会发生这种情况,主要是得益于 Guava 实现的平滑限流算法。该算法的核心实现位于 SmoothRateLimiter 这个类下,这是一个基于令牌桶实现的限流工具类:令牌会被匀速地补充到桶内,当一个请求过来时,通过 acquire() 方法从桶中取得令牌,如果此时桶中没有足够令牌,则根据不足的令牌数量和补充令牌的速度计算出需要等待的时间,当前请求会被放行,但下一个请求将阻塞该等待时间。

SmoothRateLimiter 是一个抽象类,它有两个子类:SmoothBursty 和 SmoothWarmup。它们分别适用于不同的场景,下文会一一讲到。

SmoothBursty

上文我们通过 RateLimiter.create(5) 创建的限流器,实际上是 SmoothBursty。从名字可以看出,这是一个在平滑限流的基础上,能够应对突发流量的限流器。

先看 SmoothBursty 中一些重要的字段:

1
2
3
4
5
6
7
8
9
10
// 桶中最多存放多少秒的令牌数
final double maxBurstSeconds;
// 桶中的令牌个数
double storedPermits;
// 桶中最多能存放多少个令牌,maxPermits = maxBurstSeconds * 每秒生成令牌个数
double maxPermits;
// 加入令牌的平均间隔,单位为微秒,如果加入令牌速度为每秒 5 个,则该值为 1000 * 1000 / 5
double stableIntervalMicros;
// 下一个请求需要等待的时间点
private long nextFreeTicketMicros = 0L;

再来看 SmoothBursty 的创建逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
RateLimiter
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, RateLimiter.SleepingStopwatch.createFromSystemTimer());
}
RateLimiter
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
// 创建一个 SmoothBursty 对象
double maxBurstSeconds = 1.0;
RateLimiter rateLimiter = new SmoothBursty(stopwatch, maxBurstSeconds);
// 设置令牌生成速率
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

我们看到,当调用 RateLimiter.create() 方法时,实际是创建了一个 SmoothBursty 的实例,并调用了其 setRate() 方法。需要注意的是,SmoothBursty 是个非 public 的类,只能通过 RateLimiter.create() 创建,而这里的 maxBurstSeconds 写死为 1.0,因此实际上 maxBurstSeconds 并非是一个可以自定义的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
RateLimiter
public final void setRate(double permitsPerSecond) {
checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
SmoothRateLimiter
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 根据当前时间更新桶内的令牌数
resync(nowMicros);
// 根据 qps 算出生成令牌的间隔
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
// 调用子类的更具体的实现
doSetRate(permitsPerSecond, stableIntervalMicros);
}
SmoothRateLimiter
void resync(long nowMicros) {
// 只有 当前时间>下一个请求需要等待的时间点 时,桶内才会补充令牌
if (nowMicros > nextFreeTicketMicros) {
// 计算这段时间内需要补充的令牌,coolDownIntervalMicros() 返回的是 stableIntervalMicros
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 更新桶中的令牌,不能超过 maxPermits
storedPermits = min(maxPermits, storedPermits + newPermits);
// 将下一个请求需要等待的时间点设置为当前时间
nextFreeTicketMicros = nowMicros;
}
}
SmoothBursty
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
// 保存旧的令牌上限
double oldMaxPermits = this.maxPermits;
// 计算新的令牌上限
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
storedPermits = maxPermits;
} else {
// 按 maxPermits / oldMaxPermits 的比例折算桶内已有的令牌数
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // 当创建时,桶内令牌数为 0
: storedPermits * maxPermits / oldMaxPermits;
}
}

在创建 SmoothBursty 时,它会根据我们设置的 qps 计算出生成令牌的速率和桶中令牌数的上限。同时我们看到 setRate() 是一个 public 方法,这意味着限流器的 qps 在创建后是可以更改的。更改 qps 时,首先会根据当前时间在桶内补充令牌,然后计算出新的生成令牌的速率和桶中令牌数的上限,最后根据上限数的变化按比例折算桶内已有的令牌数。

到此为止 SmoothBursty 已经创建完成了,接下来我们看 acquire() 的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
RateLimiter
public double acquire(int permits) {
// 本次请求所需等待的时间点
long microsToWait = reserve(permits);
// 开始阻塞等待(如果 microsToWait 不在当前时间点之后,则不需要等待)
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
RateLimiter
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
RateLimiter
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
SmoothRateLimiter
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里调用了上面提到的 resync 方法,会根据当前时间更新桶中的令牌数和 nextFreeTicketMicros
resync(nowMicros);
// 这次请求所需等待的时间点。(这个时间点仅和上次请求相关)
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 缺少的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
// waitMicros 为下一次请求需要等待的时间长度;SmoothBursty的storedPermitsToWaitTime 返回 0
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 更新 nextFreeTicketMicros
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 减少桶内令牌
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

RateLimiter 的 acquire() 逻辑可以理解为是一种「贷款」消费:当一个昂贵的请求到达一个空闲的令牌桶时,如果令牌桶中没有足够的令牌,guava 并不会阻塞该请求;但当下一个请求到达时,就需要等待一段时间,直到令牌桶生成了足以支付上一个请求的令牌。这样做的好处是,在空闲期,大额的请求可以得到立即处理,而不需要等待。

通过分析 SmoothBursty 的源码,我们看到 SmoothBursty 通过维护一个令牌桶来应对突发流量;而当流量不断流入时,SmoothBursty 根据令牌生成速率和所需令牌数控制下一个请求等待的时间,从而保证流量的平滑;另外 SmoothBursty 实行「贷款」消费,避免大额请求在空闲期做不必要的等待。

以上是 SmoothBursty 的部分。但日常实践中,有时候我们会遇到这样的情况:一个服务最多能承受的 qps 是1000,但如果这个服务长期处于低 qps 的状态,那么从低 qps 到 1000 qps 的状态是需要时间的(比如需要重新建立数据库的连接)。这种从低 qps 状态到高 qps 状态的切换时间我们称为「热身期」,在「热身期」,服务的 qps 上限会逐渐从低位增长到高位。显然,SmoothBursty 是没有考虑「热身期」的,此时我们就要用到 SmoothWarmingUp

SmoothWarmingUp

SmoothWarmingUp 的创建需要比 SmoothBursty 多输入一个参数:

RateLimiter rateLimiter = RateLimiter.create(100, Duration.ofSeconds(2));

第二个参数名是 warmupPeriod,表示「热身期」的长度,示例中的取值为 2 秒。

首先介绍一下 SmoothWarmingUp 的「热身」机制。在 SmoothWarmingUp 中,系统的「冷热」程度是通过令牌桶中的令牌数量衡量的——堆积的令牌数越多,代表系统越空闲,也就越「冷」,反之越「热」。当令牌数量超过一个阈值时,系统会动态增加获取令牌的时间间隔,从而控制系统的 qps。令牌数量和获取令牌的间隔的关系如下图:

在图中,X 轴表示桶内令牌的数量,Y 轴表示获取令牌的时间间隔。可以看到,有个分界点 thresholdPermits :低于分界点时,请求获取令牌的间隔保持在稳定的 stableInterval,此时系统的 qps 正好是我们设置的大小;高于分界点后,请求获取令牌的间隔会随着桶内令牌数量的增加而线性增加,直到令牌数到达 maxPermits,此时获取令牌的间隔最长,我们把此时的间隔称为 coldInterval,代表系统完全冷却时获取令牌的间隔。SmoothWarmingUp 中规定了 coldInterval = 3 * stableInterval。

运用微积分的知识可以知道:对 f(x) 在区间 [a,b] 上求积分即可求出将桶内令牌从 b 块取至 a 块所需时间。我们把桶内令牌从 maxPermits 块取至 thresholdPermits 块的时间(大小等于图中右边梯形的面积)称为 warmup period,代表了整个系统的「热身期」,这个时间是把令牌从 thresholdPermits 块取至 0 块的时间(大小等于图中左边矩形的面积)的 2 倍(同样也是 SmoothWarmingUp 的规定)。

为什么 SmoothBursty 从桶中获取令牌不需要时间,而 SmoothWarmingUp 需要额外的时间呢?

这是因为 SmoothWarmingUp 和 SmoothBursty 在设计理念上有极大不同。SmoothBursty 是为了应对突发流量,令牌桶在这里是「缓存」的作用,因此获取桶内的令牌不需要时间;SmoothWarmingUp 是为了系统能够进行平滑的冷热切换,令牌桶在这里是「反映系统冷热」的作用,当桶内令牌数超过一定数量时,代表系统进入「冷」状态,因此需要额外的时间。另外,SmoothWarmingUp 不具备应对突发流量的能力,这一点很重要。

好了,现在我们知道了图中各个字段的含义,也知道了各个字段之间的相互关系,那么我们就可以根据我们输入的 stableInterval(1/qps) 和 warmupPeriod 得到各个值的计算公式:

首先,coldInterval 是 stableInterval 的三倍:

coldInterval = 3 * stableInterval

其次,梯形面积为 warmupPeriod,而矩形面积为 stableInterval * thresholdPermits,又,梯形面积是矩形的两倍,即:

warmupPeriod = 2 * stableInterval * thresholdPermits

由此,我们得出 thresholdPermits 的值:

thresholdPermits = 0.5 * warmupPeriod / stableInterval

然后我们根据梯形面积的计算公式:

warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

得出 maxPermits 为:

maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

最后,斜率 slope 为:

slope = (coldInterval - stableInterval) / (maxPermits - thresholdPermits)

至此,我们已经搞清楚了 SmoothWarmingUp 的「热身」机制中所有的关键参数和相互的关系。

另外可能我们也比较关心这套「热身」机制在实际使用中会怎样影响系统的 qps,这里我绘制了 qps=100, warmupPeriod=2s 时的系统 qps 增长曲线,方便大家有个直观的认识:

有了以上的铺垫,接下来就可以进入源码了。首先是 SmoothWarmingUp 创建的部分,它的前半部分代码和 SmoothBursty 一模一样,核心的区别在于 doSetRate() 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
SmoothRateLimiter
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
// 这里的实现和 SmoothBursty 不一样
doSetRate(permitsPerSecond, stableIntervalMicros);
}
SmoothWarmingUp
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 为固定值 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// thresholdPermits 的计算公式,上文已讲过
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// maxPermits 的计算公式,上文已讲过
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// slope 的计算公式,上文已讲过
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
storedPermits = 0.0;
} else {
// 按 maxPermits / oldMaxPermits 的比例折算桶内已有的令牌数
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // 当创建时,令牌数为 maxPermits,表示系统处于冷状态
: storedPermits * maxPermits / oldMaxPermits;
}
}

doSetRate() 中主要计算了 SmoothWarmingUp 的「热身」机制的关键参数,这些关键参数的计算公式我们前面已经推导过。注意一个细节:SmoothWarmingUp 创建之后桶内的令牌数为 maxPermits,而在 SmoothBursty 中是 0。

下面来看 acquire() 的源码,同样地,大部分代码和 SmoothBursty 一样,核心的区别在于 storedPermitsToWaitTime() 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SmoothRateLimiter
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
// 这里的实现和 SmoothBursty 不一样
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

storedPermitsToWaitTime() 方法的含义是计算在当前桶中令牌数为 storedPermits 的情况下,取出 permitsToTake 块令牌所需的时间。由于在 SmoothBursty 中取令牌不需要时间,因此这个方法的返回值恒为 0,而在 SmoothWarmingUp 中则需要花费一定的时间。具体地,它需要计算下图阴影部分的面积:

具体代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SmoothWarmingUp
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 计算 thresholdPermits 右边梯形的面积
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 thresholdPermits 左边矩形的面积
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}

以上就是 SmoothWarmingUp 限流的原理和源码实现。

Sentinel 中的限流