Stream 触发器
当项目添加到流时,执行 JavaScript 函数
Redis 开源 | Redis Enterprise 软件 | Redis Cloud | Redis 开源 | Redis Enterprise for Kubernetes | 客户端 |
---|
Redis Stack 的触发器和函数功能带有一个完整的流 API 来处理来自 Redis 流 的数据。 与提供微批处理 API 的 RedisGears v1 不同,新的触发器和函数功能提供了一个 真正的流式处理 API,这意味着数据将在进入流后立即被处理。
注册流消费者
触发器和函数提供了一个 API,允许注册流触发器。 不要与 Redis 流消费者组 混淆,触发器和函数使用 Redis Module API 有效地读取流并管理其消费者。 这种方法提供了更好的性能,因为不需要调用任何 Redis 命令来从流中读取。 让我们看一个简单的例子
#!js api_version=1.0 name=myFirstLibrary
redis.registerStreamTrigger(
"consumer", // consumer name
"stream", // streams prefix
function(c, data) {
// callback to run on each element added to the stream
redis.log(JSON.stringify(data, (key, value) =>
typeof value === 'bigint'
? value.toString()
: value // return everything else unchanged
));
}
);
参数描述
- consumer - 消费者名称。
- stream - 触发回调的流名称前缀。
- callback - 在流中的每个元素上调用的回调。 遵循 同步和异步调用 的相同规则。 回调将仅在主分片上调用。
如果我们注册此库(请参阅 快速入门 部分,了解如何注册 RedisGears 函数)并在我们的 Redis 上运行以下命令
XADD stream:1 * foo1 bar1
XADD stream:1 * foo2 bar2
XADD stream:2 * foo1 bar1
XADD stream:2 * foo2 bar2
我们将在 Redis 日志文件中看到以下行
2630021:M 05 Jul 2022 17:13:22.506 * <redisgears_2> {"id":["1657030402506","0"],"stream_name":"stream:1","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:25.323 * <redisgears_2> {"id":["1657030405323","0"],"stream_name":"stream:1","record":[["foo2","bar2"]]}
2630021:M 05 Jul 2022 17:13:29.475 * <redisgears_2> {"id":["1657030409475","0"],"stream_name":"stream:2","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:32.715 * <redisgears_2> {"id":["1657030412715","0"],"stream_name":"stream:2","record":[["foo2","bar2"]]}
传递给流消费者回调的 data
参数的格式如下
{
"id": ["<ms>", "<seq>"],
"stream_name": "<stream name>",
"stream_name_raw": "<stream name as ArrayBuffer>",
"record":[
["<key>", "<value>"],
.
.
["<key>", "<value>"]
],
"record_raw":[
["<key_raw>", "<value_raw>"],
.
.
["<key_raw>", "<value_raw>"]
],
}
记录是元组列表而不是对象的原因是因为 Redis Stream 规范允许重复键。
请注意,如果数据无法解码为字符串,则 stream_name
和 record
字段可能包含 null
。 *_raw
字段将始终提供,并将包含作为 JS
ArrayBuffer
的数据。
我们可以使用 TFUNCTION LIST
命令观察我们注册的消费者跟踪的流
127.0.0.1:6379> TFUNCTION LIST LIBRARY lib vvv
1) 1) "engine"
1) "js"
2) "api_version"
3) "1.0"
4) "name"
5) "lib"
6) "pending_jobs"
7) (integer) 0
8) "user"
9) "default"
10) "functions"
1) (empty array)
2) "stream_triggers"
3) 1) 1) "name"
1) "consumer"
2) "prefix"
3) "stream"
4) "window"
5) (integer) 1
6) "trim"
7) "disabled"
8) "num_streams"
1) (integer) 2
2) "streams"
3) 1) 1) "name"
1) "stream:2"
2) "last_processed_time"
3) (integer) 0
4) "avg_processed_time"
5) "0"
6) "last_lag"
7) (integer) 0
8) "avg_lag"
1) "0"
2) "total_record_processed"
3) (integer) 2
4) "id_to_read_from"
5) "1657030412715-0"
6) "last_error"
7) "None"
17) "pending_ids"
18) (empty array)
1) 1) "name"
1) "stream:1"
2) "last_processed_time"
3) (integer) 1
4) "avg_processed_time"
5) "0.5"
6) "last_lag"
7) (integer) 1
8) "avg_lag"
1) "0.5"
2) "total_record_processed"
3) (integer) 2
4) "id_to_read_from"
5) "1657030405323-0"
6) "last_error"
7) "None"
8) "pending_ids"
9) (empty array)
4) "keyspace_triggers"
5) (empty array)
启用修剪并设置窗口
我们可以在触发器回调后添加 isStreamTrimmed
可选参数来启用流修剪,我们还可以设置 window
参数来控制可以同时处理多少个元素。 例子
#!js api_version=1.0 name=myFirstLibrary
redis.registerStreamTrigger(
"consumer", // consumer name
"stream", // streams prefix
function(c, data) {
// callback to run on each element added to the stream
redis.log(JSON.stringify(data, (key, value) =>
typeof value === 'bigint'
? value.toString()
: value // return everything else unchanged
));
},
{
isStreamTrimmed: true,
window: 3
}
);
默认值为
isStreamTrimmed
-false
窗口
- 1
单个消费者启用修剪就足以修剪流。 该流将根据在给定时间消耗该流的最慢消费者(即使这不是启用修剪的消费者)进行修剪。 在回调调用期间引发异常将 不会阻止修剪。 回调应该决定如何通过调用重试或写入一些错误日志来处理失败。 该错误将被添加到 TFUNCTION LIST
命令的 last_error
字段中。
数据处理保证
只要主分片启动并运行,我们就保证 exactly once 属性(回调将在流中的每个元素上恰好触发一次)。 如果发生故障(例如分片崩溃),我们保证至少一次属性(回调将在流中的每个元素上至少触发一次)
升级
升级消费者代码时(使用 TFUNCTION LOAD
命令的 REPLACE
选项),可以更新以下消费者参数
- 窗口
- 修剪
尝试更新任何其他参数将在加载库时导致错误。