常见限流算法与开源实现

随着微服务的流行,服务之间的稳定性也变得越来越重要。在一个复杂的服务网络中,如果一个服务节点被流量打垮,那么很可能会造成其他多个服务都不可用的连锁事件。因此在微服务中,如何把流量控制在系统能够承受的范围内,不让过大的流量直接把服务打垮,是确保系统稳定性的重要部分。

限流是一种常见的控制流量的方法。当流量超过服务处理能力时,超过的部分会被限流组件拦截。被拦截的请求可能被丢弃,也可能做降级处理,总之,都会比直接打崩系统好。常见的限流算法有:漏桶法、令牌桶和滑动窗口,分别适用于不同的场景。除此之外,还有些开源组件中的限流实现,也会在下文一并讲解。

漏桶法

漏桶算法概念如下:

  • 请求到来时,将每个请求放入「漏桶」进行存储;
  • 「漏桶」以固定速率向外「漏」出请求来执行;
  • 如果请求进入的速率大于「漏」出的速率,则「漏桶」中的请求会越来越多;当「漏桶」满了的时候,多余的请求会被丢弃。

漏桶算法多使用队列实现,服务的请求会存到队列中,服务的提供方则按照固定的速率从队列中取出请求并执行,过多的请求则会被丢弃。

漏桶算法的特点是流量的消费速率取决于「漏桶」向外「漏」水的速率,这种方式的好处是能保证系统的压力处于一个比较平稳的水位,缺点是当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。

令牌桶

令牌桶算法概念如下:

  • 令牌以固定速率生成,生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃;
  • 当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行,取不到令牌的请求会被丢弃。

令牌桶算法的特点是会将用不到的令牌先存起来,这种方式的好处是能应对突发流量,缺点是流量进入的速率无法被控制,短期内系统的压力可能会有比较大的变化。

滑动窗口

漏桶法和令牌桶能够控制某个时间周期内进入系统的最大请求数,但会面临一个临界值的问题:假设我们将漏桶法和令牌桶的时间周期设置为 1s,那么这两者可以确保 [0s,1s) 和 [1s,2s) 这两个时间周期内通过的请求数,但无法确保 [0.2s,1.2s) 内的请求数。要控制任意一个连续时间内通过的请求数,就需要使用滑动窗口算法:

我们可以看上图,整个 1s 被划分成 5 个格子,每个格子代表 200ms。[0.8s,1s) 到达的 80 个请求落在蓝色的格子中,而 [1s,1.2s) 到达的请求会落在红色的格 子中。当时间到达 1s 时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是 150 个,超过了限定的 100 个,所以超出的 50 个请求会被丢弃。

从上面可以看出,当滑动窗口的格子划分的越多,那么滑动窗口的滑动就越平滑,限流的统计就会越精确。

Hystrix 中的限流

Hystrix 是 Netflix 开源的一款容错系统,可以帮助我们的应用系统提高稳定性。限流是 Hystrix 中非常重要的一个部分,Hystrix 中提供了两种限流方式,分别是线程池信号量。Hystrix 使用命令模式将被限流代码的业务逻辑和调用逻辑解耦,框架已经实现了使用线程池限流和使用信号量限流的调用逻辑,使用者只需要专注业务逻辑即可。

使用时先定义一个类继承HystrixCommand,实现其抽象方法run(),在该方法中写入需要被限流的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HelloCommand extends HystrixCommand<String> {
private String userId;

protected HelloCommand(Setter setter, String userId) {
super(setter);
// 通过这种方式传入业务参数,每次调用都要创建一个实例
this.userId = userId;
}

@Override
protected String run() throws Exception {
// 将需要被限流的代码写在这里
System.out.println("hello user:" + userId);
return "ok";
}
}

然后在主函数中配置限流的参数,调用HystrixCommandexecute()方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) {
// 使用线程池限流的配置
Setter threadSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group-1"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 使用线程池限流
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
// 设置核心线程数量为 5
.withCoreSize(5)
// 设置最大线程数量为 10
.withMaximumSize(10)
// 设置最大排队长度为 20,该值即 blockingQueue 的大小,创建后不能改动
.withMaxQueueSize(20)
// 设置排队长度,超过排队长度时会拒绝,该值可以动态调整
.withQueueSizeRejectionThreshold(15)
);
// 使用信号量限流的配置
Setter semaphoreSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group-2"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 使用信号量限流
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// 设置最大并发量为 10,同时请求超过 10 个会拒绝
.withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
);
// 传入配置和业务参数
HelloCommand threadHelloCommand = new HelloCommand(threadSetter, "user1");
HelloCommand semaphoreHelloCommand = new HelloCommand(semaphoreSetter, "user2");
// 使用线程池限流的方式执行
threadHelloCommand.execute();
// 使用信号量限流的方式执行
semaphoreHelloCommand.execute();

}

  • 使用线程池的方式进行限流时,会先在线程池中创建若干线程,被限流的代码会被放进线程池中执行,如果线程满了,就会触发限流;
  • 使用信号量的方式进行限流时,会维护一个信号量,当进入被限流的方法时,信号量加 1,出被限流的方法时,信号量减 1,信号量超过阈值就会触发限流。

下面来看 Hystrix 中两种限流方式的源码实现。

信号量限流

Hystrix 底层使用 Rxjava 编写,因此会有很多陌生的概念比如 Observable、Subscriber 等,我会在下文简单介绍一下这些概念,没弄懂也没关系,因为这些概念不影响我们阅读限流方面的代码。

这是 Hystrix 的一次完整调用的链路(译自官方,有所改动)。上述的 demo 中我们直接调用了 execute() 方法,所以调用的路径为.execute() -> .queue() -> .toObservable() -> .toBlocking() -> .toFuture() -> .get()。这里的.xxx()都是 HystrixCommand 的方法,含义如下:

  • execute()会对 HystrixCommand 的 run() 方法进行一次阻塞调用,返回调用结果或者抛出错误;
  • queue()会返回一个 Future 对象,hystrix 会把执行后的结果放在里面,本质上执行.execute()就是在执行.queue().get()
  • toObservable()会把一个 HystrixCommand 对象转化成一个 Cold Observable 对象。Observable 是一个可观察对象,可以理解为一个数据发射器。Observable 分为 Cold 和 Hot 两种,Cold Observable 只有在被订阅者(Subscriber)订阅时才会发射数据,且每次订阅都会发送全量数据;Hot Observable 不需要订阅者就可以发射数据,订阅之后只会收到后续的数据。打个比方,Cold Observable 相当于一张 CD,Hot Observable 相当于一个电台;
  • toBlocking()会把一个 Observable 对象转化成一个 BlockingObservable 对象。当 BlockingObservable 被订阅时,会通过 CountDownLatch 和 BlockingQueue 控制,将订阅者一直阻塞在主线程直到特定的数据到来;
  • toFuture()是 BlockingObservable 的一个方法,它会触发订阅,同时它是一个单数据订阅者,它只对发射 1 个数据的 BlockingObservable 生效,返回一个 Future 用于存放数据。当 BlockingObservable 发射多个或 0 个数据时,该方法会报错。本质上执行.queue().get()就是在执行.toObservable().toBlocking().toFuture().get()

本质上,hystrix 的执行流程就是一次订阅的过程,hystrix 在整个订阅的链路中添加了缓存、断路、限流、容错、降级的逻辑,其中限流的核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
AbstractCommand
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
//...
// 判断断路器是否打开
if (circuitBreaker.allowRequest()) {
// 获取信号量对象
final AbstractCommand.TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 尝试获取信号量
if (executionSemaphore.tryAcquire()) {
try {
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// 执行用户代码
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 未获取信号量,执行降级逻辑
return handleSemaphoreRejectionViaFallback();
}
} else {
// 断路器打开,执行降级逻辑
return handleShortCircuitViaFallback();
}
}

AbstractCommand
protected AbstractCommand.TryableSemaphore getExecutionSemaphore() {
// 判断限流策略是否是信号量
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
// 从缓存中获取,这里缓存的 key 默认情况下和 Command 的类名相关,同一个类的所有实例共用一个信号量
AbstractCommand.TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
// 创建信号量,并放入缓存
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new AbstractCommand.TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
// 限流策略非信号量时,返回 DEFAULT
return AbstractCommand.TryableSemaphoreNoOp.DEFAULT;
}
}

从源码可以看到,在一个请求要执行我们在 HystrixCommand 中自定义的代码前,会先请求信号量,默认情况下同一个 HystrixCommand 类的所有实例共用一个信号量。在创建信号量时,会根据限流策略是『信号量限流』还是『线程池限流』创建不同类型的信号量。当限流策略是『信号量』时,创建的信号量的核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
AbstractCommand
static class TryableSemaphoreActual implements TryableSemaphore {
// 并发请求的上限
protected final HystrixProperty<Integer> numberOfPermits;
// 当前请求的数量
private final AtomicInteger count = new AtomicInteger(0);
@Override
public boolean tryAcquire() {
int currentCount = count.incrementAndGet();
// 判断当前请求数量是否大于请求上限
if (currentCount > numberOfPermits.get()) {
count.decrementAndGet();
return false;
} else {
return true;
}
}
@Override
public void release() {
// 释放信号量
count.decrementAndGet();
}
}

从信号量的获取和释放可以看出,主要是使用 AtomicInteger 的原子操作来进行信号量的获取和释放。

线程池限流

上面是当限流策略是『信号量』时的情况,而当限流策略是『线程池』时,会返回一个 DEFAULT 信号量,这个信号量比较特殊,它的 tryAcquire() 方法返回值恒为 true, release() 方法是空方法:

1
2
3
4
5
6
7
8
9
10
11
AbstractCommand
static class TryableSemaphoreNoOp implements TryableSemaphore {
public static final TryableSemaphore DEFAULT = new TryableSemaphoreNoOp();
@Override
public boolean tryAcquire() {
return true;
}
@Override
public void release() {}
}
}

这符合我们的预期,因为当限流策略为『线程池』时,不需要在信号量中安排实际的限流逻辑。那么线程池这种限流策略的实现又在哪里呢?答案在创建 Observable 的逻辑里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
AbstractCommand
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 隔离策略为线程池
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//...
}
})
// 当这个 Observable 被订阅时,会从 threadPool 中选择一个线程来执行订阅行为
.subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
// 线程中断的条件
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == AbstractCommand.TimedOutStatus.TIMED_OUT;
}
}));
} else {
// 隔离策略不为线程池时,在原线程执行
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
//...
}
});
}
}

从源码中我们可以看到,当限流策略为线程池时,Hystrix 会从内部的线程池 threadPool 中选择一个线程来执行订阅行为。和信号量一样,线程池在默认情况下也是和类型关联,一个 HystrixCommand 类的所有实例共用一个线程池。当并发请求数超过线程数+等待队列大小的时候,线程池会抛异常,这个异常会在外层被捕捉到,触发降级:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
AbstractCommand
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
//...
// 执行降级的逻辑
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
// 接收到的内部抛出的异常
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
// 被抛异常为线程池拒绝异常时,触发降级逻辑
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
}
// 被抛异常为执行超时异常时,也会触发降级逻辑
else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
//...
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
// 接收内部抛出的异常,执行降级逻辑
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}

源码中我们可以看到,当线程池池拒绝抛出异常时,这个异常会在外部被捕捉到,触发降级。另外,除了线程池拒绝这种情况外,执行超时也会触发降级(只有在线程池限流下才可以设置超时时间,信号量限流时不支持设置超时时间;且 Hystrix 中超时的实现比较特别,并不是 future.get(timeout) 这种方式,而是通过定时任务定时中断的方式,个人猜测可能是出于到在主线程挂掉后仍能中断超时任务的考虑)。

Guava 中的限流

Guava 是一个 Google 开发的基于 Java 的扩展项目,内部提供了各种实用的工具。RateLimiter 是 Guava 提供的用于限流的工具。RateLimiter 基于令牌桶算法,可以有效限定单个 JVM 实例上某个接口的流量。

RateLimiter 使用的一个例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RateLimiterExample {
public static void main(String[] args) throws Exception {
// QPS 设置为 5,代表一秒钟只允许处理五个请求
RateLimiter rateLimiter = RateLimiter.create(5);
ExecutorService executorService = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(10);
long startTime = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger();
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
// 向令牌桶请求令牌
rateLimiter.acquire(1);
latch.countDown();
System.out.println("第 "+count.incrementAndGet()+" 个请求完成," +
"距离开始时间 "+(System.currentTimeMillis()-startTime)+" ms");
});
}
latch.await();
System.out.println("共计用时:" + (System.currentTimeMillis() - startTime)+"ms");
executorService.shutdown();
}
}

运行结果:

从运行结果可以看到,在 qps 设置为 5 的条件下,10 个请求共计花费 1800ms。值得注意的是,每个请求之间的间隔十分均匀,都在 200ms 左右,这主要是得益于 Guava 实现的平滑限流算法。该算法的核心实现位于 SmoothRateLimiter 这个类下,这是一个基于令牌桶实现的限流工具类:令牌会被匀速地补充到桶内,当一个请求过来时,通过 acquire() 方法从桶中取得令牌,如果此时桶中没有足够令牌,则根据不足的令牌数量和补充令牌的速度计算出需要等待的时间,当前请求会被放行,但下一个请求将阻塞该等待时间。

SmoothRateLimiter 是一个抽象类,它有两个子类:SmoothBursty 和 SmoothWarmup。它们分别适用于不同的场景,下文会一一讲到。

SmoothBursty

上文我们通过 RateLimiter.create(5) 创建的限流器,实际上是 SmoothBursty。从名字可以看出,这是一个在平滑限流的基础上,能够应对突发流量的限流器。

先看 SmoothBursty 中一些重要的字段:

1
2
3
4
5
6
7
8
9
10
// 桶中最多存放多少秒的令牌数
final double maxBurstSeconds;
// 桶中的令牌个数
double storedPermits;
// 桶中最多能存放多少个令牌,maxPermits = maxBurstSeconds * 每秒生成令牌个数
double maxPermits;
// 加入令牌的平均间隔,单位为微秒,如果加入令牌速度为每秒 5 个,则该值为 1000 * 1000 / 5
double stableIntervalMicros;
// 下一个请求需要等待的时间点
private long nextFreeTicketMicros = 0L;

再来看 SmoothBursty 的创建逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
RateLimiter
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, RateLimiter.SleepingStopwatch.createFromSystemTimer());
}
RateLimiter
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
// 创建一个 SmoothBursty 对象
double maxBurstSeconds = 1.0;
RateLimiter rateLimiter = new SmoothBursty(stopwatch, maxBurstSeconds);
// 设置令牌生成速率
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}

我们看到,当调用 RateLimiter.create() 方法时,实际是创建了一个 SmoothBursty 的实例,并调用了其 setRate() 方法。需要注意的是,SmoothBursty 是个非 public 的类,只能通过 RateLimiter.create() 创建,而这里的 maxBurstSeconds 写死为 1.0,因此实际上 maxBurstSeconds 并非是一个可以自定义的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
RateLimiter
public final void setRate(double permitsPerSecond) {
checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
SmoothRateLimiter
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 根据当前时间更新桶内的令牌数
resync(nowMicros);
// 根据 qps 算出生成令牌的间隔
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
// 调用子类的更具体的实现
doSetRate(permitsPerSecond, stableIntervalMicros);
}
SmoothRateLimiter
void resync(long nowMicros) {
// 只有 当前时间>下一个请求需要等待的时间点 时,桶内才会补充令牌
if (nowMicros > nextFreeTicketMicros) {
// 计算这段时间内需要补充的令牌,coolDownIntervalMicros() 返回的是 stableIntervalMicros
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
// 更新桶中的令牌,不能超过 maxPermits
storedPermits = min(maxPermits, storedPermits + newPermits);
// 将下一个请求需要等待的时间点设置为当前时间
nextFreeTicketMicros = nowMicros;
}
}
SmoothBursty
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
// 保存旧的令牌上限
double oldMaxPermits = this.maxPermits;
// 计算新的令牌上限
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
storedPermits = maxPermits;
} else {
// 按 maxPermits / oldMaxPermits 的比例折算桶内已有的令牌数
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // 当创建时,桶内令牌数为 0
: storedPermits * maxPermits / oldMaxPermits;
}
}

在创建 SmoothBursty 时,它会根据我们设置的 qps 计算出生成令牌的速率和桶中令牌数的上限。同时我们看到 setRate() 是一个 public 方法,这意味着限流器的 qps 在创建后是可以更改的。更改 qps 时,首先会根据当前时间在桶内补充令牌,然后计算出新的生成令牌的速率和桶中令牌数的上限,最后根据上限数的变化按比例折算桶内已有的令牌数。

到此为止 SmoothBursty 已经创建完成了,接下来我们看 acquire() 的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
RateLimiter
public double acquire(int permits) {
// 本次请求所需等待的时间点
long microsToWait = reserve(permits);
// 开始阻塞等待(如果 microsToWait 不在当前时间点之后,则不需要等待)
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
RateLimiter
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
RateLimiter
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
SmoothRateLimiter
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 这里调用了上面提到的 resync 方法,会根据当前时间更新桶中的令牌数和 nextFreeTicketMicros
resync(nowMicros);
// 这次请求所需等待的时间点。(这个时间点仅和上次请求相关)
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 缺少的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend;
// waitMicros 为下一次请求需要等待的时间长度;SmoothBursty的storedPermitsToWaitTime 返回 0
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 更新 nextFreeTicketMicros
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 减少桶内令牌
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

RateLimiter 的 resync() 方法用于补充桶内令牌,它的触发是惰性的,在请求达到时才会触发。

RateLimiter 的 acquire() 逻辑可以理解为是一种「贷款」消费:如果桶内有足够多的令牌,那么请求会立即通过,并且下一个请求也可以立即通过;而如果令牌桶中没有足够的令牌,那么这一个请求可以立即通过,但当下一个请求就需要等待一段时间,直到令牌桶生成了足以支付上一个请求的令牌。这样做的好处是,在空闲期,大额的请求可以得到立即处理,而不需要等待。

通过分析 SmoothBursty 的源码,我们看到 SmoothBursty 通过维护一个令牌桶来应对突发流量。而当流量不断流入时,SmoothBursty 根据令牌生成速率和所需令牌数控制下一个请求等待的时间,从而保证流量的平滑。另外 SmoothBursty 实行「贷款」消费,避免大额请求在空闲期做不必要的等待。

SmoothWarmingUp

以上是 SmoothBursty 的部分。但日常实践中,有时候我们会遇到这样的情况:一个服务最多能承受的 qps 是1000,但如果这个服务长期处于低 qps 的状态,那么从低 qps 到 1000 qps 的状态是需要时间的(比如需要重新建立数据库的连接)。这种从低 qps 状态到高 qps 状态的切换时间我们称为「热身期」,在「热身期」,服务的 qps 上限会逐渐从低位增长到高位。显然,SmoothBursty 是没有考虑「热身期」的,此时我们就要用到 SmoothWarmingUp

SmoothWarmingUp 的创建需要比 SmoothBursty 多输入一个参数:

RateLimiter rateLimiter = RateLimiter.create(100, Duration.ofSeconds(2));

第二个参数名是 warmupPeriod,表示「热身期」的长度,示例中的取值为 2 秒。

首先介绍一下 SmoothWarmingUp 的「热身」机制。在 SmoothWarmingUp 中,系统的「冷热」程度是通过令牌桶中的令牌数量衡量的——堆积的令牌数越多,代表系统越空闲,也就越「冷」,反之越「热」。当令牌数量超过一个阈值时,系统会动态增加获取令牌的时间间隔,从而控制系统的 qps。令牌数量和获取令牌的间隔的关系如下图:

在图中,X 轴表示桶内令牌的数量,Y 轴表示获取令牌的时间间隔。可以看到,有个分界点 thresholdPermits :低于分界点时,请求获取令牌的间隔保持在稳定的 stableInterval,此时系统的 qps 正好是我们设置的大小;高于分界点后,请求获取令牌的间隔会随着桶内令牌数量的增加而线性增加,直到令牌数到达 maxPermits,此时获取令牌的间隔最长,我们把此时的间隔称为 coldInterval,代表系统完全冷却时获取令牌的间隔。SmoothWarmingUp 中规定了 coldInterval = 3 * stableInterval。

零点是一个特殊的点,在该点令牌桶中的令牌数为 0,但系统仍然可以接受请求,并且间隔为 stableInterval,只不过此时令牌桶中的令牌数不会再降。

运用微积分的知识可以知道:对 f(x) 在区间 [a,b] 上求积分即可求出将桶内令牌从 b 块取至 a 块所需时间。我们把桶内令牌从 maxPermits 块取至 thresholdPermits 块的时间(大小等于图中右边梯形的面积)称为 warmup period,代表了整个系统的「热身期」,这个时间是把令牌从 thresholdPermits 块取至 0 块的时间(大小等于图中左边矩形的面积)的 2 倍(同样也是 SmoothWarmingUp 的规定)。

为什么 SmoothBursty 从桶中获取令牌不需要时间,而 SmoothWarmingUp 需要额外的时间呢?

这是因为 SmoothWarmingUp 和 SmoothBursty 在设计理念上有极大不同。SmoothBursty 是为了应对突发流量,令牌桶在这里是「缓存」的作用,因此获取桶内的令牌不需要时间;SmoothWarmingUp 是为了系统能够进行平滑的冷热切换,令牌桶在这里是「反映系统冷热」的作用,当桶内令牌数超过一定数量时,代表系统进入「冷」状态,因此需要额外的时间。另外,SmoothWarmingUp 不具备应对突发流量的能力,这一点很重要。

好了,现在我们知道了图中各个字段的含义,也知道了各个字段之间的相互关系,那么我们就可以根据我们输入的 stableInterval(1/qps) 和 warmupPeriod 得到各个值的计算公式:

首先,coldInterval 是 stableInterval 的三倍:

coldInterval = 3 * stableInterval

其次,梯形面积为 warmupPeriod,而矩形面积为 stableInterval * thresholdPermits,又,梯形面积是矩形的两倍,即:

warmupPeriod = 2 * stableInterval * thresholdPermits

由此,我们得出 thresholdPermits 的值:

thresholdPermits = 0.5 * warmupPeriod / stableInterval

然后我们根据梯形面积的计算公式:

warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)

得出 maxPermits 为:

maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)

最后,斜率 slope 为:

slope = (coldInterval - stableInterval) / (maxPermits - thresholdPermits)

至此,我们已经搞清楚了 SmoothWarmingUp 的「热身」机制中所有的关键参数和相互的关系。

另外可能我们也比较关心这套「热身」机制在实际使用中会怎样影响系统的 qps,这里我绘制了 qps=100, warmupPeriod=2s 时的系统 qps 增长曲线,方便大家有个直观的认识:

有了以上的铺垫,接下来就可以进入源码了。首先是 SmoothWarmingUp 创建的部分,它的前半部分代码和 SmoothBursty 一模一样,核心的区别在于 doSetRate() 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
SmoothRateLimiter
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
// 这里的实现和 SmoothBursty 不一样
doSetRate(permitsPerSecond, stableIntervalMicros);
}
SmoothWarmingUp
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 为固定值 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// thresholdPermits 的计算公式,上文已讲过
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// maxPermits 的计算公式,上文已讲过
maxPermits = thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// slope 的计算公式,上文已讲过
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
storedPermits = 0.0;
} else {
// 按 maxPermits / oldMaxPermits 的比例折算桶内已有的令牌数
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // 当创建时,令牌数为 maxPermits,表示系统处于冷状态
: storedPermits * maxPermits / oldMaxPermits;
}
}

doSetRate() 中主要计算了 SmoothWarmingUp 的「热身」机制的关键参数,这些关键参数的计算公式我们前面已经推导过。注意一个细节:SmoothWarmingUp 创建之后桶内的令牌数为 maxPermits,而在 SmoothBursty 中是 0。

下面来看 acquire() 的源码,同样地,大部分代码和 SmoothBursty 一样,核心的区别在于 storedPermitsToWaitTime() 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SmoothRateLimiter
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros =
// 这里的实现和 SmoothBursty 不一样
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

storedPermitsToWaitTime() 方法的含义是计算在当前桶中令牌数为 storedPermits 的情况下,取出 permitsToTake 块令牌所需的时间。由于在 SmoothBursty 中取令牌不需要时间,因此这个方法的返回值恒为 0,而在 SmoothWarmingUp 中则需要花费一定的时间。具体地,它需要计算下图阴影部分的面积:

具体代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SmoothWarmingUp
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 计算 thresholdPermits 右边梯形的面积
if (availablePermitsAboveThreshold > 0.0) {
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 thresholdPermits 左边矩形的面积
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}

以上就是 SmoothWarmingUp 限流的原理和源码实现。

Sentinel 中的限流

Sentinel 是阿里开源的一个流量控制组件,提供了限流、熔断、降级、系统保护等多种功能。其中限流部分 Sentinel 提供了多种策略,包括基于QPS的限流和基于并发的限流。本文将从指标统计和限流算法的实现两个角度剖析源码。

使用方式

Sentinel 的使用非常简单,它的内部控制流量的粒度是resource,而 resource 由 resourceName 唯一区分,因此在创建 resource 时需要传入 resourceName。Sentinel 提供了两种风格的使用形式,分别是:

1、try-catch风格

1
2
3
4
5
6
7
8
9
10
11
12
13
// 创建资源,资源名可使用任意有业务语义的字符串
try (Entry entry = SphU.entry("resourceName")) {
// 被保护的业务逻辑
// do something here...
} catch (BlockException ex) {
// 进入catch块表示资源访问阻止,可能是被限流或被降级
// 进行相应的处理操作
}finally {
// 务必保证每个 entry 与 exit 配对
if (entry != null) {
entry.exit();
}
}

2、if 风格

1
2
3
4
5
6
7
8
9
10
11
12
13
// 资源名可使用任意有业务语义的字符串
if (SphO.entry("resourceName")) {
try {
// 被保护的业务逻辑
// do something here...
} finally {
// 务必保证exit()会被执行
SphO.exit();
}
} else {
// 进入else块表示资源访问阻止,可能是被限流或被降级
// 进行相应的处理操作
}

从使用方式可以看出 Sentinel 的核心在 entry 方法,而该方法的总体流程是一个责任链的模式,指标统计和限流算法分别是该责任链中的两个模块。除了这两个模块外,还有熔断降级、系统负载保护等其它模块,整个链条中的核心模块如下图所示:

各节点的作用分别是:

  • NodeSelectorSlot负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot用于创建簇点,这些簇点可以存储 runtime 指标;
  • StatisticSlot用于统计 runtime 指标,比如资源的 RT, QPS, thread count 等等;
  • FlowSlot用于根据预设的限流规则以及前面统计的 runtime 指标,来进行流量控制;
  • DegradeSlot通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot通过系统的状态,例如 cpu 使用率等,来控制总的入口流量;

本文主要关注StatisticSlotFlowSlot模块。

指标统计

指标统计是限流的前置操作。Sentinel 会在StatisticSlot模块中统计当前资源的线程数、rt 和 qps 等指标,然后根据这些指标决定要不要限流。这些指标都会记录在 statisticNode 中,每个资源都有自己的 statisticNode,statisticNode 并不会直接存储 rt、qps,而是通过存储更为基础的请求数,计算获得 rt、qps 等指标。StatisticNode 统计的基础指标有三个,分别是:秒级滑动窗口、分钟级滑动窗口和当前线程数。

1
2
3
4
5
6
7
8
public class StatisticNode implements Node {
// 秒级滑动窗口
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL);
// 分钟级滑动窗口
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
// 当前线程数
private LongAdder curThreadNum = new LongAdder();
}

其中,当前线程数是一个 LongAdder 对象,在执行 entry 方法时自增,在执行 exit 方法是自减,因此在写代码时必须保证每个 entry 与 exit 配对,否则统计就会出错。

秒级滑动窗口和分钟级滑动窗口的创建逻辑都在 ArrayMetric 中,但两者的底层实现有些许不同,这里先看实现较为简单的分钟级滑动窗口,再看实现较为复杂的秒级滑动窗口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ArrayMetric implements Metric {

private final LeapArray<MetricBucket> data;

public ArrayMetric(int sampleCount, int intervalInMs) {
// 秒级滑动窗口创建,默认 sampleCount=2,intervalInMs = 1000,可以修改
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
// 分钟级滑动窗口创建,默认 sampleCount=60,intervalInMs = 60*1000,enableOccupy=false,不可修改
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
//...
}

可以看到,在 ArrayMetric 中的核心是一个 LeapArray,这是 Sentinel 内部的高效滑动窗口实现。它有两个子类,会根据传入参数的不同而创建不同的子类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
// 数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
// 写锁,用于更新数据
private final ReentrantLock updateLock = new ReentrantLock();

public LeapArray(int sampleCount, int intervalInMs) {
// 时间区间长度(ms单位)
this.intervalInMs = intervalInMs;
// 划分的窗口数
this.sampleCount = sampleCount;
// 单个窗口长度
this.windowLengthInMs = intervalInMs / sampleCount;

this.array = new AtomicReferenceArray<>(sampleCount);
}
// ...
}

LeapArray 的底层是一个数组,数组中的每一个元素代表了一个窗口,intervalInMs 代表了整个时间区间的长度,sampleCount 代表了窗口个数。当创建分钟级滑动窗口时,默认 intervalInMs = 60*1000,表示时间区间的长度为 1 分钟,sampleCount = 60,表示 1 分钟的区间被划分为 60 个窗口,那么单个窗口的长度就为 1s。

图中每一个窗口都是一个 WindowWrap 对象,该对象记录了这个时间窗口的开始时间戳、窗口长度和统计数据。

1
2
3
4
5
6
7
8
9
public class WindowWrap<T> {
// 单个窗口长度
private final long windowLengthInMs;
// 开始时间戳
private long windowStart;
// 统计数据
private T value;
// ...
}

从前文可知,统计数据的类型是 MetricBucket,这是一个包含多个统计维度的结构体,这些维度分别是:PASS(通过)、BLOCK(阻塞)、EXCEPTION(异常)、SUCCESS(成功)、RT(返回时间之和)、minRt(最小RT时间)、OCCUPIED_PASS(预占通过)。每当有请求通过\拦截\异常时,都会更新相应的数据。

分钟级滑动窗口的更新策略非常简单。当添加数据时,首先查看数据落在哪个窗口(取余),随后查看该窗口的数据是否过期,如果未过期则直接添加,如果过期了则先清零再添加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
LeapArray
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 获取窗口下标
int idx = calculateTimeIdx(timeMillis);
// 计算理论窗口起始时间
long windowStart = calculateWindowStart(timeMillis);
// 使用循环来控制并发
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 不存在窗口,创建新的窗口
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 使用 CAS 创建新窗口
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
// CAS 失败,代表有并发创建,让出 CPU 使用权
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 窗口未过期,直接返回旧窗口
return old;
} else if (windowStart > old.windowStart()) {
// 窗口过期,清空当前窗口
if (updateLock.tryLock()) { // 通过加锁控制并发
try {
// 清空当前窗口
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// 获取锁失败,说明有并发更新,让出 CPU 使用权
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 窗口时间小于理论时间,正常不会发生,小概率情况下发生了时钟回拨,此时请求不计入窗口,返回一个 windowWrap 用于容错
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

// 清空当前窗口
BucketLeapArray
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// 窗口时间设置为当前时间
w.resetTo(startTime);
// 数据清零
w.value().reset();
return w;
}

读取时,会依次读取所有窗口,过滤掉其中已经过期的窗口,进行计算

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
LeapArray
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
// 依次读取所有窗口
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
// 过滤掉过期的和空的窗口
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}

// 判断当前窗口的数据是否是 60s 内的
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}

接下来看秒级滑动窗口,和分钟级不同的是,秒级滑动窗口的实现是 OccupiableBucketLeapArray,这是一种预占式的滑动窗口,允许请求“预占”未来的窗口。为什么要有这样一个设计呢?这是因为在 1.5 版本中,Sentinel 对“直接拒绝”的限流机制做了优化,添加了“预占”机制,允许在当前 QPS 已经达到限流阈值时,同个资源高优先级(prioritized = true)的请求提前占用未来时间窗口的配额数,等待到对应时间窗口到达时直接通过,从而可以实现“最终通过”的效果而不是被立即拒绝;而同个资源低优先级的请求则不能占用未来的配额,阈值达到时就会被限流。

OccupiableBucketLeapArray 内部使用 FutureBucketLeapArray 来记录这些占用了未来时间窗口的请求。FutureBucketLeapArray 仅维持当前时间以后的格子,它和 LeapArray 唯一不同就是判断数据是否过期的方法isWindowDeprecated

1
2
3
4
5
6
7
8
9
10
LeapArray
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
// 当前格子不在过期范围内
return time - windowWrap.windowStart() > intervalInMs;
}
FutureBucketLeapArray
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
// 当前格子也在过期范围内
return time >= windowWrap.windowStart();
}

可以看到,LeapArray 中当前格子是不在过期范围内的,而 FutureBucketLeapArray 将当前格子也纳入了过期范围。当“预占”的请求到达时,OccupiableBucketLeapArray 会将其放入 FutureBucketLeapArray 中

1
2
3
4
5
6
7
8
9
class OccupiableBucketLeapArray{
// FutureBucketLeapArray 类型的 borrowArray 用于记录预占的请求
private final FutureBucketLeapArray borrowArray;
public void addWaiting(long time, int acquireCount) { // 这个传入的 time 是当前时间+等待时间
WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
// 将预占请求记入 borrowArray 中
window.value().add(MetricEvent.PASS, acquireCount);
}
}

秒级滑动窗口的更新策略和分钟级滑动窗口完全一致,但秒级滑动窗口重写了重置窗口resetWindowTo的逻辑,在每次重置窗口时,不是直接清零,而是先清零再加上 FutureBucketLeapArray 中当前窗口预占的请求数,因为按照设计,这些请求会在当前时间窗口内通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
OccupiableBucketLeapArray
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
w.resetTo(time);
// 找到 FutureBucketLeapArray 中对应当前时间的窗口
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
// 先清零
w.value().reset();
// 再加上当前窗口对应的预占请求数
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}

同样,在创建空窗口时,秒级滑动窗口也会加上 FutureBucketLeapArray 中的预占请求数

1
2
3
4
5
6
7
8
9
10
OccupiableBucketLeapArray
public MetricBucket newEmptyBucket(long time) {
MetricBucket newBucket = new MetricBucket();
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
// 加上 FutureBucketLeapArray 中的预占请求数
newBucket.reset(borrowBucket);
}
return newBucket;
}

读取的逻辑和分钟级窗口完全一样,都是遍历并过滤掉过期的窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
LeapArray
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);
// 依次读取所有窗口
for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
// 过滤掉过期的和空的窗口
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}

// 判断当前窗口的数据是否是 1s 内的
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}

分钟级和秒级窗口的统计数值并非完全准确的,观测到的值有可能比实际的值要小。以秒级窗口为例,秒级窗口一轮是 1s,一共 2 个窗口,每个窗口是 0.5s,现在考虑这样一个场景:滑动窗口以 qps=10 匀速接收请求,第 1.0s 的时候,两个窗口记录的请求数均为 5,此时读取到的 qps=10;而当第 1.1s 时,此时第 0 个窗口被清零更新后重新写入 1(假设没有被预占的请求),因此读取到的 qps=6。可见,滑动窗口的统计可能存在误差,并且可以计算出,最大误差率为 1/N,其中 N 为窗口个数。为了减小误差,可以通过 SampleCountProperty#updateSampleCount 将窗口数量调大。

秒级窗口中可能存在高优先级请求没有被统计上的问题。如前文所述,高优先级请求在“直接拒绝”机制下不会被立马拒绝,而是有可能预占未来的配额,但这些预占的请求是惰性更新的,只有在更新窗口或者创建空窗口时才会被更新到秒级窗口中,而更新窗口或者创建空窗口只有在请求到达时才会进行,也就是说,如果第 1s 中有 n 个高优先级请求被顺延到了第 2s,而第 2s 中没有接收到任何请求,那么第 1s 顺延下来的这 n 个请求是不会统计在第 2s 的 qps 中的,此时第 2s 的 qps=0。为了避免这个问题,推荐尽量不使用 entryWithPriority,或者也可以通过 OccupyTimeoutProperty#setOccupyTimeout 将预占超时时间调小,如果调成 0,则不将会有预占发生。

限流算法

直接拒绝

直接拒绝算法既可以用于按 qps 限流,也可以用于按线程数限流。该算法的思路是当请求数超过既定的 qps 或者线程数时,将请求快速失败。核心代码位于 DefaultController 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
DefaultController
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 按限流方式获取 qps/线程数
int curCount = avgUsedTokens(node);
// 计算所需令牌是否超过配额
if (curCount + acquireCount > count) {
return false;
}
return true;
}
DefaultController
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
// 按限流方式获取当前已有的线程数或已通过的 qps
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

1.5 版本后,Sentinel 对 qps 维度的高优先级请求做了优化,允许这些请求预占后 1s 的配额。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
DefaultController
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
// 1.5版本新增部分
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
// 计算预占请求所需等待时间
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
// 如果预占请求可以在超时前通过,则进行等待
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
// sleep 等待
sleep(waitInMs);
// PriorityWaitException 会被 StatisticSlot 接住,用于统计,不会中断调用链的执行
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}

可以看到,当请求为高优先级且限流规则为 qps 限流时,“直接拒绝”算法不会立即拒绝,而是计算预占所需时间,如果这个时间小于超时时间,则会进行 sleep 等待然后放行。

默认情况下,超时时间 occupyTimeout=500,即预占请求最多预占 500ms 以内的配额。预占额度的分布和当前这一秒的请求分布有关。比如限制 qps=10,并且 1s 的区间分成 4 个窗口,现在接收 20 个请求,前 10 个请求分布为 [1,3,2,4],那么当第 11 个高优先级请求到达时,它会预占下一秒的第一个窗口,第 12-14 个高优先级请求到达时,它们会预占下一秒的第二个窗口,第 15-16 个请求,按照算法会预占下一秒的第三个窗口,但由于第三个窗口的等待时间超过 500ms,它们会被拒绝。同理第 16-20 个请求也会被拒绝。

匀速排队

匀速排队只适用于 qps 限流。它的中心思想是:以固定的间隔时间让请求通过。当请求到来的时候,如果当前请求距离上个通过的请求通过的时间间隔不小于预设值,则让当前请求通过;否则,计算当前请求的预期通过时间,如果该请求的预期通过时间小于规则预设的 timeout 时间,则该请求会等待直到预设时间到来通过(排队等待处理);若预期的通过时间超出最大排队时长,则直接拒接这个请求。它的核心代码位于RateLimiterController 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
RateLimiterController
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 省略一些有效性检验
long currentTime = TimeUtil.currentTimeMillis();
// 计算当前请求需要花费的时间
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 当前请求的预期通过时间 = 花费时间 + 上个请求通过时间
long expectedTime = costTime + latestPassedTime.get();

if (expectedTime <= currentTime) {
// 预期时间小于当前时间,直接通过
latestPassedTime.set(currentTime);
return true;
} else {
// 计算需要等待的时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
// 等待时间超过允许等待的最大时间,不通过
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 再次计算等待时间,这里再次计算是因为有并发
waitTime = oldTime - TimeUtil.currentTimeMillis();
// 如果再次计算后等待时间超过了允许等待的最大时间,不通过,并减去这次的等待时间
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// 等待所需时间后通过
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}

由于 expectedTime 是以毫秒为单位,因此匀速排队模式不支持 QPS > 1000 的场景

预热

和 Guava 一样,Sentinel 也支持预热模式的限流。在预热模式下,系统在“冷启动”时会控制 qps 阈值使其慢慢提升,在一定时间内逐渐恢复到正常的 qps,从而给冷系统一个预热的时间,避免冷系统被压垮。预热模式只适用于 qps 限流。

Sentinel 的预热机制是建立在 Guava 算法的基础上的,但和 Guava 不同的是,Guava 调节的是请求之间的间隔,而 Sentinel 调节的是每秒的 qps。qps 和请求间隔之间有如下的换算关系:

stableInterval 表示以 ms 为单位的平稳期请求间隔,count 表示平稳期 qps,则有

stableInterval = 1.0 / count * 1000

coldInterval 表示以 ms 为单位的冷却期最大请求间隔,coldFactor 表示冷启动因子,代表平稳 qps 和冷却状态下最小 qps 的比值,则有

coldInterval = 1.0 / (count / coldFactor) * 1000

Sentinel 的算法便是在 Guava 算法的基础上代入了如上换算关系,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 初始化
WarmUpController
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
// 平稳状态下的 qps
this.count = count;
// 冷启动因子,代表平稳 qps 和冷却状态下最小 qps 的比值,无法自定义,固定为 3
this.coldFactor = coldFactor;

// 计算警戒线下的令牌数量
// Guava 中原代码:thresholdPermits = 0.5 * warmupPeriod / stableInterval
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// 计算总令牌数量
// Guava 中原代码:maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

// 计算斜率
// Guava 中原代码:slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits)
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}

// 判断是否通过
WarmUpController
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 这一秒的 qps
long passQps = (long) node.passQps();
// 上一秒的 qps
long previousQps = (long) node.previousPassQps();
// 根据当前时间和上一秒 qps 更新桶内令牌数
syncToken(previousQps);
// 获取桶内剩余令牌数
long restToken = storedTokens.get();
// 如果当前剩余令牌数大于警戒值,说明系统处于冷状态
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 计算这一秒允许达到的最大 qps 值
// Guava 中原代码:current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// 如果当前 qps+所需令牌数不大于允许的最大qps值,放行
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// 系统不处于冷状态,和平稳 qps 比
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}

// 更新桶内令牌数
WarmUpController
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
// 当前时间(向下模 1000 取整)
currentTime = currentTime - currentTime % 1000;
// 上次更新时间
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}

long oldValue = storedTokens.get();
long newValue = oldValue;

// 添加令牌的判断前提条件(满足一个即可):
// 1)剩余令牌数小于警戒值
// 2)上一秒 qps 小于冷却期最小 qps
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}

newValue = Math.min(newValue, maxToken);
// 更新令牌数,并减去上一秒 qps 的值
if (storedTokens.compareAndSet(oldValue, newValue)) {
if (storedTokens.addAndGet(0 - passQps) < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}

从代码可以看出,在代入 qps 和请求间隔的换算关系后,除了更新桶内令牌数的部分,Sentinel 的预热算法和 Guava 基本一致。更新桶内令牌数的部分两者不一样:在 Guava 中,只要当前时间大于下一个请求需要等待的时间点,桶内就会更新令牌;而在 Sentinel 中,由于统计的是 qps,因此只有在这一秒没有更新过时才会更新,并且只有当桶内剩余令牌数小于警戒值或上一秒 qps 小于冷却期最小 qps 时才会更新。换言之,如果桶内剩余令牌数大于警戒值并且上一秒 qps 大于冷却期最小 qps,说明此时桶内正在从冷状态“加热”到正常状态,此时为了使得这种“加热”持续进行,桶内不会再添加新的令牌。

最后,Sentinel 会根据桶内令牌数计算这一秒允许达到的最大 qps 值,再和已经统计到的 qps 作比较,如果两者的差值大于所需令牌数,则放行;反之不放行。

由于 Sentinel 是按秒级窗口更新,因此 qps 的上升并不平滑,而是呈阶梯状,这点和 Guava 不一样。如图所示(图片出自官方wiki):

预热+匀速排队

预热+匀速排队模式融合了预热和匀速排队两者各自的特点。在该模式下,系统每秒的 qps 是由预热算法计算的,而每秒内请求的间隔则是固定的。同样,该模式只适用于 qps 限流中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
WarmUpRateLimiterController
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);

long currentTime = TimeUtil.currentTimeMillis();

long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 计算这一秒允许达到的最大 qps 值,和 warmup 模式完全一样
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// 根据 warmingQps 计算请求间隔
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
// 如果令牌数在警戒线之下,根据最大 qps 计算请求间隔
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
// 根据 costTime 控制请求间隔,之后逻辑和 RateLimiter 模式完全一样
expectedTime = costTime + latestPassedTime.get();

if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}

从代码可以看出,计算 warmingQps 的逻辑和 warmup 模式完全一样,计算请求间隔的逻辑和 RateLimiter 模式完全一样,这里就不多赘述了。

全文总结

本文开始介绍了令牌桶、漏斗法、滑动窗口三种基本的限流算法,随后介绍了常见开源限流组件 Hystrix、Guava 和 Sentinel 中的限流实现。每种算法都有对应的应用场景,实际生产中要灵活应用。

参考资料

Javadoop - 阿里 Sentinel 源码解析
Sentinel 官方 wiki