学习

高级流:使用消费者组并行处理签到

Simon Prickett
作者
Simon Prickett, Redis 首席开发者布道师

随着我们的应用越来越受欢迎,用户群不断增长,我们收到的签到也越来越多。回想一下,签到是由签到接收器添加到 Redis Stream 中的,并由签到处理器从该流中读取。该流充当这两个组件之间的缓冲区。

不幸的是,我们单一的签到处理器难以跟上新签到的数量。这意味着签到到达流中到其值反映在我们的用户和位置哈希中,我们看到的时间延迟越来越长。

而且,我们无法运行多个签到处理器实例,因为每个实例都会消费整个流。我们需要一种方法,让同一个消费者代码的多个实例能够协同处理流中的条目。

Redis Streams 提供了消费者组作为此问题的解决方案。我们可以将消费者组视为一个单独的逻辑消费者,它读取整个流,并将工作分配给组中的各个消费者。

Redis 跟踪已将哪些消息发送给了组中的哪些消费者,确保每个消费者接收到流中独有的子集进行处理。这使得多个消费者进程可以并行处理流。正如您将在视频中看到的那样,这要求我们重新思考处理逻辑,以允许流条目乱序处理,并避免在更新用户和位置哈希时出现竞态条件。我们将使用 Redis 内置的 Lua 解释器来提供帮助。

动手实践练习#

在本练习中,您将运行 Checkin Group Processor 的多个并发实例,以便了解它们如何协同处理流。

如果您仍在运行 Checkin Processor 服务,请按 Ctrl-C 停止它。

接下来,打开两个终端窗口。在两个窗口中都 cd 到您克隆 GitHub 仓库的 node-js-crash-course 文件夹。

在一个终端中,启动 Checkin Group Processor 的一个实例,我们将其称为 consumer1。

$ npm run checkingroupprocessor consumer1

> [email protected] checkingroupprocessor
> node ./src/checkingroupprocessor.js "consumer1"

info: consumer1: Starting up.
info: consumer1: Processing checkin 1609602085397-0.
debug: consumer1: Processing 1609602085397-0.
debug: consumer1: Updating user ncc:users:789 and location ncc:locations:171.
info: consumer1: Acknowledged processing of checkin 1609602085397-0.
info: consumer1: Pausing to simulate work.
info: consumer1: Processing checkin 1609604227545-0.
debug: consumer1: Processing 1609604227545-0.
debug: consumer1: Updating user ncc:users:752 and location ncc:locations:100.
info: consumer1: Acknowledged processing of checkin 1609604227545-0.
info: consumer1: Pausing to simulate work.
info: consumer1: Processing checkin 1609605397408-0.
debug: consumer1: Processing 1609605397408-0.
debug: consumer1: Updating user ncc:users:180 and location ncc:locations:13.
info: consumer1: Acknowledged processing of checkin 1609605397408-0.
info: consumer1: Pausing to simulate work.
info: consumer1: Processing checkin 1609605876514-0.
...

在第二个终端中,启动 Checkin Group Processor 的另一个实例,consumer2。

$ npm run checkingroupprocessor consumer2

> [email protected] checkingroupprocessor
> node ./src/checkingroupprocessor.js "consumer2"

info: consumer2: Starting up.
info: consumer2: Processing checkin 1609603711960-0.
debug: consumer2: Processing 1609603711960-0.
debug: consumer2: Updating user ncc:users:455 and location ncc:locations:181.
info: consumer2: Acknowledged processing of checkin 1609603711960-0.
info: consumer2: Pausing to simulate work.
info: consumer2: Processing checkin 1609604778689-0.
debug: consumer2: Processing 1609604778689-0.
debug: consumer2: Updating user ncc:users:102 and location ncc:locations:144.
info: consumer2: Acknowledged processing of checkin 1609604778689-0.
info: consumer2: Pausing to simulate work.
...

查看每个消费者处理的签到 ID。请注意,它们不会收到相同的签到。Redis 服务器为组中的每个消费者提供其自己的流的逻辑视图,每个消费者处理一部分条目。这加快了签到处理速度,因为现在我们可以同时运行多个消费者。

让我们看看 Redis 正在跟踪的关于我们的消费者组的一些信息。继续按 Ctrl-C 停止这两个消费者进程。

如果您正在使用 RedisInsight,打开“Streams”浏览器,点击 ncc:checkins 键,然后选择“Consumer Groups”选项卡。您应该会看到类似如下的内容:

这显示了组中的消费者数量、每个消费者有多少待处理消息(待处理消息是指已被消费者读取但尚未通过 XACK 确认的消息),以及消费者自上次从流中读取以来的空闲时间。

在 Consumer Group 表中点击“checkinConsumers”以查看每个消费者的待处理消息和空闲时间的详细信息。

在实际系统中,您可以使用此信息来检测处理条目时遇到问题的消费者。Redis Streams 提供了命令来重新分配消费者已读取但未确认的消息,使您能够构建消费者恢复策略,将这些消息重新分配给同一组中健康的消费者实例。

如果您使用的是 redis-cli 而不是 RedisInsight,您可以使用 XINFO 和 XPENDING 命令查看相同的信息。

127.0.0.1:6379> xinfo groups ncc:checkins
1) 1) "name"
   2) "checkinConsumers"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1609605876514-0"
127.0.0.1:6379> xpending ncc:checkins checkinConsumers
1) (integer) 0
127.0.0.1:6379> xinfo consumers ncc:checkins checkinConsumers
1) 1) "name"
   2) "consumer1"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 2262454
2) 1) "name"
   2) "consumer2"
   3) "pending"
   4) (integer) 0
   5) "idle"
   6) (integer) 2266244

外部资源#