在 Redis Insight 中管理流和消费者组
了解如何在 Redis Insight 中管理流和消费者组
一个流是一个仅追加的日志文件。当您向其中添加数据时,您无法更改它。这看起来可能是一个缺点;然而,流充当日志或单一的事实来源。它也可以用作在不同速度工作且不需要彼此了解的进程之间的缓冲区。有关流的更多概念信息,请参阅 Redis 流。
在本主题中,您将了解如何在 Redis Insight 中添加和使用流以及消费者组。
这是一个模拟温度和湿度传感器的流。与流交互的进程执行以下两种角色之一:消费者和生产者。流的重点在于它不会结束,因此您无法捕获整个数据集并对其进行处理。
在这个流中,传感器被认为是生产者,它们广播数据。消费者从流中读取数据并对其进行一些操作。例如,如果温度高于某个阈值,它会发出消息来打开该单元的空调或通知维护人员。

可以使用多个消费者执行不同的任务,例如一个测量湿度,另一个测量一段时间内的温度。Redis 会在内存中存储整个数据集的副本,而内存是有限资源。为了避免数据失控,可以在向流添加数据时对其进行修剪。使用 XADD
向流添加数据时,可以选择指定修剪流以保留特定数量或近似数量的最新条目,或仅保留 ID 大于指定 ID 的条目。还可以使用键过期来管理流数据所需的存储空间。例如,将每天的数据写入 Redis 的独立流中,并在一段时间(例如一周)后使每个流的键过期。ID 可以是任何数字,但流中每个新条目必须具有一个 ID,其值高于添加到流中的最后一个 ID。
添加新条目
使用 XADD
并将 ID 设置为 *
,让 Redis 自动为您生成一个新的 ID,该 ID 由毫秒精度时间戳、一个短横线和一个序列号组成。例如 1656416957625-0
。然后提供要存储在新的流条目中的字段名称和值。
有几种方法可以检索数据。您可以按时间范围检索条目,或者可以请求自您指定的某个时间戳或 ID 以来发生的所有事件。使用单个命令,您可以请求某天从上午 10:30 到 11:15 之间的任何事件。
消费者组
一个更现实的用例是一个系统,其中有许多温度传感器,Redis 将其数据放入流中,记录其到达时间并对其进行排序。

在右侧,我们有两个消费者读取流。其中一个消费者会监控温度是否超过某个值,并在温度超过某个值时向维护人员发送短信,通知他们需要采取措施。另一个消费者是一个数据仓库,它会获取数据并将其放入数据库。
它们相互独立地运行。在右侧上方,我们有另一种任务。假设警报和数据仓库速度很快。您会收到一条消息,告知温度是否大于特定值,这可能只需要几毫秒。警报可以跟上数据流。扩展消费者的一个方法是使用 _消费者组_,它允许同一个消费者的多个实例或同一代码的多个实例作为一个团队处理流。
在 Redis Insight 中管理流
您可以在 Redis Insight 中通过两种方式添加流:创建新流或添加到现有流。
要创建流,首先选择键类型(流)。您无法设置生存时间(TTL),因为它不能放置在流中的消息上;它只能在 Redis 键上完成。将流命名为 mystream。然后,将 _条目 ID_ 设置为 *
以默认使用时间戳。如果您有自己的 ID 生成策略,请从您的序列中输入下一个 ID。请记住,ID 必须高于流中任何其他条目的 ID。
然后,使用 + 输入多个字段和值(例如,名称和位置)以添加多个字段和值。现在您有一个流,它会显示在 **流** 视图中,您可以继续向其添加字段和值。
Redis Insight 会为您运行读取命令,这样您就可以在 **流** 视图中查看流条目。**消费者组** 视图将显示每个消费者组中的每个消费者以及 Redis 最后一次分配消息的时间、消息的 ID、该过程发生的次数以及消费者是否已使用 XACK
命令告诉 Redis 您已完成对该任务的操作。
在 Redis Insight 中监控来自传感器的温度和湿度
此示例演示如何将现有流引入 Redis Insight 并对其进行操作。
设置
- 安装 Redis Insight.
- 下载并安装 Node.js(LTS 版本)。
- 安装 Redis。在 Docker 中,检查 Redis 是否在默认端口 6379(没有设置密码)上本地运行。
- 克隆此示例的 代码库。有关此示例和安装技巧的更多信息,请参阅 README。
- 在命令行中,导航到包含代码库的文件夹并安装 Node.js 包管理器 (npm)。
npm install
运行生产者
要启动生产者(它将每隔几秒钟向流添加一个新的条目),请输入
npm run producer
> streams@1.0.0 producer
> node producer.js
Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...
生产者将无限期运行。选择 Ctrl+C
以停止它。如果您想更快地向流中添加条目,可以启动生产者的多个实例。
运行消费者
要启动消费者(它将每隔几秒钟从流中读取数据),请输入
npm run consumer
> streams@1.0.0 consumer
> node consumer.js
Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
消费者会将读取的最后一个条目 ID 存储在 Redis 字符串中,该字符串位于键 consumer:lastid
下。它会使用该字符串在重新启动后从上次中断的地方继续执行。尝试通过使用 Ctrl+C
停止它并重新启动它来测试这一点。
消费者处理完流中的所有条目后,它将无限期地等待生产者实例添加更多条目。
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
使用 Ctrl+C
停止它。
运行消费者组
消费者组由多个消费者实例协同工作组成。Redis 会管理从流中读取的条目的分配,分配给消费者组的成员。消费者组中的一个消费者将接收条目的子集,而整个消费者组将接收所有条目。在消费者组中工作时,消费者进程必须确认已收到/处理每个条目。
使用多个终端窗口,启动消费者组消费者的三个实例,为每个实例提供一个唯一的名称
npm run consumergroup consumer1
> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"
Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...
在第二个终端中
npm run consumergroup consumer2
在第三个终端中
npm run consumergroup consumer3
消费者将无限期运行,等待生产者实例在它们共同消耗完整个流后将新消息添加到流中。请注意,在此模型中,每个消费者实例都不会接收来自流的所有条目,但该组的三个成员中的每一个都将接收一个子集。
在 Redis Insight 中查看流
- 启动 Redis Insight。
- 选择
localhost:6379
- 选择 **流**。可以选择右上角的全屏按钮以展开视图。

现在,您可以在 **流** 和 **消费者组** 视图之间切换以查看您的数据。如本主题前面所述,流是仅追加日志,因此您无法修改条目的内容,但可以删除整个条目。当发生所谓的 _毒丸消息_(这会导致消费者崩溃)时,此功能很有用。您可以在 **流** 视图中物理删除此类消息,或者在命令行界面 (CLI) 中使用 XDEL
命令。
您可以在 CLI 中继续与流进行交互。例如,要获取流的当前长度,请使用 XLEN
命令
XLEN ingest:temphumidity
将流用于银行、游戏、供应链、物联网、社交媒体等方面的审计和事件处理。