学习

LETTUCE

本教程使用 Lettuce,它是一个不受支持的 Redis 库。对于生产应用程序,我们建议使用 Jedis

使用 RedisGears 提高原子性和性能#

什么是 RedisGears?#

RedisGears 是一个动态的服务器端数据处理引擎,其中“服务器”部分是 Redis 本身。RedisGears 作为 Redis 模块进行分发。您可以使用官方 Docker 镜像启动预先配置了 Gears 的 Redis 实例:

docker run -p 6379:6379 redislabs/redisgears:latest

或者,就像我通常做的那样,使用 "redismod" 镜像,其中包含 Gears 和所有其他 Redis, Inc. 支持的模块:

docker run -p 6379:6379 redislabs/redismod

RedisGears 的构建目的是在 Redis 内部提供一个数据处理引擎,与更简单的 Lua 服务器端脚本相比,它具有更正式的语义。可以将其视为 Redis 的更灵活的 map-reduce 引擎。它支持对 Redis 数据进行事务、批处理和事件驱动的处理。Gears 允许您将计算和数据本地化,并提供内置协调器以简化在集群环境中处理分布式数据。

在 RedisGears 中,处理的主要单元是 RedisGears 函数,该函数可以用 Python(目前正在开发更多语言)编写。这些函数在自己的线程上运行,与 Redis 的主线程分离,并且可以响应键空间事件或作为外部命令的结果而以命令方式执行。这些函数“注册”(部署)到 Gears 引擎,并具有关联的名称和注册 ID。

在注册过程中,我们会为我们的函数选择一个特定的读取器,它定义了函数如何获取其初始数据

  • KeysReader: Redis 键和值。
  • KeysOnlyReader: Redis 键。
  • StreamReader: Redis 流消息。
  • PythonReader: 任意 Python 生成器。
  • ShardsIDReader: 分片 ID。
  • CommandReader: 来自应用程序客户端的命令参数。

速率限制 RedisGears 函数#

根据读取器类型,Gear 函数可以立即运行,按需运行,作为批处理作业运行,或者通过注册到各种类型的事件以自动触发的方式以事件驱动的运行。

Python 函数 rate_limit 接受 3 个参数

  • key: 支持给定用户计数器的 Redis 键。
  • max_request: 用户的请求配额。
  • expiry: 将计数器 TTL 设置为未来的秒数。
def rate_limit(key, max_requests, expiry):
  requests = execute('GET', key)
  requests = int(requests) if requests else -1
  max_requests = int(max_requests)
  expiry = int(expiry)

  if (requests == -1) or (requests < max_requests):
    with atomic():
      execute('INCR', key)
      execute('EXPIRE', key, expiry)
    return False
  else:
    return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')

将脚本放在 src/main/resources/scripts 下。现在,让我们分解它

 rate_limit 函数

与我们在之前实现中所做的一样,我们

  1. 1. 通过 execute-ing GET 命令检索传递的 key 的当前请求数。
  2. 2. 将结果转换为 int,如果未找到,则默认为 -1
  3. 3.max_requestsexpiry 转换为 int
  4. 4. 如果配额未超过,则在事务中 (with atomic():) 执行 INCR/EXPIRE 命令,并返回 False(无速率限制 - 允许请求)
  5. 5.否则,返回 True(拒绝请求)

函数注册

  1. 1. 在脚本底部,在 # Function registration 部分,我们使用 GearsBuilder(GB) 实例化 CommandReader 读取器。GearsBuilder“构建”函数的上下文,包括参数、转换、触发器等。
  2. 2. 我们使用 map 方法通过映射器函数回调对记录到 rate_limit 函数的参数进行一对一映射。
  3. 3. 现在,我们可以调用 register 操作将函数注册为事件处理程序。我们的事件是触发器 'RateLimiter'

SpringBoot 中的 RedisGears#

根据读取器类型,Gear 函数可以立即运行,按需运行,作为批处理作业运行,或者通过注册到各种类型的事件以自动触发的方式以事件驱动的运行。

Python 函数 rate_limit 接受 3 个参数

  • key: 支持给定用户计数器的 Redis 键。
  • max_request: 用户的请求配额。
  • expiry: 将计数器 TTL 设置为未来的秒数。
def rate_limit(key, max_requests, expiry):
  requests = execute('GET', key)
  requests = int(requests) if requests else -1
  max_requests = int(max_requests)
  expiry = int(expiry)

  if (requests == -1) or (requests < max_requests):
    with atomic():
      execute('INCR', key)
      execute('EXPIRE', key, expiry)
    return False
  else:
    return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')

将脚本放在 src/main/resources/scripts 下。现在,让我们分解它

The rate_limit function

与我们在之前实现中所做的一样,我们

  1. 1. 通过 execute-ing GET 命令检索传递的 key 的当前请求数。
  2. 2. 将结果转换为 int,如果未找到,则默认为 -1
  3. 3.max_requestsexpiry 转换为 int
  4. 4. 如果配额未超过,则在事务中 (with atomic():) 执行 INCR/EXPIRE 命令,并返回 False(无速率限制 - 允许请求)
  5. 5.否则,返回 True(拒绝请求)

函数注册

  1. 1.在脚本底部,# 函数注册 部分,我们使用 CommandReader 读取器实例化 GearsBuilder(GB)GearsBuilder "构建" 函数的上下文,包括参数、转换、触发器等。
  2. 2.我们使用 map 方法通过映射器函数回调对记录进行一对一映射,映射到 rate_limit 函数的参数。
  3. 3.现在,我们可以调用 register 操作将函数注册为事件处理程序。在本例中,事件是触发器 'RateLimiter'

SpringBoot 中的 RedisGears#

为了从 SpringBoot 应用程序中使用 RedisGear 函数,我们需要执行以下几个步骤

  1. 1.将函数部署到 Redis 服务器
  2. 2.执行函数以获取每个请求的肯定或否定答案

Lettuce Mod#

LettuceMod 是基于 Lettuce 的 Redis 模块的 Java 客户端,由 Julien Ruaux 创建。它支持以下模块,可以独立使用或在集群配置中使用

  • 触发器和函数
  • JSON
  • 搜索
  • 时间序列

要使用 LettuceMod,我们将根据以下所示将依赖项添加到 Maven POM 中

<dependency>
  <groupId>com.redis</groupId>
  <artifactId>spring-lettucemod</artifactId>
  <version>1.7.0</version>
</dependency>

在 SpringBoot 中访问 Gears 命令#

要访问 LettuceMod 支持的任何模块,我们将在 FixedWindowRateLimiterApplication 类中注入一个 StatefulRedisModulesConnection,如下所示

@Autowired
StatefulRedisModulesConnection<String, String> connection;

添加匹配的导入语句

import com.redis.lettucemod.api.StatefulRedisModulesConnection;

注册 Gears 函数#

首先,我们将编写一个函数来确定具有触发器 RateLimiter 的函数是否已注册。该函数接受一个 ListRegistration,并深入挖掘以使用 Java Streams API 提取 trigger 参数的值

private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
  return registrations.stream().filter(r -> r.getData().getArgs().get("trigger").equals(trigger)).findFirst().map(Registration::getId);
}

@PostConstruct 注释的方法 loadGearsScript 方法中

  1. 1.我们从之前注入的 StatefulRedisModulesConnection 中检索 RedisGearsCommands 的实例
  2. 2.我们通过 dumpregistrations 方法获取当前注册的 Gears 函数
  3. 3.我们将注册列表传递给我们的 getGearsRegistrationIdForTrigger
  4. 4.如果我们没有找到注册,我们将继续注册函数:
    • 将函数从类路径加载到名为 pyString
    • 使用 pyexecute 方法传递 py 脚本有效负载
@PostConstruct
public void loadGearsScript() throws IOException {
  String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
      Charset.defaultCharset());
  RedisGearsCommands<String, String> gears = connection.sync();
  List<Registration> registrations = gears.dumpregistrations();

  Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
  if (maybeRegistrationId.isEmpty()) {
    try {
      ExecutionResults er = gears.pyexecute(py);
      if (er.isOk()) {
        logger.info("RateLimiter.py has been registered");
      } else if (er.isError()) {
        logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
      }
    } catch (RedisCommandExecutionException rcee) {
      logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
    }
  } else {
    logger.info("RateLimiter.py has already been registered");
  }
}

修改过滤器以使用 Gears 函数#

接下来,我们将修改过滤器,以包含 StatefulRedisModulesConnection 以及配额;我们需要传递给函数的值

class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  private StatefulRedisModulesConnection<String, String> connection;
  private Long maxRequestPerMinute;

  public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
      Long maxRequestPerMinute) {
    this.connection = connection;
    this.maxRequestPerMinute = maxRequestPerMinute;
  }

现在,我们可以修改 filter 方法以使用该函数。Gears 函数通过触发正确的事件 RateLimiter 并传递函数所需的 parameters 来调用:key、配额和未来的 TTL 秒数。

与之前一样,如果函数返回 false,我们将让请求通过,否则我们将返回一个 HTTP 429

@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  int currentMinute = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);

  RedisGearsCommands<String, String> gears = connection.sync();

  List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
  if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
    return next.handle(request);
  } else {
    return ServerResponse.status(TOO_MANY_REQUESTS).build();
  }
}

使用 curl 测试#

我们再次使用 curl 循环来测试限制器

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done

您应该看到第 21 个请求被拒绝

for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s

如果我们在监视模式下运行 Redis,我们应该看到对 RG.TRIGGER 的 Lua 调用,在它下面应该看到对 GETINCREXPIRE 的调用,用于允许的请求

1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"

对于限速请求,您应该只看到对 GET 的调用

1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"

此实现的完整代码位于分支 with_gears 下。

上次更新于 2024 年 2 月 20 日