学习

高级流:使用消费者组并行处理签入

Simon Prickett
作者
Simon Prickett, Redis 首席开发者倡导者

随着我们应用程序的流行度和用户群的增长,我们收到的签入越来越多。回想一下,签入由签入接收器添加到 Redis 流中,并由签入处理器从该流中读取。流充当这两个组件之间的缓冲区。

不幸的是,我们唯一的签入处理器难以跟上新签入的量。这意味着我们看到签入到达流与它们的值反映在我们的用户和位置哈希之间的时间滞后越来越长。

而且,我们不能运行多个签入处理器实例,因为每个实例都会消耗整个流。我们需要一种方法让同一个消费者代码的多个实例协同处理流中的条目。

Redis 流提供消费者组作为解决方案。我们可以将消费者组视为一个读取整个流的单个逻辑消费者,在组中的各个消费者之间分配工作。

Redis 跟踪哪些消息已传递给组中的哪些消费者,确保每个消费者都收到自己对流的唯一子集进行处理。这允许多个消费者进程并行处理流。正如您将在视频中看到的那样,这要求我们重新考虑我们的处理逻辑,以允许流条目以乱序方式进行处理,并在更新用户和位置哈希时避免竞争条件。我们将使用 Redis 中内置的 Lua 解释器来帮助解决这个问题。

动手练习#

在本练习中,您将运行签入组处理器的多个并发实例,以便您可以看到它们如何协同工作来协同处理流。

如果您仍在运行签入处理器服务,请使用 Ctrl-C 停止它。

接下来,打开两个终端窗口。在两个窗口中都 cd 到您在 GitHub 存储库中克隆的 node-js-crash-course 文件夹中。

在一个终端中,启动一个我们将称为 consumer1 的签入组处理器实例

$ npm run checkingroupprocessor consumer1

> js-crash-course@0.0.1 checkingroupprocessor
> node ./src/checkingroupprocessor.js "consumer1"

info: consumer1: Starting up.
info: consumer1: Processing checkin 1609602085397-0.
debug: consumer1: Processing 1609602085397-0.
debug: consumer1: Updating user ncc:users:789 and location ncc:locations:171.
info: consumer1: Acknowledged processing of checkin 1609602085397-0.
info: consumer1: Pausing to simulate work.
info: consumer1: Processing checkin 1609604227545-0.
debug: consumer1: Processing 1609604227545-0.
debug: consumer1: Updating user ncc:users:752 and location ncc:locations:100.
info: consumer1: Acknowledged processing of checkin 1609604227545-0.
info: consumer1: Pausing to simulate work.
info: consumer1: Processing checkin 1609605397408-0.
debug: consumer1: Processing 1609605397408-0.
debug: consumer1: Updating user ncc:users:180 and location ncc:locations:13.
info: consumer1: Acknowledged processing of checkin 1609605397408-0.
info: consumer1: Pausing to simulate work.
info: consumer1: Processing checkin 1609605876514-0.
...

在第二个终端中,启动另一个签入组处理器实例,consumer2

$ npm run checkingroupprocessor consumer2

> js-crash-course@0.0.1 checkingroupprocessor
> node ./src/checkingroupprocessor.js "consumer2"

info: consumer2: Starting up.
info: consumer2: Processing checkin 1609603711960-0.
debug: consumer2: Processing 1609603711960-0.
debug: consumer2: Updating user ncc:users:455 and location ncc:locations:181.
info: consumer2: Acknowledged processing of checkin 1609603711960-0.
info: consumer2: Pausing to simulate work.
info: consumer2: Processing checkin 1609604778689-0.
debug: consumer2: Processing 1609604778689-0.
debug: consumer2: Updating user ncc:users:102 and location ncc:locations:144.
info: consumer2: Acknowledged processing of checkin 1609604778689-0.
info: consumer2: Pausing to simulate work.
...

查看每个消费者处理的签入 ID。请注意,它们不会接收相同的签入。Redis 服务器为组中的每个消费者提供其自己的流逻辑视图,每个消费者处理条目的子集。这加快了签入处理速度,因为现在我们可以同时运行多个消费者。

让我们看看 Redis 跟踪关于我们消费者组的一些信息。继续并通过按 Ctrl-C 停止两个消费者进程。

如果您使用的是 RedisInsight,请打开“流”浏览器,单击 ncc:checkins 键,然后选择“消费者组”选项卡。您应该会看到类似于这样的内容

这显示了组中消费者的数量、每个消费者有多少个待处理消息(待处理消息是指消费者已读取但尚未使用 XACK 确认的消息),以及消费者自上次读取流以来的空闲时间。

单击消费者组表中的“checkinConsumers”以查看每个消费者的待处理消息和空闲时间的细分

在现实世界系统中,您可以使用此信息来检测遇到处理条目问题的消费者。Redis 流提供命令来重新分配消费者已读取但未确认的消息,允许您构建将这些消息重新分配给同一组中的健康消费者实例的消费者恢复策略。

如果您使用的是 redis-cli 而不是 RedisInsight,则可以使用 XINFOXPENDING 命令查看相同的信息

127.0.0.1:6379> xinfo groups ncc:checkins
1) 1) "name"
   2) "checkinConsumers"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1609605876514-0"
127.0.0.1:6379> xpending ncc:checkins checkinConsumers
1) (integer) 0
127.0.0.1:6379> xinfo consumers ncc:checkins checkinConsumers
1) 1) "name"
   2) "consumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 2262454
2) 1) "name"
   2) "consumer2"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 2266244

外部资源#