让我们首先使用 Spring Initializr 创建一个简单的 Spring Boot 应用程序
com.redis.rl
fixed-window-rate-limiter
fixed-window-rate-limiter
com.redis.rl
单击生成,Initializr 将生成项目的 zip 文件,并提示您将其下载到本地计算机。将下载的文件(名为 fixed-window-rate-limiter.zip
)解压缩到合适的目录。例如在 Mac 上
cd ~/my-directory
unzip ~/Downloads/fixed-window-rate-limiter.zip
现在,打开位于 fixed-window-rate-limiter/src/main/java/com/redis/rl/
下的文件 FixedWindowRateLimiterApplication.java
package com.redis.rl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class FixedWindowRateLimiterApplication {
public static void main(String[] args) {
SpringApplication.run(FixedWindowRateLimiterApplication.class, args);
}
}
让我们添加 @Bean
注释的方法 routes()
来创建最简单的端点;一个“ping”端点,它用文本响应“PONG”进行回复
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).build();
}
您还需要以下导入
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.http.MediaType.TEXT_PLAIN;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.BodyInserters;
让我们使用 curl 测试新创建的端点
/>curl https://localhost:8080/api/ping
PONG
我们将速率限制器实现为 Spring WebFlux 过滤器。过滤器允许我们拦截请求并修改响应。对于我们的需求,我们将使用 HandlerFilterFunction
;我们可以过滤 RouterFunction
,例如我们的 routes()
端点。
为了使示例更易于管理,我们将所有内容都保存在一个文件中。在文件 FixedWindowRateLimiterApplication.java
的末尾,让我们添加一个名为 RateLimiterHandlerFilterFunction
的类,如下所示
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
// TODO Auto-generated method stub
return null;
}
}
该类实现了 filter
方法,该方法目前返回 null
。让我们修改它以简单地将请求传递。
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
return next.handle(request);
}
并且,让我们将它应用于我们的 routes
方法:
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction()).build();
}
使用 curl 测试端点应该仍然像以前一样工作。
Spring Data Redis 提供了一个响应式 API,它与 Spring 框架的所有其他响应式部分配合良好。为了使用 Redis,我们将使用 ReactiveRedisTemplate
,恰如其分地命名为 reactiveRedisTemplate
方法配置一个响应式模板,前提是给定一个 ReactiveRedisConnectionFactory
(它将由框架注入)。
该模板被配置为使用 String 键和 Long 值进行工作,如 StringRedisSerializer
和 GenericToStringSerializer
的使用所示。
该模板为 Redis 交互提供了高级抽象,我们稍后会将其传递给我们的速率限制器过滤器,以便使用 Redis 作为速率限制的后端
@Bean
ReactiveRedisTemplate<String, Long> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
StringRedisSerializer stringRedisSerializer = StringRedisSerializer.UTF_8;
GenericToStringSerializer<Long> longToStringSerializer = new GenericToStringSerializer<>(Long.class);
ReactiveRedisTemplate<String, Long> template = new ReactiveRedisTemplate<>(factory,
RedisSerializationContext.<String, Long>newSerializationContext(jdkSerializationRedisSerializer)
.key(stringRedisSerializer).value(longToStringSerializer).build());
return template;
}
@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;
以下是添加的必需导入
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.beans.factory.annotation.Autowired;
接下来,让我们修改 RateLimiterHandlerFilterFunction
类以包含模板,并添加一个构造函数以便我们能够正确地初始化它:
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private ReactiveRedisTemplate<String, Long> redisTemplate;
public RateLimiterHandlerFilterFunction(ReactiveRedisTemplate<String, Long> redisTemplate) {
this.redisTemplate = redisTemplate;
}
我们还需要修改 routes
方法以在调用 RateLimiterHandlerFilterFunction
构造函数时包含 redisTemplate
@Bean
RouterFunction<ServerResponse> routes() {
return route() //
.GET("/api/ping", r -> ok() //
.contentType(TEXT_PLAIN) //
.body(BodyInserters.fromValue("PONG")) //
).filter(new RateLimiterHandlerFilterFunction(redisTemplate)).build();
}
如速率限制介绍中所述,识别请求的来源对于速率限制实现的有效性至关重要。在这个简单的实现中,我们将重点关注速率限制的机制,并做我们所能做的最简单的事情来识别请求者。
RateLimiterHandlerFilterFunction
中的 filter
方法接受一个 ServerRequest
实例,我们可以从中获取请求者的 IP 地址。将私有方法 requestAddress
添加到过滤器函数类
private String requestAddress(Optional<InetSocketAddress> maybeAddress) {
return maybeAddress.isPresent() ? maybeAddress.get().getHostName() : "";
}
有了 requestAddress
,我们就可以构建用于检查请求者配额的键。为了计算键,我们将连接请求者的地址和当前小时的分钟数。我们将键名前缀为 rl_
,代表“速率限制”
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
System.out.println(">>>> key " + key);
return next.handle(request);
}
调用端点应该在控制台中显示键,例如
>>>> key rl_localhost:34
现在我们需要实现执行INCR
和EXPIRE
逻辑的代码,这些逻辑在使用 Spring Data Redis Reactive 的固定窗口实现中概述。
MULTI
INCR [user-api-key]:[current minute number]
EXPIRE [user-api-key]:[current minute number] 59
EXEC
“基本速率限制”方案要求使用Redis 事务,其中命令被发送到服务器,以串行方式累积并按顺序执行,而不会被来自其他客户端的请求中断。
基本上,我们希望INCR
和EXPIRE
调用以原子方式更新每单位时间的请求计数器,或者完全不更新。鉴于Reactive API 和 Redis 事务 (MULTI/EXEC) 是不兼容的范式
,这归结为“你无法监听将在未来执行的命令”在一个反应式命令链中,使用ReactiveRedisTemplate
execute
方法是使用反应式 API 实现这种行为的“最佳”方法,该方法接受一个ReactiveRedisCallback
,确保命令至少在同一个 Redis 连接上运行,但这绝不是真正的“事务”。
private Mono<ServerResponse> incrAndExpireKey(String key, ServerRequest request,
HandlerFunction<ServerResponse> next) {
return redisTemplate.execute(new ReactiveRedisCallback<List<Object>>() {
@Override
public Publisher<List<Object>> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
ByteBuffer bbKey = ByteBuffer.wrap(key.getBytes());
return Mono.zip( //
connection.numberCommands().incr(bbKey), //
connection.keyCommands().expire(bbKey, Duration.ofSeconds(59L)) //
).then(Mono.empty());
}
}).then(next.handle(request));
}
让我们分解这个反应式方法的庞然大物。
Mono
(一个专门的Publisher<T>
,在本例中最多发射一个项目,即一个ServerResponse
)key
、原始服务器request
和next
处理程序函数。doInRedis
方法中,我们将键转换为ByteBuffer
,以便与ReactiveRedisConnection
命令一起使用。INCR
和EXPIRE
命令上进行顺序执行。Mono.empty()
。为了完成过滤器实现,我们将添加一个常量来限制每分钟的请求数,该常量可以从应用程序的属性中可选地加载。
@Value("${MAX_REQUESTS_PER_MINUTE}")
private static Long MAX_REQUESTS_PER_MINUTE = 20L;
让我们分解最终的filter
方法实现。
opsForValue()
来检索存储在计算出的键下的值。incrementAndExpireKey
。incrementAndExpireKey
。public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
return redisTemplate //
.opsForValue().get(key) //
.flatMap( //
value -> value >= MAX_REQUESTS_PER_MINUTE ? //
ServerResponse.status(TOO_MANY_REQUESTS).build() : //
incrAndExpireKey(key, request, next) //
).switchIfEmpty(incrAndExpireKey(key, request, next));
}
使用 curl 在循环中测试 API 速率限制器是一种简单的方法,因为我们正在测试每单位时间的一组请求,所以下面的 curl 循环就足够了:
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done
我们循环 22 次,示例代码设置为 20,因此 22 次将使我们能够看到两个 429 响应。使用的 curl 标志如下:第一个是-s
,它使 curl 静默(使其隐藏进度条和错误),-w
是写入选项,我们可以在其中传递一个包含插值变量的字符串。然后我们在周期之间休眠 1/2 秒。
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.393156 s
PONG :: HTTP 200, 4 bytes, 0.019530 s
PONG :: HTTP 200, 4 bytes, 0.023677 s
PONG :: HTTP 200, 4 bytes, 0.019922 s
PONG :: HTTP 200, 4 bytes, 0.025573 s
PONG :: HTTP 200, 4 bytes, 0.018916 s
PONG :: HTTP 200, 4 bytes, 0.019548 s
PONG :: HTTP 200, 4 bytes, 0.018335 s
PONG :: HTTP 200, 4 bytes, 0.010105 s
PONG :: HTTP 200, 4 bytes, 0.008416 s
PONG :: HTTP 200, 4 bytes, 0.009829 s
PONG :: HTTP 200, 4 bytes, 0.011766 s
PONG :: HTTP 200, 4 bytes, 0.010809 s
PONG :: HTTP 200, 4 bytes, 0.015483 s
PONG :: HTTP 200, 4 bytes, 0.009732 s
PONG :: HTTP 200, 4 bytes, 0.009970 s
PONG :: HTTP 200, 4 bytes, 0.008696 s
PONG :: HTTP 200, 4 bytes, 0.009176 s
PONG :: HTTP 200, 4 bytes, 0.009678 s
PONG :: HTTP 200, 4 bytes, 0.012497 s
:: HTTP 429, 0 bytes, 0.010071 s
:: HTTP 429, 0 bytes, 0.006625 s
如果我们在监控模式下运行 Redis,我们应该看到对被拒绝请求的GET
的调用,以及对允许请求的INCR
和EXPIRE
的调用。
1630366639.188290 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366639.200956 [0 172.17.0.1:65016] "INCR" "rl_localhost:37"
1630366639.202372 [0 172.17.0.1:65016] "EXPIRE" "rl_localhost:37" "59"
...
1630366649.891110 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366650.417131 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
你可以在主分支上的 https://github.com/redis-developer/fixed-window-rate-limiter找到此示例。