学习

响应式实现

Brian Sam-Bodden
作者
Brian Sam-Bodden, Redis 开发倡导者

一个基本的 Spring Web Flux 应用程序#

让我们首先使用 Spring Initializr 创建一个简单的 Spring Boot 应用程序

  1. 1.在您的浏览器中,启动 Spring Initilizr
  2. 2.使用以下设置填写表单:
  • 项目:Maven 项目
  • 语言:Java
  • Spring Boot:2.5.4
  • 项目元数据:
    • 组:com.redis.rl
    • 工件:fixed-window-rate-limiter
    • 名称:fixed-window-rate-limiter
    • 描述:Redis 固定窗口速率限制器
    • 包名:com.redis.rl
    • 打包:JAR
    • Java:11
  • 依赖项:
    • Spring Reactive Web
    • String Data Reactive
    • Spring Boot DevTools

单击生成,Initializr 将生成项目的 zip 文件,并提示您将其下载到本地计算机。将下载的文件(名为 fixed-window-rate-limiter.zip)解压缩到合适的目录。例如在 Mac 上

cd ~/my-directory
unzip ~/Downloads/fixed-window-rate-limiter.zip

现在,打开位于 fixed-window-rate-limiter/src/main/java/com/redis/rl/ 下的文件 FixedWindowRateLimiterApplication.java

package com.redis.rl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FixedWindowRateLimiterApplication {

    public static void main(String[] args) {
        SpringApplication.run(FixedWindowRateLimiterApplication.class, args);
    }

}

让我们添加 @Bean 注释的方法 routes() 来创建最简单的端点;一个“ping”端点,它用文本响应“PONG”进行回复

@Bean
RouterFunction<ServerResponse> routes() {
  return route() //
      .GET("/api/ping", r -> ok() //
          .contentType(TEXT_PLAIN) //
          .body(BodyInserters.fromValue("PONG")) //
      ).build();
}

您还需要以下导入

import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;
import static org.springframework.http.MediaType.TEXT_PLAIN;

import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.BodyInserters;

让我们使用 curl 测试新创建的端点

/>curl https://localhost:8080/api/ping
PONG

Spring WebFlux 过滤器#

我们将速率限制器实现为 Spring WebFlux 过滤器。过滤器允许我们拦截请求并修改响应。对于我们的需求,我们将使用 HandlerFilterFunction;我们可以过滤 RouterFunction,例如我们的 routes() 端点。

为了使示例更易于管理,我们将所有内容都保存在一个文件中。在文件 FixedWindowRateLimiterApplication.java 的末尾,让我们添加一个名为 RateLimiterHandlerFilterFunction 的类,如下所示

class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  @Override
  public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
    // TODO Auto-generated method stub
    return null;
  }
}

该类实现了 filter 方法,该方法目前返回 null。让我们修改它以简单地将请求传递。

@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  return next.handle(request);
}

并且,让我们将它应用于我们的 routes 方法:

@Bean
RouterFunction<ServerResponse> routes() {
  return route() //
      .GET("/api/ping", r -> ok() //
          .contentType(TEXT_PLAIN) //
          .body(BodyInserters.fromValue("PONG")) //
      ).filter(new RateLimiterHandlerFilterFunction()).build();
}

使用 curl 测试端点应该仍然像以前一样工作。

响应式 Redis 模板#

Spring Data Redis 提供了一个响应式 API,它与 Spring 框架的所有其他响应式部分配合良好。为了使用 Redis,我们将使用 ReactiveRedisTemplate,恰如其分地命名为 reactiveRedisTemplate 方法配置一个响应式模板,前提是给定一个 ReactiveRedisConnectionFactory(它将由框架注入)。

该模板被配置为使用 String 键和 Long 值进行工作,如 StringRedisSerializerGenericToStringSerializer 的使用所示。

该模板为 Redis 交互提供了高级抽象,我们稍后会将其传递给我们的速率限制器过滤器,以便使用 Redis 作为速率限制的后端

@Bean
ReactiveRedisTemplate<String, Long> reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
  JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
  StringRedisSerializer stringRedisSerializer = StringRedisSerializer.UTF_8;
  GenericToStringSerializer<Long> longToStringSerializer = new GenericToStringSerializer<>(Long.class);
  ReactiveRedisTemplate<String, Long> template = new ReactiveRedisTemplate<>(factory,
      RedisSerializationContext.<String, Long>newSerializationContext(jdkSerializationRedisSerializer)
          .key(stringRedisSerializer).value(longToStringSerializer).build());
  return template;
}

@Autowired
private ReactiveRedisTemplate<String, Long> redisTemplate;

以下是添加的必需导入

import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import org.springframework.beans.factory.annotation.Autowired;

接下来,让我们修改 RateLimiterHandlerFilterFunction 类以包含模板,并添加一个构造函数以便我们能够正确地初始化它:

class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  private ReactiveRedisTemplate<String, Long> redisTemplate;

  public RateLimiterHandlerFilterFunction(ReactiveRedisTemplate<String, Long> redisTemplate) {
    this.redisTemplate = redisTemplate;
  }

我们还需要修改 routes 方法以在调用 RateLimiterHandlerFilterFunction 构造函数时包含 redisTemplate

@Bean
RouterFunction<ServerResponse> routes() {
  return route() //
      .GET("/api/ping", r -> ok() //
          .contentType(TEXT_PLAIN) //
          .body(BodyInserters.fromValue("PONG")) //
      ).filter(new RateLimiterHandlerFilterFunction(redisTemplate)).build();
}

识别请求者#

如速率限制介绍中所述,识别请求的来源对于速率限制实现的有效性至关重要。在这个简单的实现中,我们将重点关注速率限制的机制,并做我们所能做的最简单的事情来识别请求者。

RateLimiterHandlerFilterFunction 中的 filter 方法接受一个 ServerRequest 实例,我们可以从中获取请求者的 IP 地址。将私有方法 requestAddress 添加到过滤器函数类

private String requestAddress(Optional<InetSocketAddress> maybeAddress) {
  return maybeAddress.isPresent() ? maybeAddress.get().getHostName() : "";
}

有了 requestAddress,我们就可以构建用于检查请求者配额的键。为了计算键,我们将连接请求者的地址和当前小时的分钟数。我们将键名前缀为 rl_,代表“速率限制”

public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  int currentMinute = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
  System.out.println(">>>> key " + key);

  return next.handle(request);
}

调用端点应该在控制台中显示键,例如

>>>> key rl_localhost:34

递增和过期键#

现在我们需要实现执行INCREXPIRE逻辑的代码,这些逻辑在使用 Spring Data Redis Reactive 的固定窗口实现中概述。

MULTI
INCR [user-api-key]:[current minute number]
EXPIRE [user-api-key]:[current minute number] 59
EXEC

“基本速率限制”方案要求使用Redis 事务,其中命令被发送到服务器,以串行方式累积并按顺序执行,而不会被来自其他客户端的请求中断。

基本上,我们希望INCREXPIRE调用以原子方式更新每单位时间的请求计数器,或者完全不更新。鉴于Reactive API 和 Redis 事务 (MULTI/EXEC) 是不兼容的范式,这归结为“你无法监听将在未来执行的命令”在一个反应式命令链中,使用ReactiveRedisTemplate execute方法是使用反应式 API 实现这种行为的“最佳”方法,该方法接受一个ReactiveRedisCallback,确保命令至少在同一个 Redis 连接上运行,但这绝不是真正的“事务”。

private Mono<ServerResponse> incrAndExpireKey(String key, ServerRequest request,
  HandlerFunction<ServerResponse> next) {
  return redisTemplate.execute(new ReactiveRedisCallback<List<Object>>() {
    @Override
    public Publisher<List<Object>> doInRedis(ReactiveRedisConnection connection) throws DataAccessException {
      ByteBuffer bbKey = ByteBuffer.wrap(key.getBytes());
      return Mono.zip( //
          connection.numberCommands().incr(bbKey), //
          connection.keyCommands().expire(bbKey, Duration.ofSeconds(59L)) //
      ).then(Mono.empty());
    }
  }).then(next.handle(request));
}

让我们分解这个反应式方法的庞然大物。

  1. 1.该方法返回一个异步 (0-1) 结果,一个Mono (一个专门的Publisher<T>,在本例中最多发射一个项目,即一个ServerResponse)
  2. 2.该方法接受计算出的速率限制key、原始服务器requestnext处理程序函数。
  3. 3.doInRedis方法中,我们将键转换为ByteBuffer,以便与ReactiveRedisConnection命令一起使用。
  4. 4.zip 方法等待所有源发射一个元素,并将这些元素组合成一个输出值,我们忽略这个值,因为我们只是希望在INCREXPIRE命令上进行顺序执行。
  5. 5.然后,该方法返回一个Mono.empty()
  6. 6.最后处理请求。

完成过滤器实现#

为了完成过滤器实现,我们将添加一个常量来限制每分钟的请求数,该常量可以从应用程序的属性中可选地加载。

@Value("${MAX_REQUESTS_PER_MINUTE}")
private static Long MAX_REQUESTS_PER_MINUTE = 20L;

让我们分解最终的filter方法实现。

  1. 1.我们使用 Redis 模板opsForValue()来检索存储在计算出的键下的值。
  2. 2.如果该值...
    • 大于或等于最大配额,我们将拒绝请求并返回 409 响应。
    • 否则,调用incrementAndExpireKey
  3. 3.
    • 为空/键未找到(此窗口的第一个请求),我们将调用incrementAndExpireKey
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  int currentMinute = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);

  return redisTemplate //
      .opsForValue().get(key) //
      .flatMap( //
          value -> value >= MAX_REQUESTS_PER_MINUTE ? //
              ServerResponse.status(TOO_MANY_REQUESTS).build() : //
              incrAndExpireKey(key, request, next) //
      ).switchIfEmpty(incrAndExpireKey(key, request, next));
}

使用 curl 测试#

使用 curl 在循环中测试 API 速率限制器是一种简单的方法,因为我们正在测试每单位时间的一组请求,所以下面的 curl 循环就足够了:

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done

我们循环 22 次,示例代码设置为 20,因此 22 次将使我们能够看到两个 429 响应。使用的 curl 标志如下:第一个是-s,它使 curl 静默(使其隐藏进度条和错误),-w是写入选项,我们可以在其中传递一个包含插值变量的字符串。然后我们在周期之间休眠 1/2 秒。

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.393156 s
PONG :: HTTP 200, 4 bytes, 0.019530 s
PONG :: HTTP 200, 4 bytes, 0.023677 s
PONG :: HTTP 200, 4 bytes, 0.019922 s
PONG :: HTTP 200, 4 bytes, 0.025573 s
PONG :: HTTP 200, 4 bytes, 0.018916 s
PONG :: HTTP 200, 4 bytes, 0.019548 s
PONG :: HTTP 200, 4 bytes, 0.018335 s
PONG :: HTTP 200, 4 bytes, 0.010105 s
PONG :: HTTP 200, 4 bytes, 0.008416 s
PONG :: HTTP 200, 4 bytes, 0.009829 s
PONG :: HTTP 200, 4 bytes, 0.011766 s
PONG :: HTTP 200, 4 bytes, 0.010809 s
PONG :: HTTP 200, 4 bytes, 0.015483 s
PONG :: HTTP 200, 4 bytes, 0.009732 s
PONG :: HTTP 200, 4 bytes, 0.009970 s
PONG :: HTTP 200, 4 bytes, 0.008696 s
PONG :: HTTP 200, 4 bytes, 0.009176 s
PONG :: HTTP 200, 4 bytes, 0.009678 s
PONG :: HTTP 200, 4 bytes, 0.012497 s
:: HTTP 429, 0 bytes, 0.010071 s
:: HTTP 429, 0 bytes, 0.006625 s

如果我们在监控模式下运行 Redis,我们应该看到对被拒绝请求的GET的调用,以及对允许请求的INCREXPIRE的调用。

1630366639.188290 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366639.200956 [0 172.17.0.1:65016] "INCR" "rl_localhost:37"
1630366639.202372 [0 172.17.0.1:65016] "EXPIRE" "rl_localhost:37" "59"
...
1630366649.891110 [0 172.17.0.1:65016] "GET" "rl_localhost:37"
1630366650.417131 [0 172.17.0.1:65016] "GET" "rl_localhost:37"

你可以在主分支上的 https://github.com/redis-developer/fixed-window-rate-limiter找到此示例。