流触发器
在将项目添加到流时执行 JavaScript 函数
Redis Stack 的触发器和函数功能提供完整的流 API 来处理来自 Redis 流 的数据。与提供微批处理 API 的 RedisGears v1 不同,新的触发器和函数功能提供 **真正的流式** API,这意味着数据将在进入流时立即被处理。
注册流消费者
触发器和函数提供一个 API,允许注册流触发器。不要与 Redis 流消费者组 混淆,触发器和函数使用 Redis 模块 API 来有效地读取流并管理其消费者。这种方法提供了更好的性能,因为不需要调用任何 Redis 命令来从流中读取。让我们看一个简单的例子
#!js api_version=1.0 name=myFirstLibrary
redis.registerStreamTrigger(
"consumer", // consumer name
"stream", // streams prefix
function(c, data) {
// callback to run on each element added to the stream
redis.log(JSON.stringify(data, (key, value) =>
typeof value === 'bigint'
? value.toString()
: value // return everything else unchanged
));
}
);
参数描述
- consumer - 消费者名称。
- stream - 要触发回调的流名称前缀。
- callback - 在流中每个元素上调用的回调。遵循与 同步和异步调用 相同的规则。回调仅在主分片上调用。
如果我们注册这个库(请查看 快速入门 部分了解如何注册 RedisGears 函数),并在我们的 Redis 上运行以下命令
XADD stream:1 * foo1 bar1
XADD stream:1 * foo2 bar2
XADD stream:2 * foo1 bar1
XADD stream:2 * foo2 bar2
我们将在 Redis 日志文件中看到以下行
2630021:M 05 Jul 2022 17:13:22.506 * <redisgears_2> {"id":["1657030402506","0"],"stream_name":"stream:1","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:25.323 * <redisgears_2> {"id":["1657030405323","0"],"stream_name":"stream:1","record":[["foo2","bar2"]]}
2630021:M 05 Jul 2022 17:13:29.475 * <redisgears_2> {"id":["1657030409475","0"],"stream_name":"stream:2","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:32.715 * <redisgears_2> {"id":["1657030412715","0"],"stream_name":"stream:2","record":[["foo2","bar2"]]}
传递给流消费者回调的 data
参数采用以下格式
{
"id": ["<ms>", "<seq>"],
"stream_name": "<stream name>",
"stream_name_raw": "<stream name as ArrayBuffer>",
"record":[
["<key>", "<value>"],
.
.
["<key>", "<value>"]
],
"record_raw":[
["<key_raw>", "<value_raw>"],
.
.
["<key_raw>", "<value_raw>"]
],
}
记录之所以是一系列元组,而不是一个对象,是因为 Redis 流规范允许重复键。
请注意,如果数据无法解码为字符串,则 stream_name
和 record
字段可能包含 null
。*_raw
字段始终提供,并将包含数据作为 JS
ArrayBuffer
。
我们可以使用 TFUNCTION LIST
命令观察注册的消费者跟踪的流
127.0.0.1:6379> TFUNCTION LIST LIBRARY lib vvv
1) 1) "engine"
1) "js"
2) "api_version"
3) "1.0"
4) "name"
5) "lib"
6) "pending_jobs"
7) (integer) 0
8) "user"
9) "default"
10) "functions"
1) (empty array)
2) "stream_triggers"
3) 1) 1) "name"
1) "consumer"
2) "prefix"
3) "stream"
4) "window"
5) (integer) 1
6) "trim"
7) "disabled"
8) "num_streams"
1) (integer) 2
2) "streams"
3) 1) 1) "name"
1) "stream:2"
2) "last_processed_time"
3) (integer) 0
4) "avg_processed_time"
5) "0"
6) "last_lag"
7) (integer) 0
8) "avg_lag"
1) "0"
2) "total_record_processed"
3) (integer) 2
4) "id_to_read_from"
5) "1657030412715-0"
6) "last_error"
7) "None"
17) "pending_ids"
18) (empty array)
1) 1) "name"
1) "stream:1"
2) "last_processed_time"
3) (integer) 1
4) "avg_processed_time"
5) "0.5"
6) "last_lag"
7) (integer) 1
8) "avg_lag"
1) "0.5"
2) "total_record_processed"
3) (integer) 2
4) "id_to_read_from"
5) "1657030405323-0"
6) "last_error"
7) "None"
8) "pending_ids"
9) (empty array)
4) "keyspace_triggers"
5) (empty array)
启用修剪并设置窗口
我们可以通过在触发器回调之后添加 isStreamTrimmed
可选参数来启用流修剪,我们还可以设置 window
参数来控制可以同时处理的元素数量。例如
#!js api_version=1.0 name=myFirstLibrary
redis.registerStreamTrigger(
"consumer", // consumer name
"stream", // streams prefix
function(c, data) {
// callback to run on each element added to the stream
redis.log(JSON.stringify(data, (key, value) =>
typeof value === 'bigint'
? value.toString()
: value // return everything else unchanged
));
},
{
isStreamTrimmed: true,
window: 3
}
);
默认值为
isStreamTrimmed
-false
window
- 1
只要有一个消费者启用修剪,流就会被修剪。流将根据最慢的消费者进行修剪,该消费者在给定时间消耗流(即使这不是启用修剪的消费者)。在回调调用期间引发异常 **不会阻止修剪**。回调应该通过调用重试或写入一些错误日志来决定如何处理错误。错误将被添加到 TFUNCTION LIST
命令上的 last_error
字段中。
数据处理保证
只要主分片正常运行,我们就保证恰好一次属性(回调将对流中的每个元素触发正好一次)。如果发生故障,例如分片崩溃,我们保证至少一次属性(回调将对流中的每个元素触发至少一次)。
升级
升级消费者代码(使用 TFUNCTION LOAD
命令的 REPLACE
选项)时,可以更新以下消费者参数
- 窗口
- 修剪
任何尝试更新任何其他参数都会导致加载库时出错。