本教程使用 Lettuce,它是一个不受支持的 Redis 库。对于生产应用程序,我们建议使用 Jedis
RedisGears 是一个动态的服务器端数据处理引擎,其中“服务器”部分是 Redis 本身。RedisGears 作为 Redis 模块进行分发。您可以使用官方 Docker 镜像启动预先配置了 Gears 的 Redis 实例:
docker run -p 6379:6379 redislabs/redisgears:latest
或者,就像我通常做的那样,使用 "redismod" 镜像,其中包含 Gears 和所有其他 Redis, Inc. 支持的模块:
docker run -p 6379:6379 redislabs/redismod
RedisGears 的构建目的是在 Redis 内部提供一个数据处理引擎,与更简单的 Lua 服务器端脚本相比,它具有更正式的语义。可以将其视为 Redis 的更灵活的 map-reduce 引擎。它支持对 Redis 数据进行事务、批处理和事件驱动的处理。Gears 允许您将计算和数据本地化,并提供内置协调器以简化在集群环境中处理分布式数据。
在 RedisGears 中,处理的主要单元是 RedisGears 函数,该函数可以用 Python(目前正在开发更多语言)编写。这些函数在自己的线程上运行,与 Redis 的主线程分离,并且可以响应键空间事件或作为外部命令的结果而以命令方式执行。这些函数“注册”(部署)到 Gears 引擎,并具有关联的名称和注册 ID。
在注册过程中,我们会为我们的函数选择一个特定的读取器,它定义了函数如何获取其初始数据
KeysReader
: Redis 键和值。KeysOnlyReader
: Redis 键。StreamReader
: Redis 流消息。PythonReader
: 任意 Python 生成器。ShardsIDReader
: 分片 ID。CommandReader
: 来自应用程序客户端的命令参数。根据读取器类型,Gear 函数可以立即运行,按需运行,作为批处理作业运行,或者通过注册到各种类型的事件以自动触发的方式以事件驱动的运行。
Python 函数 rate_limit
接受 3 个参数
key
: 支持给定用户计数器的 Redis 键。max_request
: 用户的请求配额。expiry
: 将计数器 TTL 设置为未来的秒数。def rate_limit(key, max_requests, expiry):
requests = execute('GET', key)
requests = int(requests) if requests else -1
max_requests = int(max_requests)
expiry = int(expiry)
if (requests == -1) or (requests < max_requests):
with atomic():
execute('INCR', key)
execute('EXPIRE', key, expiry)
return False
else:
return True
# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
将脚本放在 src/main/resources/scripts
下。现在,让我们分解它
rate_limit
函数与我们在之前实现中所做的一样,我们
execute
-ing GET
命令检索传递的 key
的当前请求数。 int
,如果未找到,则默认为 -1
max_requests
和 expiry
转换为 int
with atomic():
) 执行 INCR
/EXPIRE
命令,并返回 False
(无速率限制 - 允许请求) True
(拒绝请求)# Function registration
部分,我们使用 GearsBuilder
(GB
) 实例化 CommandReader
读取器。GearsBuilder
“构建”函数的上下文,包括参数、转换、触发器等。 map
方法通过映射器函数回调对记录到 rate_limit
函数的参数进行一对一映射。 register
操作将函数注册为事件处理程序。我们的事件是触发器 'RateLimiter'
。 根据读取器类型,Gear 函数可以立即运行,按需运行,作为批处理作业运行,或者通过注册到各种类型的事件以自动触发的方式以事件驱动的运行。
Python 函数 rate_limit
接受 3 个参数
key
: 支持给定用户计数器的 Redis 键。max_request
: 用户的请求配额。expiry
: 将计数器 TTL 设置为未来的秒数。 def rate_limit(key, max_requests, expiry):
requests = execute('GET', key)
requests = int(requests) if requests else -1
max_requests = int(max_requests)
expiry = int(expiry)
if (requests == -1) or (requests < max_requests):
with atomic():
execute('INCR', key)
execute('EXPIRE', key, expiry)
return False
else:
return True
# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
将脚本放在 src/main/resources/scripts
下。现在,让我们分解它
与我们在之前实现中所做的一样,我们
execute
-ing GET
命令检索传递的 key
的当前请求数。 int
,如果未找到,则默认为 -1
max_requests
和 expiry
转换为 int
with atomic():
) 执行 INCR
/EXPIRE
命令,并返回 False
(无速率限制 - 允许请求) True
(拒绝请求)# 函数注册
部分,我们使用 CommandReader
读取器实例化 GearsBuilder
(GB
)。 GearsBuilder
"构建" 函数的上下文,包括参数、转换、触发器等。map
方法通过映射器函数回调对记录进行一对一映射,映射到 rate_limit
函数的参数。register
操作将函数注册为事件处理程序。在本例中,事件是触发器 'RateLimiter'
。为了从 SpringBoot 应用程序中使用 RedisGear 函数,我们需要执行以下几个步骤
LettuceMod 是基于 Lettuce 的 Redis 模块的 Java 客户端,由 Julien Ruaux 创建。它支持以下模块,可以独立使用或在集群配置中使用
要使用 LettuceMod,我们将根据以下所示将依赖项添加到 Maven POM 中
<dependency>
<groupId>com.redis</groupId>
<artifactId>spring-lettucemod</artifactId>
<version>1.7.0</version>
</dependency>
要访问 LettuceMod 支持的任何模块,我们将在 FixedWindowRateLimiterApplication
类中注入一个 StatefulRedisModulesConnection
,如下所示
@Autowired
StatefulRedisModulesConnection<String, String> connection;
添加匹配的导入语句
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
首先,我们将编写一个函数来确定具有触发器 RateLimiter
的函数是否已注册。该函数接受一个 List
的 Registration
,并深入挖掘以使用 Java Streams API 提取 trigger
参数的值
private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
return registrations.stream().filter(r -> r.getData().getArgs().get("trigger").equals(trigger)).findFirst().map(Registration::getId);
}
在 @PostConstruct
注释的方法 loadGearsScript
方法中
StatefulRedisModulesConnection
中检索 RedisGearsCommands
的实例dumpregistrations
方法获取当前注册的 Gears 函数getGearsRegistrationIdForTrigger
py
的 String
中pyexecute
方法传递 py
脚本有效负载@PostConstruct
public void loadGearsScript() throws IOException {
String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
Charset.defaultCharset());
RedisGearsCommands<String, String> gears = connection.sync();
List<Registration> registrations = gears.dumpregistrations();
Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
if (maybeRegistrationId.isEmpty()) {
try {
ExecutionResults er = gears.pyexecute(py);
if (er.isOk()) {
logger.info("RateLimiter.py has been registered");
} else if (er.isError()) {
logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
}
} catch (RedisCommandExecutionException rcee) {
logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
}
} else {
logger.info("RateLimiter.py has already been registered");
}
}
接下来,我们将修改过滤器,以包含 StatefulRedisModulesConnection
以及配额;我们需要传递给函数的值
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {
private StatefulRedisModulesConnection<String, String> connection;
private Long maxRequestPerMinute;
public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
Long maxRequestPerMinute) {
this.connection = connection;
this.maxRequestPerMinute = maxRequestPerMinute;
}
现在,我们可以修改 filter
方法以使用该函数。Gears 函数通过触发正确的事件 RateLimiter
并传递函数所需的 parameters 来调用:key
、配额和未来的 TTL 秒数。
与之前一样,如果函数返回 false
,我们将让请求通过,否则我们将返回一个 HTTP 429
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
int currentMinute = LocalTime.now().getMinute();
String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);
RedisGearsCommands<String, String> gears = connection.sync();
List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
return next.handle(request);
} else {
return ServerResponse.status(TOO_MANY_REQUESTS).build();
}
}
我们再次使用 curl 循环来测试限制器
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done
您应该看到第 21 个请求被拒绝
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET https://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s
如果我们在监视模式下运行 Redis,我们应该看到对 RG.TRIGGER
的 Lua 调用,在它下面应该看到对 GET
、INCR
和 EXPIRE
的调用,用于允许的请求
1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"
对于限速请求,您应该只看到对 GET
的调用
1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"
此实现的完整代码位于分支 with_gears
下。