在 Redis Insight 中管理流和消费者组

了解如何在 Redis Insight 中管理流和消费者组

流(Stream)是一种只追加的日志文件。一旦向其中添加数据,就无法更改。这看起来像是一个缺点;然而,流可以作为日志或单一事实来源。它还可以用作在以不同速度工作且无需彼此了解的进程之间的缓冲区。有关流的更多概念信息,请参阅 Redis 流

在本主题中,您将学习如何在 Redis Insight 中添加和使用流以及消费者组。

这是一个模拟温度和湿度传感器的流。与流交互的进程扮演两种角色之一:消费者(consumer)生产者(producer)。流的关键在于它不会结束,因此您无法捕获整个数据集并对其进行处理。

在这个流中,传感器被视为生产者(producers),负责广播数据。消费者(consumer)从流中读取数据并对其进行一些工作。例如,如果温度高于某个阈值,它会发出消息来打开该单元的空调或通知维护人员。

可以有多个消费者做不同的工作,一个测量湿度,另一个测量一段时间内的温度。Redis 在内存中存储整个数据集的副本,内存是一种有限资源。为了避免数据失控,在向流中添加数据时可以对其进行修剪。使用 XADD 命令添加数据到流时,您可以选择指定将流修剪到指定或近似数量的最新条目,或者只包含 ID 高于指定 ID 的条目。您还可以使用键过期(key expiry)来管理流数据所需的存储空间。例如,每天将数据写入 Redis 中自己的流中,并在一段时间(比如一周)后使每个流的键过期。ID 可以是任何数字,但流中的每个新条目必须具有一个高于流中最后添加的条目 ID 的 ID。

添加新条目

使用 XADD 命令,将 ID 设置为 *,让 Redis 自动为您生成一个新 ID,该 ID 由毫秒级时间戳、一个破折号和一个序列号组成。例如 1656416957625-0。然后提供要存储在新流条目中的字段名称和值。

有几种检索数据的方式。您可以按时间范围检索条目,或者要求获取自您指定的某个时间戳或 ID 以来发生的所有事情。使用一个命令,您可以要求获取给定一天从上午 10:30 到 11:15 之间的任何数据。

消费者组

一个更实际的用例是拥有许多温度传感器的系统,Redis 将其数据放入流中,记录它们到达的时间,并按顺序排列。

右侧有两个读取流的消费者。其中一个在温度超过某个数值时发出警报并通知维护人员需要进行处理,另一个是将数据提取并存入数据库的数据仓库。

它们彼此独立运行。右上方还有另一种任务。假设警报和数据仓库都非常快速。你会收到一条消息,指示温度是否大于特定值,这可能只需要一毫秒。而且警报可以跟上数据流的速度。扩展消费者的一种方式是消费者组(consumer groups),它允许同一消费者或同一代码的多个实例作为一个团队来处理流。

在 Redis Insight 中管理流

您可以通过两种方式在 Redis Insight 中添加流:创建新流或向现有流添加数据。

要创建流,首先选择键类型(stream)。您无法设置生存时间(TTL),因为它不能应用于流中的消息;它只能应用于 Redis 键。将流命名为 mystream。然后,将条目 ID(Entry ID)设置为 * 以默认为时间戳。如果您有自己的 ID 生成策略,请输入您的序列中的下一个 ID。请记住,该 ID 必须高于流中任何其他条目的 ID。

然后,使用 + 输入字段和值以添加多个(例如,name 和 location)。现在您拥有一个流,它会显示在流(Streams)视图中,您可以继续向其中添加字段和值。

Redis Insight 会为您运行读取命令,以便您可以在流(Streams)视图中看到流条目。消费者组(Consumer Groups)视图显示给定消费者组中的每个消费者,以及 Redis 最后一次分配消息的时间、消息的 ID、该过程发生的次数,以及消费者是否已使用 XACK 命令告知 Redis 已完成该任务。

在 Redis Insight 中监控来自传感器的温度和湿度

本示例展示了如何将现有流导入 Redis Insight 并使用它。

设置

  1. 安装 Redis Insight
  2. 下载并安装 Node.js(LTS 版本)。
  3. 安装 Redis。在 Docker 中,检查 Redis 是否正在本地默认端口 6379 上运行(未设置密码)。
  4. 克隆本例的代码仓库。请参阅 README 以获取有关此示例和安装技巧的更多信息。
  5. 在命令行中,导航到包含代码仓库的文件夹并安装 Node.js 包管理器 (npm)。
npm install

运行生产者

要启动生产者(它将每隔几秒向流中添加一个新条目),输入

npm run producer

> [email protected] producer
> node producer.js

Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...

生产者将无限期运行。按 Ctrl+C 停止它。如果您想更快地向流中添加条目,可以启动生产者的多个实例。

运行消费者

要启动消费者(它将每隔几秒从流中读取数据),输入

npm run consumer

> [email protected] consumer
> node consumer.js

Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]

消费者将其读取的最后一个条目 ID 存储在 Redis 字符串键 consumer:lastid 中。它使用此字符串在重新启动后从上次离开的地方继续处理。通过按 Ctrl+C 停止它并重新启动来尝试此功能。

一旦消费者处理完流中的所有条目,它将无限期地等待生产者实例添加更多数据

Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...

使用 Ctrl+C 停止它。

运行消费者组

消费者组由多个协同工作的消费者实例组成。Redis 负责将从流中读取的条目分配给消费者组成员。组中的一个消费者将收到条目的子集,而整个组将收到所有条目。在消费者组中工作时,消费者进程必须确认每个条目的接收/处理。

使用多个终端窗口,启动消费者组消费者的三个实例,每个实例赋予唯一的名称

npm run consumergroup consumer1

> [email protected] consumergroup
> node consumer_group.js -- "consumer1"

Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...

在第二个终端中

npm run consumergroup consumer2

在第三个终端中

npm run consumergroup consumer3

当消费者们共同消费完整个流后,它们将无限期地运行,等待生产者实例向流中添加新消息。请注意,在此模型中,每个消费者实例不会收到流中的所有条目,而是该组的三个成员各自收到条目的子集。

在 Redis Insight 中查看流

  1. 启动 Redis Insight。
  2. 选择 localhost:6379
  3. 选择流(STREAM)。您还可以选择右上角的“全屏”按钮来展开视图。

您现在可以在流(Stream)消费者组(Consumer Groups)视图之间切换以查看您的数据。如本主题前面所述,流是只追加的日志,因此您无法修改条目的内容,但可以删除整个条目。在遇到所谓的“毒丸消息”(poison-pill message)可能导致消费者崩溃时,删除条目会很有用。您可以在流(Streams)视图中物理删除此类消息,或在命令行界面(CLI)中使用 XDEL 命令。

您可以在 CLI 继续与您的流交互。例如,要获取流的当前长度,使用 XLEN 命令

XLEN ingest:temphumidity

流可用于银行、游戏、供应链、物联网、社交媒体等领域的审计和事件处理。

评价此页面
返回顶部 ↑