在 RedisInsight 中管理流和消费者组
了解如何在 RedisInsight 中管理流和消费者组
流 是一个仅追加日志文件。当您向其中添加数据时,您无法更改它。这看起来可能是一个缺点;但是,流充当日志或单一事实来源。它还可以用作以不同速度工作且无需了解彼此的进程之间的缓冲区。有关流的更多概念信息,请参阅 Redis 流。
在本主题中,您将学习如何在 RedisInsight 中添加和使用流以及消费者组。
这是一个模拟温度和湿度传感器的流。与流交互的进程执行两个角色之一:消费者和生产者。流的重点在于它不会结束,因此您无法捕获整个数据集并在其上进行一些处理。
在此流中,传感器被视为广播数据的生产者。消费者从流中读取并对其执行一些工作。例如,如果温度高于某个阈值,它会发出消息以打开该单元中的空调或通知维护人员。
可以有多个消费者执行不同的作业,一个测量湿度,另一个在一段时间内测量温度。Redis 在内存中存储整个数据集的副本,这是一个有限的资源。为避免数据失控,在向流中添加内容时可以对其进行修剪。在使用 XADD 向流中添加内容时,你可以选择指定将流修剪到特定或近似的最新条目数量,或仅包含 ID 高于指定 ID 的条目。你还可以使用键过期时间来管理流数据所需的存储空间。例如,通过将每一天的数据写入 Redis 中自己的流,并在一段时间后(比如一周)使每个流的键过期。ID 可以是任意数字,但流中的每个新条目必须具有一个 ID,其值高于添加到流中的最后一个 ID。
添加新条目
对 ID 使用 * 以及 XADD,让 Redis 自动为你生成一个新的 ID,该 ID 由毫秒精度时间戳、破折号和序列号组成。例如 1656416957625-0。然后提供要存储在新流条目中的字段名称和值。
有几种检索内容的方法。你可以按时间范围检索条目,也可以请求自你指定的某个时间戳或 ID 以来发生的所有事情。使用单个命令,你可以请求某一天上午 10:30 到 11:15 之间发生的任何事情。
消费者组
一个更实际的用例是一个系统,其中包含许多温度传感器,Redis 将其数据放入流中,记录其到达时间并对其进行排序。
在右侧,我们有两个读取流的消费者。其中一个在温度超过一定数字时发出警报,并向维护人员发送短信,告知他们需要采取行动,另一个是数据仓库,它获取数据并将其放入数据库中。
它们独立运行。在右侧,我们有另一种任务。让我们假设警报和数据仓库非常快。你可以收到一条消息,告知温度是否大于特定值,这可能需要一毫秒。并且警报可以跟上数据流。你可以扩展消费者的方法之一是消费者组,它允许同一消费者或同一代码的多个实例作为团队协作处理流。
在 RedisInsight 中管理流
您可以在 RedisInsight 中通过两种方式添加流:创建新流或添加到现有流。
要创建流,首先选择键类型(流)。您无法设置生存时间 (TTL),因为它无法放置在流中的消息上;它只能在 Redis 键上完成。将流命名为 mystream。然后,将 条目 ID 设置为 * 以默认为时间戳。如果您有自己的 ID 生成策略,请输入序列中的下一个 ID。请记住,ID 必须高于流中任何其他条目的 ID。
然后,使用 + 输入字段和值以添加多个字段和值(例如,名称和位置)。现在,您有一个显示在 流 视图中的流,您可以继续向其中添加字段和值。
RedisInsight 为您运行读取命令,以便您可以在 流 视图中看到流条目。而 消费者组 视图显示给定消费者组中的每个消费者以及 Redis 上次分配消息的时间、消息的 ID 以及该过程发生的次数,以及消费者是否告诉 Redis 您已使用 XACK 命令完成该任务。
在 RedisInsight 中监控传感器中的温度和湿度
此示例展示了如何将现有流引入 RedisInsight 并对其进行处理。
设置
- 安装 RedisInsight。
- 下载并安装 Node.js(LTS 版本)。
- 安装 Redis。在 Docker 中,检查 Redis 是否在本地默认端口 6379 上运行(未设置密码)。
- 克隆此示例的 代码存储库。有关此示例和安装提示的更多信息,请参阅 自述文件。
- 在命令行中,导航到包含代码存储库的文件夹并安装 Node.js 包管理器 (npm)。
npm install运行生产者
要启动生产者,它将每隔几秒向流中添加一个新条目,请输入
npm run producer
> streams@1.0.0 producer
> node producer.js
Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...生产者无限期运行。选择 Ctrl+C 停止它。如果你想更快地向流中添加条目,你可以启动生产者的多个实例。
运行消费者
要启动消费者,它每隔几秒从流中读取一次,输入
npm run consumer
> streams@1.0.0 consumer
> node consumer.js
Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]消费者将它读取的最后一个条目 ID 存储在键 consumer:lastid 的 Redis 字符串中。它使用此字符串在重新启动后从它停止的地方继续。通过使用 Ctrl+C 停止它并重新启动它来尝试一下。
一旦消费者处理了流中的每个条目,它将无限期地等待生产者的实例添加更多条目
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...使用 Ctrl+C 停止它。
运行消费者组
消费者组由多个消费者实例共同工作组成。Redis 管理分配从流中读取的条目给消费者组的成员。组中的消费者将接收条目的子集,整个组将接收所有条目。在消费者组中工作时,消费者进程必须确认已接收/处理每个条目。
使用多个终端窗口,启动消费者组消费者三个实例,给每个实例一个唯一名称
npm run consumergroup consumer1
> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"
Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...在第二个终端中
npm run consumergroup consumer2在第三个终端中
npm run consumergroup consumer3消费者将无限期运行,等待生产者实例将新消息添加到流中,当它们共同消费整个流时。请注意,在此模型中,每个消费者实例不会接收流中的所有条目,但该组的三个成员各自接收一个子集。
在 RedisInsight 中查看流
- 启动 RedisInsight。
- 选择
localhost:6379 - 选择 STREAM。可以选择从右上角全屏显示以展开视图。
你现在可以在 Stream 和 Consumer Groups 视图之间切换以查看你的数据。如本主题前面所述,流是一个仅追加日志,因此你无法修改条目的内容,但你可以删除整个条目。在所谓毒丸消息(可能导致消费者崩溃)的情况下,这很有用。你可以在 Streams 视图中物理删除此类消息,或在命令行界面 (CLI) 中使用 XDEL 命令。
你可以在 CLI 继续与你的流交互。例如,要获取流的当前长度,请使用 XLEN 命令
XLEN ingest:temphumidity在银行、游戏、供应链、物联网、社交媒体等领域使用流来审计和处理事件。