流触发器

在将项目添加到流时执行 JavaScript 函数

Redis Stack 的触发器和函数功能提供完整的流 API 来处理来自 Redis 流 的数据。与提供微批处理 API 的 RedisGears v1 不同,新的触发器和函数功能提供 **真正的流式** API,这意味着数据将在进入流时立即被处理。

注册流消费者

触发器和函数提供一个 API,允许注册流触发器。不要与 Redis 流消费者组 混淆,触发器和函数使用 Redis 模块 API 来有效地读取流并管理其消费者。这种方法提供了更好的性能,因为不需要调用任何 Redis 命令来从流中读取。让我们看一个简单的例子

#!js api_version=1.0 name=myFirstLibrary

redis.registerStreamTrigger(
    "consumer", // consumer name
    "stream", // streams prefix
    function(c, data) {
        // callback to run on each element added to the stream
        redis.log(JSON.stringify(data, (key, value) =>
            typeof value === 'bigint'
                ? value.toString()
                : value // return everything else unchanged
        ));
    }
);

参数描述

  • consumer - 消费者名称。
  • stream - 要触发回调的流名称前缀。
  • callback - 在流中每个元素上调用的回调。遵循与 同步和异步调用 相同的规则。回调仅在主分片上调用。

如果我们注册这个库(请查看 快速入门 部分了解如何注册 RedisGears 函数),并在我们的 Redis 上运行以下命令

XADD stream:1 * foo1 bar1
XADD stream:1 * foo2 bar2
XADD stream:2 * foo1 bar1
XADD stream:2 * foo2 bar2

我们将在 Redis 日志文件中看到以下行

2630021:M 05 Jul 2022 17:13:22.506 * <redisgears_2> {"id":["1657030402506","0"],"stream_name":"stream:1","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:25.323 * <redisgears_2> {"id":["1657030405323","0"],"stream_name":"stream:1","record":[["foo2","bar2"]]}
2630021:M 05 Jul 2022 17:13:29.475 * <redisgears_2> {"id":["1657030409475","0"],"stream_name":"stream:2","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:32.715 * <redisgears_2> {"id":["1657030412715","0"],"stream_name":"stream:2","record":[["foo2","bar2"]]}

传递给流消费者回调的 data 参数采用以下格式

{
    "id": ["<ms>", "<seq>"],
    "stream_name": "<stream name>",
    "stream_name_raw": "<stream name as ArrayBuffer>",
    "record":[
        ["<key>", "<value>"],
        .
        .
        ["<key>", "<value>"]
    ],
    "record_raw":[
        ["<key_raw>", "<value_raw>"],
        .
        .
        ["<key_raw>", "<value_raw>"]
    ],
}

记录之所以是一系列元组,而不是一个对象,是因为 Redis 流规范允许重复键。

请注意,如果数据无法解码为字符串,则 stream_namerecord 字段可能包含 null*_raw 字段始终提供,并将包含数据作为 JS ArrayBuffer

我们可以使用 TFUNCTION LIST 命令观察注册的消费者跟踪的流

127.0.0.1:6379> TFUNCTION LIST LIBRARY lib vvv
1)  1) "engine"
    1) "js"
    2) "api_version"
    3) "1.0"
    4) "name"
    5) "lib"
    6) "pending_jobs"
    7) (integer) 0
    8) "user"
    9)  "default"
    10) "functions"
   1)  (empty array)
   2)  "stream_triggers"
   3)  1)  1) "name"
           1) "consumer"
           2) "prefix"
           3) "stream"
           4) "window"
           5) (integer) 1
           6) "trim"
           7) "disabled"
           8) "num_streams"
          1)  (integer) 2
          2)  "streams"
          3)  1)  1) "name"
                  1) "stream:2"
                  2) "last_processed_time"
                  3) (integer) 0
                  4) "avg_processed_time"
                  5) "0"
                  6) "last_lag"
                  7) (integer) 0
                  8) "avg_lag"
                 1)  "0"
                 2)  "total_record_processed"
                 3)  (integer) 2
                 4)  "id_to_read_from"
                 5)  "1657030412715-0"
                 6)  "last_error"
                 7)  "None"
                 17) "pending_ids"
                 18) (empty array)
              1)  1) "name"
                  1) "stream:1"
                  2) "last_processed_time"
                  3) (integer) 1
                  4) "avg_processed_time"
                  5) "0.5"
                  6) "last_lag"
                  7) (integer) 1
                  8) "avg_lag"
                 1)  "0.5"
                 2)  "total_record_processed"
                 3)  (integer) 2
                 4)  "id_to_read_from"
                 5)  "1657030405323-0"
                 6)  "last_error"
                 7)  "None"
                 8)  "pending_ids"
                 9)  (empty array)
   4)  "keyspace_triggers"
   5)  (empty array)

启用修剪并设置窗口

我们可以通过在触发器回调之后添加 isStreamTrimmed 可选参数来启用流修剪,我们还可以设置 window 参数来控制可以同时处理的元素数量。例如

#!js api_version=1.0 name=myFirstLibrary

redis.registerStreamTrigger(
    "consumer", // consumer name
    "stream", // streams prefix
    function(c, data) {
        // callback to run on each element added to the stream
        redis.log(JSON.stringify(data, (key, value) =>
            typeof value === 'bigint'
                ? value.toString()
                : value // return everything else unchanged
        ));
    }, 
    {
        isStreamTrimmed: true,
        window: 3   
    }
);

默认值为

  • isStreamTrimmed - false
  • window - 1

只要有一个消费者启用修剪,流就会被修剪。流将根据最慢的消费者进行修剪,该消费者在给定时间消耗流(即使这不是启用修剪的消费者)。在回调调用期间引发异常 **不会阻止修剪**。回调应该通过调用重试或写入一些错误日志来决定如何处理错误。错误将被添加到 TFUNCTION LIST 命令上的 last_error 字段中。

数据处理保证

只要主分片正常运行,我们就保证恰好一次属性(回调将对流中的每个元素触发正好一次)。如果发生故障,例如分片崩溃,我们保证至少一次属性(回调将对流中的每个元素触发至少一次)。

升级

升级消费者代码(使用 TFUNCTION LOAD 命令的 REPLACE 选项)时,可以更新以下消费者参数

  • 窗口
  • 修剪

任何尝试更新任何其他参数都会导致加载库时出错。

RATE THIS PAGE
Back to top ↑