Stream 触发器

当项目添加到流时,执行 JavaScript 函数

Redis 开源 Redis Enterprise 软件 Redis Cloud Redis 开源 Redis Enterprise for Kubernetes 客户端

Redis Stack 的触发器和函数功能带有一个完整的流 API 来处理来自 Redis 流 的数据。 与提供微批处理 API 的 RedisGears v1 不同,新的触发器和函数功能提供了一个 真正的流式处理 API,这意味着数据将在进入流后立即被处理。

注册流消费者

触发器和函数提供了一个 API,允许注册流触发器。 不要与 Redis 流消费者组 混淆,触发器和函数使用 Redis Module API 有效地读取流并管理其消费者。 这种方法提供了更好的性能,因为不需要调用任何 Redis 命令来从流中读取。 让我们看一个简单的例子

#!js api_version=1.0 name=myFirstLibrary

redis.registerStreamTrigger(
    "consumer", // consumer name
    "stream", // streams prefix
    function(c, data) {
        // callback to run on each element added to the stream
        redis.log(JSON.stringify(data, (key, value) =>
            typeof value === 'bigint'
                ? value.toString()
                : value // return everything else unchanged
        ));
    }
);

参数描述

  • consumer - 消费者名称。
  • stream - 触发回调的流名称前缀。
  • callback - 在流中的每个元素上调用的回调。 遵循 同步和异步调用 的相同规则。 回调将仅在主分片上调用。

如果我们注册此库(请参阅 快速入门 部分,了解如何注册 RedisGears 函数)并在我们的 Redis 上运行以下命令

XADD stream:1 * foo1 bar1
XADD stream:1 * foo2 bar2
XADD stream:2 * foo1 bar1
XADD stream:2 * foo2 bar2

我们将在 Redis 日志文件中看到以下行

2630021:M 05 Jul 2022 17:13:22.506 * <redisgears_2> {"id":["1657030402506","0"],"stream_name":"stream:1","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:25.323 * <redisgears_2> {"id":["1657030405323","0"],"stream_name":"stream:1","record":[["foo2","bar2"]]}
2630021:M 05 Jul 2022 17:13:29.475 * <redisgears_2> {"id":["1657030409475","0"],"stream_name":"stream:2","record":[["foo1","bar1"]]}
2630021:M 05 Jul 2022 17:13:32.715 * <redisgears_2> {"id":["1657030412715","0"],"stream_name":"stream:2","record":[["foo2","bar2"]]}

传递给流消费者回调的 data 参数的格式如下

{
    "id": ["<ms>", "<seq>"],
    "stream_name": "<stream name>",
    "stream_name_raw": "<stream name as ArrayBuffer>",
    "record":[
        ["<key>", "<value>"],
        .
        .
        ["<key>", "<value>"]
    ],
    "record_raw":[
        ["<key_raw>", "<value_raw>"],
        .
        .
        ["<key_raw>", "<value_raw>"]
    ],
}

记录是元组列表而不是对象的原因是因为 Redis Stream 规范允许重复键。

请注意,如果数据无法解码为字符串,则 stream_namerecord 字段可能包含 null*_raw 字段将始终提供,并将包含作为 JS ArrayBuffer 的数据。

我们可以使用 TFUNCTION LIST 命令观察我们注册的消费者跟踪的流

127.0.0.1:6379> TFUNCTION LIST LIBRARY lib vvv
1)  1) "engine"
    1) "js"
    2) "api_version"
    3) "1.0"
    4) "name"
    5) "lib"
    6) "pending_jobs"
    7) (integer) 0
    8) "user"
    9)  "default"
    10) "functions"
   1)  (empty array)
   2)  "stream_triggers"
   3)  1)  1) "name"
           1) "consumer"
           2) "prefix"
           3) "stream"
           4) "window"
           5) (integer) 1
           6) "trim"
           7) "disabled"
           8) "num_streams"
          1)  (integer) 2
          2)  "streams"
          3)  1)  1) "name"
                  1) "stream:2"
                  2) "last_processed_time"
                  3) (integer) 0
                  4) "avg_processed_time"
                  5) "0"
                  6) "last_lag"
                  7) (integer) 0
                  8) "avg_lag"
                 1)  "0"
                 2)  "total_record_processed"
                 3)  (integer) 2
                 4)  "id_to_read_from"
                 5)  "1657030412715-0"
                 6)  "last_error"
                 7)  "None"
                 17) "pending_ids"
                 18) (empty array)
              1)  1) "name"
                  1) "stream:1"
                  2) "last_processed_time"
                  3) (integer) 1
                  4) "avg_processed_time"
                  5) "0.5"
                  6) "last_lag"
                  7) (integer) 1
                  8) "avg_lag"
                 1)  "0.5"
                 2)  "total_record_processed"
                 3)  (integer) 2
                 4)  "id_to_read_from"
                 5)  "1657030405323-0"
                 6)  "last_error"
                 7)  "None"
                 8)  "pending_ids"
                 9)  (empty array)
   4)  "keyspace_triggers"
   5)  (empty array)

启用修剪并设置窗口

我们可以在触发器回调后添加 isStreamTrimmed 可选参数来启用流修剪,我们还可以设置 window 参数来控制可以同时处理多少个元素。 例子

#!js api_version=1.0 name=myFirstLibrary

redis.registerStreamTrigger(
    "consumer", // consumer name
    "stream", // streams prefix
    function(c, data) {
        // callback to run on each element added to the stream
        redis.log(JSON.stringify(data, (key, value) =>
            typeof value === 'bigint'
                ? value.toString()
                : value // return everything else unchanged
        ));
    }, 
    {
        isStreamTrimmed: true,
        window: 3   
    }
);

默认值为

  • isStreamTrimmed - false
  • 窗口 - 1

单个消费者启用修剪就足以修剪流。 该流将根据在给定时间消耗该流的最慢消费者(即使这不是启用修剪的消费者)进行修剪。 在回调调用期间引发异常将 不会阻止修剪。 回调应该决定如何通过调用重试或写入一些错误日志来处理失败。 该错误将被添加到 TFUNCTION LIST 命令的 last_error 字段中。

数据处理保证

只要主分片启动并运行,我们就保证 exactly once 属性(回调将在流中的每个元素上恰好触发一次)。 如果发生故障(例如分片崩溃),我们保证至少一次属性(回调将在流中的每个元素上至少触发一次)

升级

升级消费者代码时(使用 TFUNCTION LOAD 命令的 REPLACE 选项),可以更新以下消费者参数

  • 窗口
  • 修剪

尝试更新任何其他参数将在加载库时导致错误。

对此页面评分
返回顶部 ↑