常见限流算法与开源实现

随着微服务的流行,服务之间的稳定性也变得越来越重要。在一个复杂的服务网络中,如果一个服务节点被流量打垮,那么很可能会造成其他多个服务都不可用的连锁事件。因此在微服务中,如何把流量控制在系统能够承受的范围内,不让过大的流量直接把服务打垮,是确保系统稳定性的重要部分。

限流是一种常见的控制流量的方法。当流量超过服务处理能力时,超过的部分会被限流组件拦截。被拦截的请求可能被丢弃,也可能做降级处理,总之,都会比直接打崩系统好。常见的限流算法有:漏桶法、令牌桶和滑动窗口,分别适用于不同的场景。除此之外,还有些开源组件中的限流实现,也会在下文一并讲解。

漏桶法

漏桶算法概念如下:

  • 请求到来时,将每个请求放入“漏桶”进行存储;
  • “漏桶”以固定速率向外“漏”出请求来执行;
  • 如果请求进入的速率大于“漏”出的速率,则“漏桶”中的请求会越来越多;当“漏桶”满了的时候,多余的请求会被丢弃。

漏桶算法多使用队列实现,服务的请求会存到队列中,服务的提供方则按照固定的速率从队列中取出请求并执行,过多的请求则会被丢弃。

漏桶算法的特点是流量的消费速率取决于“漏桶”向外“漏”水的速率,这种方式的好处是能保证系统的压力处于一个比较平稳的水位,缺点是当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。

令牌桶

令牌桶算法概念如下:

  • 令牌以固定速率生成,生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃;
  • 当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行,取不到令牌的请求会被丢弃。

令牌桶算法的特点是会将用不到的令牌先存起来,这种方式的好处是能应对突发流量,缺点是流量进入的速率无法被控制,短期内系统的压力可能会有比较大的变化。

滑动窗口

漏桶法和令牌桶能够控制某个时间周期内进入系统的最大请求数,但会面临一个临界值的问题:假设我们将漏桶法和令牌桶的时间周期设置为 1s,那么这两者可以确保 [0s,1s) 和 [1s,2s) 这两个时间周期内通过的请求数,但无法确保 [0.2s,1.2s) 内的请求数。要控制任意一个连续时间内通过的请求数,就需要使用滑动窗口算法:

我们可以看上图,整个 1s 被划分成 5 个格子,每个格子代表 200ms。[0.8s,1s) 到达的 80 个请求落在蓝色的格子中,而 [1s,1.2s) 到达的请求会落在红色的格 子中。当时间到达 1s 时,我们的窗口会往右移动一格,那么此时时间窗口内的总请求数量一共是 150 个,超过了限定的 100 个,所以超出的 50 个请求会被丢弃。

从上面可以看出,当滑动窗口的格子划分的越多,那么滑动窗口的滑动就越平滑,限流的统计就会越精确。

Hystrix 中的限流

Hystrix 是 Netflix 开源的一款容错系统,可以帮助我们的应用系统提高稳定性。限流是 Hystrix 中非常重要的一个部分,Hystrix 中提供了两种限流方式,分别是线程池信号量。Hystrix 使用命令模式将被限流代码的业务逻辑和调用逻辑解耦,框架已经实现了使用线程池限流和使用信号量限流的调用逻辑,使用者只需要专注业务逻辑即可。

使用时先定义一个类继承HystrixCommand,实现其抽象方法run(),在该方法中写入需要被限流的业务代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class HelloCommand extends HystrixCommand<String> {
private String userId;

protected HelloCommand(Setter setter, String userId) {
super(setter);
// 通过这种方式传入业务参数,每次调用都要创建一个实例
this.userId = userId;
}

@Override
protected String run() throws Exception {
// 将需要被限流的代码写在这里
System.out.println("hello user:" + userId);
return "ok";
}
}

然后在主函数中配置限流的参数,调用HystrixCommandexecute()方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void main(String[] args) {
// 使用线程池限流的配置
Setter threadSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group-1"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 使用线程池限流
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
// 设置核心线程数量为 5
.withCoreSize(5)
// 设置最大线程数量为 10
.withMaximumSize(10)
// 设置最大排队长度为 20,该值即 blockingQueue 的大小,创建后不能改动
.withMaxQueueSize(20)
// 设置排队长度,超过排队长度时会拒绝,该值可以动态调整
.withQueueSizeRejectionThreshold(15)
);
// 使用信号量限流的配置
Setter semaphoreSetter = Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test-group-2"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
// 使用信号量限流
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
// 设置最大并发量为 10,同时请求超过 10 个会拒绝
.withExecutionIsolationSemaphoreMaxConcurrentRequests(10)
);
// 传入配置和业务参数
HelloCommand threadHelloCommand = new HelloCommand(threadSetter, "user1");
HelloCommand semaphoreHelloCommand = new HelloCommand(semaphoreSetter, "user2");
// 使用线程池限流的方式执行
threadHelloCommand.execute();
// 使用信号量限流的方式执行
semaphoreHelloCommand.execute();

}

  • 使用线程池的方式进行限流时,会先在线程池中创建若干线程,被限流的代码会被放进线程池中执行,如果线程满了,就会触发限流;
  • 使用信号量的方式进行限流时,会维护一个信号量,当进入被限流的方法时,信号量加 1,出被限流的方法时,信号量减 1,信号量超过阈值就会触发限流。

下面来看 Hystrix 中两种限流方式的源码实现。

线程池限流

Hystrix 底层使用 Rxjava 编写,因此会有很多陌生的概念比如 Observable、Subscriber 等,我会在下文简单介绍一下这些概念,没弄懂也没关系,因为这些概念不影响我们阅读限流方面的代码。

这是 Hystrix 的一次完整调用的链路(译自官方,有所改动)。上述的 demo 中我们直接调用了 execute() 方法,所以调用的路径为.execute() -> .queue() -> .toObservable() -> .toBlocking() -> .toFuture() -> .get()。这里的.xxx()都是 HystrixCommand 的方法,含义如下:

  • execute()会对 HystrixCommand 的 run() 方法进行一次阻塞调用,返回调用结果或者抛出错误;
  • queue()会返回一个 Future 对象,hystrix 会把执行后的结果放在里面,本质上执行.execute()就是在执行.queue().get()
  • toObservable()会把一个 HystrixCommand 对象转化成一个 Cold Observable 对象。Observable 是一个可观察对象,可以理解为一个数据发射器。Observable 分为 Cold 和 Hot 两种,Cold Observable 只有在被订阅者(Subscriber)订阅时才会发射数据,且每次订阅都会发送全量数据;Hot Observable 不需要订阅者就可以发射数据,订阅之后只会收到后续的数据。打个比方,Cold Observable 相当于一张 CD,Hot Observable 相当于一个电台;
  • toBlocking()会把一个 Observable 对象转化成一个 BlockingObservable 对象。当 BlockingObservable 被订阅时,会通过 CountDownLatch 和 BlockingQueue 控制,将订阅者一直阻塞在主线程直到特定的数据到来;
  • toFuture()是 BlockingObservable 的一个方法,它会触发订阅,同时它是一个单数据订阅者,它只对发射 1 个数据的 BlockingObservable 生效,返回一个 Future 用于存放数据。当 BlockingObservable 发射多个或 0 个数据时,该方法会报错。本质上执行.queue().get()就是在执行.toObservable().toBlocking().toFuture(),get()

本质上,hystrix 的执行流程就是一次订阅的过程,hystrix 在整个订阅的链路中添加了缓存、断路、限流、容错、降级的逻辑,其中限流的核心代码如下:

1
2