学习

LETTUCE

本教程使用 Lettuce,这是一个不受支持的 Redis 库。对于生产应用,我们建议使用 Jedis

利用 RedisGears 提升原子性和性能#

什么是 RedisGears?#

RedisGears 是一个动态的服务器端数据处理引擎,其中的"服务器"部分就是 Redis 本身。RedisGears 作为 Redis 模块进行分发。您可以使用官方 Docker 镜像启动一个预配置了 Gears 的 Redis 实例:

docker run -p 6379:6379 redislabs/redisgears:latest

或者,像我大多数时候所做的那样,使用包含 Gears 和所有其他 Redis, Inc. 支持的模块的 "redismod" 镜像:

docker run -p 6379:6379 redislabs/redismod

构建 RedisGears 的目的是在 Redis 内部提供一个数据处理引擎,它比更简单的 Lua 服务器端脚本具有更规范的语义。可以将其视为一个更灵活的 Redis map-reduce 引擎。它支持事务、批处理和事件驱动的 Redis 数据处理。Gears 允许您本地化计算和数据,并提供内置协调器以方便在集群环境中处理分布式数据。

在 RedisGears 中,主要的处理单元是 RedisGears 函数,它(目前)可以用 Python 编写(正在开发更多语言)。这些函数运行在自己的线程上,与 Redis 主线程分离,并且可以响应键空间事件或作为外部命令的结果以命令方式执行。这些函数被“注册”(部署)到 Gears 引擎,并有一个相关的名称和注册 ID。

在注册期间,我们为函数选择一个特定的读取器,它定义了函数如何获取其初始数据

  • KeysReader:Redis 键和值。
  • KeysOnlyReader:仅 Redis 键。
  • StreamReader:Redis Stream 消息。
  • PythonReader:任意 Python 生成器。
  • ShardsIDReader:分片 ID。
  • CommandReader:来自应用客户端的命令参数。

一个速率限制 RedisGears 函数#

根据读取器类型,Gear 函数可以立即运行、按需运行、作为批处理作业运行,或者通过注册使其在各种类型的事件上自动触发,从而以事件驱动的方式运行。

Python 函数 rate_limit 接受 3 个参数

  • key:用作特定用户计数器支持的 Redis 键。
  • max_request:用户的请求配额。
  • expiry:将来设置计数器 TTL 的秒数。
def rate_limit(key, max_requests, expiry):
  requests = execute('GET', key)
  requests = int(requests) if requests else -1
  max_requests = int(max_requests)
  expiry = int(expiry)

  if (requests == -1) or (requests < max_requests):
    with atomic():
      execute('INCR', key)
      execute('EXPIRE', key, expiry)
    return False
  else:
    return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')

将脚本放在 src/main/resources/scripts 目录下。现在,我们来分解一下

 rate_limit 函数

类似于我们在之前的实现中所做的那样,我们

  1. 1. 通过执行 GET 命令,检索传递的 key 当前的请求数量。
  2. 2. 将结果转换为 int 类型,如果未找到,则默认为 -1
  3. 3.max_requestsexpiry 转换为 int 类型
  4. 4. 如果配额未超出,则在事务中执行 INCR/EXPIRE 命令 (with atomic():),并返回 False(没有速率限制 - 请求允许)
  5. 5. 否则,返回 True(拒绝请求)

函数注册

  1. 1. 在脚本底部,在 # Function registration 部分,我们使用 CommandReader 读取器实例化 GearsBuilder(GB)GearsBuilder “构建”函数的上下文,包括参数、转换、触发器等。
  2. 2. 我们使用 map 方法,通过一个 mapper 函数回调,将记录一对一映射到 rate_limit 函数的参数。
  3. 3. 现在我们可以调用 register 操作,将函数注册为事件处理器。在本例中,事件是触发器 'RateLimiter'

SpringBoot 中的 RedisGears#

根据读取器类型,Gear 函数可以立即运行、按需运行、作为批处理作业运行,或者通过注册使其在各种类型的事件上自动触发,从而以事件驱动的方式运行。

Python 函数 rate_limit 接受 3 个参数

  • key:用作特定用户计数器支持的 Redis 键。
  • max_request:用户的请求配额。
  • expiry:将来设置计数器 TTL 的秒数。
def rate_limit(key, max_requests, expiry):
  requests = execute('GET', key)
  requests = int(requests) if requests else -1
  max_requests = int(max_requests)
  expiry = int(expiry)

  if (requests == -1) or (requests < max_requests):
    with atomic():
      execute('INCR', key)
      execute('EXPIRE', key, expiry)
    return False
  else:
    return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')

将脚本放在 src/main/resources/scripts 目录下。现在,我们来分解一下

rate_limit 函数

类似于我们在之前的实现中所做的那样,我们

  1. 1. 通过执行 GET 命令,检索传递的 key 当前的请求数量。
  2. 2. 将结果转换为 int 类型,如果未找到,则默认为 -1
  3. 3.max_requestsexpiry 转换为 int 类型
  4. 4. 如果配额未超出,则在事务中执行 INCR/EXPIRE 命令 (with atomic():),并返回 False(没有速率限制 - 请求允许)
  5. 5. 否则,返回 True(拒绝请求)

函数注册

  1. 1.在脚本底部,在 # Function registration 部分,我们使用 CommandReader 读取器实例化 GearsBuilder(GB)GearsBuilder “构建”函数的上下文,包括参数、转换、触发器等。
  2. 2.我们使用 map 方法,通过一个 mapper 函数回调,将记录一对一映射到 rate_limit 函数的参数。
  3. 3.现在我们可以调用 register 操作,将函数注册为事件处理器。在本例中,事件是触发器 'RateLimiter'

SpringBoot 中的 RedisGears#

为了在 SpringBoot 应用中使用我们的 RedisGear 函数,我们需要做几件事

  1. 1. 将函数部署到 Redis 服务器
  2. 2. 执行函数以在每个请求上获得是/否的响应

Lettuce Mod#

LettuceMod 是由 Julien Ruaux 创建的基于 Lettuce 的 Redis Modules Java 客户端。它在独立或集群配置中支持以下模块

  • 触发器和函数
  • JSON
  • 搜索
  • 时间序列

要使用 LettuceMod,我们将在 Maven POM 中添加依赖项,如下所示

<dependency>
  <groupId>com.redis</groupId>
  <artifactId>spring-lettucemod</artifactId>
  <version>1.7.0</version>
</dependency>

在 SpringBoot 中访问 Gears 命令#

为了访问任何 LettuceMod 支持的模块,我们将在 FixedWindowRateLimiterApplication 类中注入一个 StatefulRedisModulesConnection,如下所示

@Autowired
StatefulRedisModulesConnection<String, String> connection;

添加相应的导入语句

import com.redis.lettucemod.api.StatefulRedisModulesConnection;

注册 Gears 函数#

我们首先编写一个函数来确定带有触发器 RateLimiter 的函数是否已注册。它接受一个 Registration 列表,并深入提取 trigger 参数的值,使用 Java Streams API

private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
  return registrations.stream().filter(r -> r.getData().getArgs().get("trigger").equals(trigger)).findFirst().map(Registration::getId);
}

在带有 @PostConstruct 注解的方法 loadGearsScript

  1. 1. 从之前注入的 StatefulRedisModulesConnection 中检索 RedisGearsCommands 实例
  2. 2. 通过 dumpregistrations 方法获取当前注册的 Gears 函数
  3. 3. 将注册列表传递给我们的 getGearsRegistrationIdForTrigger
  4. 4. 如果我们没有找到注册,则继续注册函数:
    • 从 classpath 加载函数到名为 pyString
    • 使用 pyexecute 方法并传递 py 脚本载荷
@PostConstruct
public void loadGearsScript() throws IOException {
  String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
      Charset.defaultCharset());
  RedisGearsCommands<String, String> gears = connection.sync();
  List<Registration> registrations = gears.dumpregistrations();

  Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
  if (maybeRegistrationId.isEmpty()) {
    try {
      ExecutionResults er = gears.pyexecute(py);
      if (er.isOk()) {
        logger.info("RateLimiter.py has been registered");
      } else if (er.isError()) {
        logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
      }
    } catch (RedisCommandExecutionException rcee) {
      logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
    }
  } else {
    logger.info("RateLimiter.py has already been registered");
  }
}

修改过滤器以使用 Gears 函数#

接下来,我们将修改过滤器以包含 StatefulRedisModulesConnection 以及配额;即我们需要传递给函数的值

class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  private StatefulRedisModulesConnection<String, String> connection;
  private Long maxRequestPerMinute;

  public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
      Long maxRequestPerMinute) {
    this.connection = connection;
    this.maxRequestPerMinute = maxRequestPerMinute;
  }

现在我们可以修改 filter 方法来使用该函数。Gears 函数通过触发正确的事件 RateLimiter 并传递函数所需的参数来调用;即 key、配额以及将来 TTL 的秒数。

如前所述,如果函数返回 false,我们将允许请求通过,否则返回 HTTP 429

@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  int currentMinute = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);

  RedisGearsCommands<String, String> gears = connection.sync();

  List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
  if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
    return next.handle(request);
  } else {
    return ServerResponse.status(TOO_MANY_REQUESTS).build();
  }
}

使用 curl 进行测试#

我们再次使用 curl 循环来测试限速器

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done

您应该看到第 21 个请求被拒绝

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s

如果我们以监控模式运行 Redis,我们应该看到对 RG.TRIGGER 的 Lua 调用,并且在该调用下您应该看到对允许请求的 GETINCREXPIRE 调用

1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"

对于限速请求,您应该只看到对 GET 的调用

1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"

此实现的完整代码位于分支 with_gears 下。