让我们首先使用 Spring Initializr 创建一个简单的 Spring Boot 应用
com.redis.rl
fixed-window-rate-limiter
fixed-window-rate-limiter
com.redis.rl
点击 GENERATE(生成),Initializr 将生成项目的 zip 文件并提示您将其下载到本地机器。将下载的文件(命名为fixed-window-rate-limiter.zip
)解压到合适的目录。例如在 Mac 上
cd ~/my-directory
unzip ~/Downloads/fixed-window-rate-limiter.zip
现在,打开位于fixed-window-rate-limiter/src/main/java/com/redis/rl/
下的文件FixedWindowRateLimiterApplication.java
package com.redis.rl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FixedWindowRateLimiterApplication {
public static void main(String[] args) {
SpringApplication.run(FixedWindowRateLimiterApplication.class, args);
}
}
然后我们添加带@Bean
注解的方法routes()
来创建最简单的端点;一个“ping”端点,它会回复文本响应“PONG”
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).build();
}
您还需要以下导入
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.http.MediaType.TEXT_PLAIN;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.BodyInserters;
让我们使用 curl 测试新创建的端点
/>curl http://localhost:8080/api/ping
PONG
我们将把速率限制器实现为一个 Spring WebFlux 过滤器。过滤器允许我们拦截请求并修改响应。对于我们的需求,我们将使用HandlerFilterFunction
,它可以过滤一个RouterFunction
,例如我们的routes()
端点。
为了使示例更容易管理,我们将所有内容保存在一个文件中。在文件FixedWindowRateLimiterApplication.java
的末尾,我们添加一个名为RateLimiterHandlerFilterFunction
的类,如下所示
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
// TODO Auto-generated method stub
return null;
}
}
该类实现了filter
方法,该方法当前返回null
。我们修改它以简单地传递请求。
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
return next.handle(request);
}
此外,我们将其应用于我们的 routes
方法:
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction()).build();
}
使用 curl 测试该端点应该仍然能像以前一样工作。
Spring Data Redis 提供了一个响应式 API,可以很好地与其他 Spring 框架的响应式部分配合使用。为了使用 Redis,我们将使用ReactiveRedisTemplate
,这个恰如其分地命名为reactiveRedisTemplate
的方法根据一个ReactiveRedisConnectionFactory
(由框架注入)配置一个响应式模板。
模板配置为使用字符串键和长整型值,如使用StringRedisSerializer
和GenericToStringSerializer
所示。
该模板提供了 Redis 交互的高级抽象,我们稍后会将其传递给速率限制器过滤器,以便使用 Redis 作为我们的速率限制后端
@Bean
ReactiveRedisTemplate<String, Long> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
StringRedisSerializer stringRedisSerializer = StringRedisSerializer.UTF_8;
GenericToStringSerializer<Long> longToStringSerializer = new GenericToStringSerializer<>(Long.class);
ReactiveRedisTemplate<String, Long> template = new ReactiveRedisTemplate<>(factory,
RedisSerializationContext.<String, Long>newSerializationContext(jdkSerializationRedisSerializer)
.key(stringRedisSerializer).value(longToStringSerializer).build());
return template;
}
@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;
下面是添加的必需导入
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.beans.factory.annotation.Autowired;
接下来,我们修改 RateLimiterHandlerFilterFunction
类以包含模板,并添加一个构造函数以便我们可以正确初始化它:
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private ReactiveRedisTemplate<String, Long> redisTemplate;
public RateLimiterHandlerFilterFunction(ReactiveRedisTemplate<String, Long> redisTemplate) {
this.redisTemplate = redisTemplate;
}
我们还需要修改routes
方法以在调用RateLimiterHandlerFilterFunction
的构造函数时包含redisTemplate
。
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction(redisTemplate)).build();
}
正如速率限制简介中所述,识别请求来源对于速率限制实现的有效性至关重要。在这个简单的实现中,我们将专注于速率限制的机制,并采取最简单的方式来识别请求者。
filter
方法在RateLimiterHandlerFilterFunction
中接受一个ServerRequest
实例,从中我们可以获取请求者的 IP 地址。向过滤器函数类添加私有方法requestAddress
private String requestAddress(Optional<InetSocketAddress> maybeAddress) {
return maybeAddress.isPresent() ? maybeAddress.get().getHostName() : "";
}
有了requestAddress
,我们可以构建用于检查请求者配额的键。为了计算该键,我们将连接请求者的地址和当前小时的分钟数。我们将键前缀设置为rl_
表示“速率限制”
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
System.out.println(">>>> key " + key);
return next.handle(request);
}
调用端点应该会在控制台上显示类似如下的键
>>>> key rl_localhost:34
现在我们需要实现执行INCR
和EXPIRE
的逻辑,如固定窗口实现中使用 Spring Data Redis Reactive 所概述的
MULTI
INCR [user-api-key]:[current minute number]
EXPIRE [user-api-key]:[current minute number] 59
EXEC
“基本速率限制”方案建议使用Redis 事务,其中命令被发送到服务器,按顺序累积并依次执行,不会受到其他客户端请求的任何中断。
基本上,我们希望更新每单位时间请求计数的INCR
和EXPIRE
调用能够原子地发生,或者完全不发生。考虑到响应式 API 和 Redis 事务 (MULTI/EXEC) 是不兼容的范例
,这意味着在响应式命令链中“您无法监听未来将要执行的命令”。使用响应式 API 实现此行为的“最佳可行”方法是使用ReactiveRedisTemplate
的execute
方法,该方法接受一个ReactiveRedisCallback
,保证至少命令会在同一个 Redis 连接上运行,但这绝不是一个真正的“事务”
private Mono<ServerResponse> incrAndExpireKey(String key, ServerRequest request,
HandlerFunction<ServerResponse> next) {
return redisTemplate.execute(new ReactiveRedisCallback<List<Object>>() {
@Override
public Publisher<List<Object>> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
ByteBuffer bbKey = ByteBuffer.wrap(key.getBytes());
return Mono.zip( //
connection.numberCommands().incr(bbKey), //
connection.keyCommands().expire(bbKey, Duration.ofSeconds(59L)) //
).then(Mono.empty());
}
}).then(next.handle(request));
}
让我们分解这个复杂的响应式方法
Mono
(一个专门的Publisher<T>
,在此例中最多发出一个项,即一个ServerResponse
)key
,原始服务器request
和next
处理器函数doInRedis
方法中,我们将键转换为一个ByteBuffer
,以便与ReactiveRedisConnection
命令一起使用INCR
和EXPIRE
命令上的顺序执行。Mono.empty()
为了完成过滤器实现,我们将添加一个常量来限制每分钟的请求数,该常量可以从应用的属性中可选加载
@Value("${MAX_REQUESTS_PER_MINUTE}")
private static Long MAX_REQUESTS_PER_MINUTE = 20L;
让我们分解最终的filter
方法实现
opsForValue()
来检索计算出的键下存储的值。incrementAndExpireKey
incrementAndExpireKey
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
return redisTemplate //
.opsForValue().get(key) //
.flatMap( //
value -> value >= MAX_REQUESTS_PER_MINUTE ? //
ServerResponse.status(TOO_MANY_REQUESTS).build() : //
incrAndExpireKey(key, request, next) //
).switchIfEmpty(incrAndExpireKey(key, request, next));
}
测试 API 速率限制器的一种简单方法是使用 curl 在循环中测试,因为我们正在测试每单位时间的固定请求数,下面的 curl 循环就足够了:
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
我们循环 22 次,示例代码设置为 20 次,所以 22 次将使我们看到两个 429 响应。使用的 curl 标志如下:首先是-s
,它会使 curl 静默(隐藏进度条和错误),-w
是 write out 选项,我们可以传递一个包含插值变量的字符串。然后我们在循环之间休眠 1/2 秒。
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.393156 s
PONG :: HTTP 200, 4 bytes, 0.019530 s
PONG :: HTTP 200, 4 bytes, 0.023677 s
PONG :: HTTP 200, 4 bytes, 0.019922 s
PONG :: HTTP 200, 4 bytes, 0.025573 s
PONG :: HTTP 200, 4 bytes, 0.018916 s
PONG :: HTTP 200, 4 bytes, 0.019548 s
PONG :: HTTP 200, 4 bytes, 0.018335 s
PONG :: HTTP 200, 4 bytes, 0.010105 s
PONG :: HTTP 200, 4 bytes, 0.008416 s
PONG :: HTTP 200, 4 bytes, 0.009829 s
PONG :: HTTP 200, 4 bytes, 0.011766 s
PONG :: HTTP 200, 4 bytes, 0.010809 s
PONG :: HTTP 200, 4 bytes, 0.015483 s
PONG :: HTTP 200, 4 bytes, 0.009732 s
PONG :: HTTP 200, 4 bytes, 0.009970 s
PONG :: HTTP 200, 4 bytes, 0.008696 s
PONG :: HTTP 200, 4 bytes, 0.009176 s
PONG :: HTTP 200, 4 bytes, 0.009678 s
PONG :: HTTP 200, 4 bytes, 0.012497 s
:: HTTP 429, 0 bytes, 0.010071 s
:: HTTP 429, 0 bytes, 0.006625 s
如果我们以监控模式运行 Redis,应该会看到拒绝的请求调用了GET
,允许的请求则会调用同样的命令以及INCR
和EXPIRE
1630366639.188290 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366639.200956 [0 172.17.0.1:65016] "INCR" "rl_localhost:37"
1630366639.202372 [0 172.17.0.1:65016] "EXPIRE" "rl_localhost:37" "59"
...
1630366649.891110 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366650.417131 [0 172.17.0.1:65016] "GET" "rl_localhost:37"