让我们首先使用 Spring Initializr 创建一个简单的 Spring Boot 应用
com.redis.rlfixed-window-rate-limiterfixed-window-rate-limitercom.redis.rl点击 GENERATE(生成),Initializr 将生成项目的 zip 文件并提示您将其下载到本地机器。将下载的文件(命名为fixed-window-rate-limiter.zip)解压到合适的目录。例如在 Mac 上
cd ~/my-directory
unzip ~/Downloads/fixed-window-rate-limiter.zip现在,打开位于fixed-window-rate-limiter/src/main/java/com/redis/rl/下的文件FixedWindowRateLimiterApplication.java
package com.redis.rl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FixedWindowRateLimiterApplication {
public static void main(String[] args) {
SpringApplication.run(FixedWindowRateLimiterApplication.class, args);
}
}然后我们添加带@Bean注解的方法routes()来创建最简单的端点;一个“ping”端点,它会回复文本响应“PONG”
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).build();
}您还需要以下导入
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.http.MediaType.TEXT_PLAIN;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.BodyInserters;让我们使用 curl 测试新创建的端点
/>curl https://:8080/api/ping
PONG我们将把速率限制器实现为一个 Spring WebFlux 过滤器。过滤器允许我们拦截请求并修改响应。对于我们的需求,我们将使用HandlerFilterFunction,它可以过滤一个RouterFunction,例如我们的routes()端点。
为了使示例更容易管理,我们将所有内容保存在一个文件中。在文件FixedWindowRateLimiterApplication.java的末尾,我们添加一个名为RateLimiterHandlerFilterFunction的类,如下所示
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
// TODO Auto-generated method stub
return null;
}
}该类实现了filter方法,该方法当前返回null。我们修改它以简单地传递请求。
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
return next.handle(request);
}此外,我们将其应用于我们的 routes 方法:
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction()).build();
}使用 curl 测试该端点应该仍然能像以前一样工作。
Spring Data Redis 提供了一个响应式 API,可以很好地与其他 Spring 框架的响应式部分配合使用。为了使用 Redis,我们将使用ReactiveRedisTemplate,这个恰如其分地命名为reactiveRedisTemplate的方法根据一个ReactiveRedisConnectionFactory(由框架注入)配置一个响应式模板。
模板配置为使用字符串键和长整型值,如使用StringRedisSerializer和GenericToStringSerializer所示。
该模板提供了 Redis 交互的高级抽象,我们稍后会将其传递给速率限制器过滤器,以便使用 Redis 作为我们的速率限制后端
@Bean
ReactiveRedisTemplate<String, Long> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
StringRedisSerializer stringRedisSerializer = StringRedisSerializer.UTF_8;
GenericToStringSerializer<Long> longToStringSerializer = new GenericToStringSerializer<>(Long.class);
ReactiveRedisTemplate<String, Long> template = new ReactiveRedisTemplate<>(factory,
RedisSerializationContext.<String, Long>newSerializationContext(jdkSerializationRedisSerializer)
.key(stringRedisSerializer).value(longToStringSerializer).build());
return template;
}
@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;下面是添加的必需导入
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.beans.factory.annotation.Autowired;接下来,我们修改 RateLimiterHandlerFilterFunction 类以包含模板,并添加一个构造函数以便我们可以正确初始化它:
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private ReactiveRedisTemplate<String, Long> redisTemplate;
public RateLimiterHandlerFilterFunction(ReactiveRedisTemplate<String, Long> redisTemplate) {
this.redisTemplate = redisTemplate;
}我们还需要修改routes方法以在调用RateLimiterHandlerFilterFunction的构造函数时包含redisTemplate。
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction(redisTemplate)).build();
}正如速率限制简介中所述,识别请求来源对于速率限制实现的有效性至关重要。在这个简单的实现中,我们将专注于速率限制的机制,并采取最简单的方式来识别请求者。
filter方法在RateLimiterHandlerFilterFunction中接受一个ServerRequest实例,从中我们可以获取请求者的 IP 地址。向过滤器函数类添加私有方法requestAddress
private String requestAddress(Optional<InetSocketAddress> maybeAddress) {
return maybeAddress.isPresent() ? maybeAddress.get().getHostName() : "";
}有了requestAddress,我们可以构建用于检查请求者配额的键。为了计算该键,我们将连接请求者的地址和当前小时的分钟数。我们将键前缀设置为rl_表示“速率限制”
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
System.out.println(">>>> key " + key);
return next.handle(request);
}调用端点应该会在控制台上显示类似如下的键
>>>> key rl_localhost:34现在我们需要实现执行INCR和EXPIRE的逻辑,如固定窗口实现中使用 Spring Data Redis Reactive 所概述的
MULTI
INCR [user-api-key]:[current minute number]
EXPIRE [user-api-key]:[current minute number] 59
EXEC“基本速率限制”方案建议使用Redis 事务,其中命令被发送到服务器,按顺序累积并依次执行,不会受到其他客户端请求的任何中断。
基本上,我们希望更新每单位时间请求计数的INCR和EXPIRE调用能够原子地发生,或者完全不发生。考虑到响应式 API 和 Redis 事务 (MULTI/EXEC) 是不兼容的范例,这意味着在响应式命令链中“您无法监听未来将要执行的命令”。使用响应式 API 实现此行为的“最佳可行”方法是使用ReactiveRedisTemplate的execute方法,该方法接受一个ReactiveRedisCallback,保证至少命令会在同一个 Redis 连接上运行,但这绝不是一个真正的“事务”
private Mono<ServerResponse> incrAndExpireKey(String key, ServerRequest request,
HandlerFunction<ServerResponse> next) {
return redisTemplate.execute(new ReactiveRedisCallback<List<Object>>() {
@Override
public Publisher<List<Object>> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
ByteBuffer bbKey = ByteBuffer.wrap(key.getBytes());
return Mono.zip( //
connection.numberCommands().incr(bbKey), //
connection.keyCommands().expire(bbKey, Duration.ofSeconds(59L)) //
).then(Mono.empty());
}
}).then(next.handle(request));
}让我们分解这个复杂的响应式方法
Mono(一个专门的Publisher<T>,在此例中最多发出一个项,即一个ServerResponse)key,原始服务器request和next处理器函数doInRedis方法中,我们将键转换为一个ByteBuffer,以便与ReactiveRedisConnection命令一起使用INCR和EXPIRE命令上的顺序执行。Mono.empty()为了完成过滤器实现,我们将添加一个常量来限制每分钟的请求数,该常量可以从应用的属性中可选加载
@Value("${MAX_REQUESTS_PER_MINUTE}")
private static Long MAX_REQUESTS_PER_MINUTE = 20L;让我们分解最终的filter方法实现
opsForValue()来检索计算出的键下存储的值。incrementAndExpireKeyincrementAndExpireKeypublic Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
return redisTemplate //
.opsForValue().get(key) //
.flatMap( //
value -> value >= MAX_REQUESTS_PER_MINUTE ? //
ServerResponse.status(TOO_MANY_REQUESTS).build() : //
incrAndExpireKey(key, request, next) //
).switchIfEmpty(incrAndExpireKey(key, request, next));
}测试 API 速率限制器的一种简单方法是使用 curl 在循环中测试,因为我们正在测试每单位时间的固定请求数,下面的 curl 循环就足够了:
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://:8080/api/ping); sleep 0.5; done我们循环 22 次,示例代码设置为 20 次,所以 22 次将使我们看到两个 429 响应。使用的 curl 标志如下:首先是-s,它会使 curl 静默(隐藏进度条和错误),-w是 write out 选项,我们可以传递一个包含插值变量的字符串。然后我们在循环之间休眠 1/2 秒。
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.393156 s
PONG :: HTTP 200, 4 bytes, 0.019530 s
PONG :: HTTP 200, 4 bytes, 0.023677 s
PONG :: HTTP 200, 4 bytes, 0.019922 s
PONG :: HTTP 200, 4 bytes, 0.025573 s
PONG :: HTTP 200, 4 bytes, 0.018916 s
PONG :: HTTP 200, 4 bytes, 0.019548 s
PONG :: HTTP 200, 4 bytes, 0.018335 s
PONG :: HTTP 200, 4 bytes, 0.010105 s
PONG :: HTTP 200, 4 bytes, 0.008416 s
PONG :: HTTP 200, 4 bytes, 0.009829 s
PONG :: HTTP 200, 4 bytes, 0.011766 s
PONG :: HTTP 200, 4 bytes, 0.010809 s
PONG :: HTTP 200, 4 bytes, 0.015483 s
PONG :: HTTP 200, 4 bytes, 0.009732 s
PONG :: HTTP 200, 4 bytes, 0.009970 s
PONG :: HTTP 200, 4 bytes, 0.008696 s
PONG :: HTTP 200, 4 bytes, 0.009176 s
PONG :: HTTP 200, 4 bytes, 0.009678 s
PONG :: HTTP 200, 4 bytes, 0.012497 s
:: HTTP 429, 0 bytes, 0.010071 s
:: HTTP 429, 0 bytes, 0.006625 s如果我们以监控模式运行 Redis,应该会看到拒绝的请求调用了GET,允许的请求则会调用同样的命令以及INCR和EXPIRE
1630366639.188290 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366639.200956 [0 172.17.0.1:65016] "INCR" "rl_localhost:37"
1630366639.202372 [0 172.17.0.1:65016] "EXPIRE" "rl_localhost:37" "59"
...
1630366649.891110 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366650.417131 [0 172.17.0.1:65016] "GET" "rl_localhost:37"