在 RedisInsight 中管理流和消费者组

了解如何在 RedisInsight 中管理流和消费者组

是一个仅追加日志文件。当您向其中添加数据时,您无法更改它。这看起来可能是一个缺点;但是,流充当日志或单一事实来源。它还可以用作以不同速度工作且无需了解彼此的进程之间的缓冲区。有关流的更多概念信息,请参阅 Redis 流

在本主题中,您将学习如何在 RedisInsight 中添加和使用流以及消费者组。

这是一个模拟温度和湿度传感器的流。与流交互的进程执行两个角色之一:消费者生产者。流的重点在于它不会结束,因此您无法捕获整个数据集并在其上进行一些处理。

在此流中,传感器被视为广播数据的生产者消费者从流中读取并对其执行一些工作。例如,如果温度高于某个阈值,它会发出消息以打开该单元中的空调或通知维护人员。

可以有多个消费者执行不同的作业,一个测量湿度,另一个在一段时间内测量温度。Redis 在内存中存储整个数据集的副本,这是一个有限的资源。为避免数据失控,在向流中添加内容时可以对其进行修剪。在使用 XADD 向流中添加内容时,你可以选择指定将流修剪到特定或近似的最新条目数量,或仅包含 ID 高于指定 ID 的条目。你还可以使用键过期时间来管理流数据所需的存储空间。例如,通过将每一天的数据写入 Redis 中自己的流,并在一段时间后(比如一周)使每个流的键过期。ID 可以是任意数字,但流中的每个新条目必须具有一个 ID,其值高于添加到流中的最后一个 ID。

添加新条目

对 ID 使用 * 以及 XADD,让 Redis 自动为你生成一个新的 ID,该 ID 由毫秒精度时间戳、破折号和序列号组成。例如 1656416957625-0。然后提供要存储在新流条目中的字段名称和值。

有几种检索内容的方法。你可以按时间范围检索条目,也可以请求自你指定的某个时间戳或 ID 以来发生的所有事情。使用单个命令,你可以请求某一天上午 10:30 到 11:15 之间发生的任何事情。

消费者组

一个更实际的用例是一个系统,其中包含许多温度传感器,Redis 将其数据放入流中,记录其到达时间并对其进行排序。

在右侧,我们有两个读取流的消费者。其中一个在温度超过一定数字时发出警报,并向维护人员发送短信,告知他们需要采取行动,另一个是数据仓库,它获取数据并将其放入数据库中。

它们独立运行。在右侧,我们有另一种任务。让我们假设警报和数据仓库非常快。你可以收到一条消息,告知温度是否大于特定值,这可能需要一毫秒。并且警报可以跟上数据流。你可以扩展消费者的方法之一是消费者组,它允许同一消费者或同一代码的多个实例作为团队协作处理流。

在 RedisInsight 中管理流

您可以在 RedisInsight 中通过两种方式添加流:创建新流或添加到现有流。

要创建流,首先选择键类型(流)。您无法设置生存时间 (TTL),因为它无法放置在流中的消息上;它只能在 Redis 键上完成。将流命名为 mystream。然后,将 条目 ID 设置为 * 以默认为时间戳。如果您有自己的 ID 生成策略,请输入序列中的下一个 ID。请记住,ID 必须高于流中任何其他条目的 ID。

然后,使用 + 输入字段和值以添加多个字段和值(例如,名称和位置)。现在,您有一个显示在 视图中的流,您可以继续向其中添加字段和值。

RedisInsight 为您运行读取命令,以便您可以在 视图中看到流条目。而 消费者组 视图显示给定消费者组中的每个消费者以及 Redis 上次分配消息的时间、消息的 ID 以及该过程发生的次数,以及消费者是否告诉 Redis 您已使用 XACK 命令完成该任务。

在 RedisInsight 中监控传感器中的温度和湿度

此示例展示了如何将现有流引入 RedisInsight 并对其进行处理。

设置

  1. 安装 RedisInsight
  2. 下载并安装 Node.js(LTS 版本)。
  3. 安装 Redis。在 Docker 中,检查 Redis 是否在本地默认端口 6379 上运行(未设置密码)。
  4. 克隆此示例的 代码存储库。有关此示例和安装提示的更多信息,请参阅 自述文件
  5. 在命令行中,导航到包含代码存储库的文件夹并安装 Node.js 包管理器 (npm)。
npm install

运行生产者

要启动生产者,它将每隔几秒向流中添加一个新条目,请输入

npm run producer

> streams@1.0.0 producer
> node producer.js

Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...

生产者无限期运行。选择 Ctrl+C 停止它。如果你想更快地向流中添加条目,你可以启动生产者的多个实例。

运行消费者

要启动消费者,它每隔几秒从流中读取一次,输入

npm run consumer

> streams@1.0.0 consumer
> node consumer.js

Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]

消费者将它读取的最后一个条目 ID 存储在键 consumer:lastid 的 Redis 字符串中。它使用此字符串在重新启动后从它停止的地方继续。通过使用 Ctrl+C 停止它并重新启动它来尝试一下。

一旦消费者处理了流中的每个条目,它将无限期地等待生产者的实例添加更多条目

Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...

使用 Ctrl+C 停止它。

运行消费者组

消费者组由多个消费者实例共同工作组成。Redis 管理分配从流中读取的条目给消费者组的成员。组中的消费者将接收条目的子集,整个组将接收所有条目。在消费者组中工作时,消费者进程必须确认已接收/处理每个条目。

使用多个终端窗口,启动消费者组消费者三个实例,给每个实例一个唯一名称

npm run consumergroup consumer1

> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"

Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...

在第二个终端中

npm run consumergroup consumer2

在第三个终端中

npm run consumergroup consumer3

消费者将无限期运行,等待生产者实例将新消息添加到流中,当它们共同消费整个流时。请注意,在此模型中,每个消费者实例不会接收流中的所有条目,但该组的三个成员各自接收一个子集。

在 RedisInsight 中查看流

  1. 启动 RedisInsight。
  2. 选择 localhost:6379
  3. 选择 STREAM。可以选择从右上角全屏显示以展开视图。

你现在可以在 StreamConsumer Groups 视图之间切换以查看你的数据。如本主题前面所述,流是一个仅追加日志,因此你无法修改条目的内容,但你可以删除整个条目。在所谓毒丸消息(可能导致消费者崩溃)的情况下,这很有用。你可以在 Streams 视图中物理删除此类消息,或在命令行界面 (CLI) 中使用 XDEL 命令。

你可以在 CLI 继续与你的流交互。例如,要获取流的当前长度,请使用 XLEN 命令

XLEN ingest:temphumidity

在银行、游戏、供应链、物联网、社交媒体等领域使用流来审计和处理事件。

给此页面评分
© . This site is unofficial and not affiliated with Redis, Inc.