本教程使用 Lettuce,这是一个不受支持的 Redis 库。对于生产应用,我们建议使用 Jedis
RedisGears 是一个动态的服务器端数据处理引擎,其中的"服务器"部分就是 Redis 本身。RedisGears 作为 Redis 模块进行分发。您可以使用官方 Docker 镜像启动一个预配置了 Gears 的 Redis 实例:
docker run -p 6379:6379 redislabs/redisgears:latest
或者,像我大多数时候所做的那样,使用包含 Gears 和所有其他 Redis, Inc. 支持的模块的 "redismod" 镜像:
docker run -p 6379:6379 redislabs/redismod
构建 RedisGears 的目的是在 Redis 内部提供一个数据处理引擎,它比更简单的 Lua 服务器端脚本具有更规范的语义。可以将其视为一个更灵活的 Redis map-reduce 引擎。它支持事务、批处理和事件驱动的 Redis 数据处理。Gears 允许您本地化计算和数据,并提供内置协调器以方便在集群环境中处理分布式数据。
在 RedisGears 中,主要的处理单元是 RedisGears 函数,它(目前)可以用 Python 编写(正在开发更多语言)。这些函数运行在自己的线程上,与 Redis 主线程分离,并且可以响应键空间事件或作为外部命令的结果以命令方式执行。这些函数被“注册”(部署)到 Gears 引擎,并有一个相关的名称和注册 ID。
在注册期间,我们为函数选择一个特定的读取器,它定义了函数如何获取其初始数据
KeysReader
:Redis 键和值。KeysOnlyReader
:仅 Redis 键。StreamReader
:Redis Stream 消息。PythonReader
:任意 Python 生成器。ShardsIDReader
:分片 ID。CommandReader
:来自应用客户端的命令参数。根据读取器类型,Gear 函数可以立即运行、按需运行、作为批处理作业运行,或者通过注册使其在各种类型的事件上自动触发,从而以事件驱动的方式运行。
Python 函数 rate_limit
接受 3 个参数
key
:用作特定用户计数器支持的 Redis 键。max_request
:用户的请求配额。expiry
:将来设置计数器 TTL 的秒数。def rate_limit(key, max_requests, expiry):
requests = execute('GET', key)
requests = int(requests) if requests else -1
max_requests = int(max_requests)
expiry = int(expiry)
if (requests == -1) or (requests < max_requests):
with atomic():
execute('INCR', key)
execute('EXPIRE', key, expiry)
return False
else:
return True
# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
将脚本放在 src/main/resources/scripts
目录下。现在,我们来分解一下
rate_limit
函数类似于我们在之前的实现中所做的那样,我们
GET
命令,检索传递的 key
当前的请求数量。 int
类型,如果未找到,则默认为 -1
max_requests
和 expiry
转换为 int
类型 INCR
/EXPIRE
命令 (with atomic():
),并返回 False
(没有速率限制 - 请求允许) True
(拒绝请求)# Function registration
部分,我们使用 CommandReader
读取器实例化 GearsBuilder
(GB
)。 GearsBuilder
“构建”函数的上下文,包括参数、转换、触发器等。 map
方法,通过一个 mapper 函数回调,将记录一对一映射到 rate_limit
函数的参数。 register
操作,将函数注册为事件处理器。在本例中,事件是触发器 'RateLimiter'
。 根据读取器类型,Gear 函数可以立即运行、按需运行、作为批处理作业运行,或者通过注册使其在各种类型的事件上自动触发,从而以事件驱动的方式运行。
Python 函数 rate_limit
接受 3 个参数
key
:用作特定用户计数器支持的 Redis 键。max_request
:用户的请求配额。expiry
:将来设置计数器 TTL 的秒数。 def rate_limit(key, max_requests, expiry):
requests = execute('GET', key)
requests = int(requests) if requests else -1
max_requests = int(max_requests)
expiry = int(expiry)
if (requests == -1) or (requests < max_requests):
with atomic():
execute('INCR', key)
execute('EXPIRE', key, expiry)
return False
else:
return True
# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
将脚本放在 src/main/resources/scripts
目录下。现在,我们来分解一下
类似于我们在之前的实现中所做的那样,我们
GET
命令,检索传递的 key
当前的请求数量。 int
类型,如果未找到,则默认为 -1
max_requests
和 expiry
转换为 int
类型 INCR
/EXPIRE
命令 (with atomic():
),并返回 False
(没有速率限制 - 请求允许) True
(拒绝请求)# Function registration
部分,我们使用 CommandReader
读取器实例化 GearsBuilder
(GB
)。 GearsBuilder
“构建”函数的上下文,包括参数、转换、触发器等。map
方法,通过一个 mapper 函数回调,将记录一对一映射到 rate_limit
函数的参数。register
操作,将函数注册为事件处理器。在本例中,事件是触发器 'RateLimiter'
。为了在 SpringBoot 应用中使用我们的 RedisGear 函数,我们需要做几件事
LettuceMod 是由 Julien Ruaux 创建的基于 Lettuce 的 Redis Modules Java 客户端。它在独立或集群配置中支持以下模块
要使用 LettuceMod,我们将在 Maven POM 中添加依赖项,如下所示
<dependency>
<groupId>com.redis</groupId>
<artifactId>spring-lettucemod</artifactId>
<version>1.7.0</version>
</dependency>
为了访问任何 LettuceMod 支持的模块,我们将在 FixedWindowRateLimiterApplication
类中注入一个 StatefulRedisModulesConnection
,如下所示
@Autowired
StatefulRedisModulesConnection<String, String> connection;
添加相应的导入语句
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
我们首先编写一个函数来确定带有触发器 RateLimiter
的函数是否已注册。它接受一个 Registration
列表,并深入提取 trigger
参数的值,使用 Java Streams API
private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
return registrations.stream().filter(r -> r.getData().getArgs().get("trigger").equals(trigger)).findFirst().map(Registration::getId);
}
在带有 @PostConstruct
注解的方法 loadGearsScript
中
StatefulRedisModulesConnection
中检索 RedisGearsCommands
实例dumpregistrations
方法获取当前注册的 Gears 函数getGearsRegistrationIdForTrigger
py
的 String
中pyexecute
方法并传递 py
脚本载荷@PostConstruct
public void loadGearsScript() throws IOException {
String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
Charset.defaultCharset());
RedisGearsCommands<String, String> gears = connection.sync();
List<Registration> registrations = gears.dumpregistrations();
Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
if (maybeRegistrationId.isEmpty()) {
try {
ExecutionResults er = gears.pyexecute(py);
if (er.isOk()) {
logger.info("RateLimiter.py has been registered");
} else if (er.isError()) {
logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
}
} catch (RedisCommandExecutionException rcee) {
logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
}
} else {
logger.info("RateLimiter.py has already been registered");
}
}
接下来,我们将修改过滤器以包含 StatefulRedisModulesConnection
以及配额;即我们需要传递给函数的值
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private StatefulRedisModulesConnection<String, String> connection;
private Long maxRequestPerMinute;
public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
Long maxRequestPerMinute) {
this.connection = connection;
this.maxRequestPerMinute = maxRequestPerMinute;
}
现在我们可以修改 filter
方法来使用该函数。Gears 函数通过触发正确的事件 RateLimiter
并传递函数所需的参数来调用;即 key
、配额以及将来 TTL 的秒数。
如前所述,如果函数返回 false
,我们将允许请求通过,否则返回 HTTP 429
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
RedisGearsCommands<String, String> gears = connection.sync();
List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
return next.handle(request);
} else {
return ServerResponse.status(TOO_MANY_REQUESTS).build();
}
}
我们再次使用 curl 循环来测试限速器
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
您应该看到第 21 个请求被拒绝
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s
如果我们以监控模式运行 Redis,我们应该看到对 RG.TRIGGER
的 Lua 调用,并且在该调用下您应该看到对允许请求的 GET
、INCR
和 EXPIRE
调用
1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"
对于限速请求,您应该只看到对 GET
的调用
1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"
此实现的完整代码位于分支 with_gears
下。