学习

使用 Redis Streams 处理签到

Simon Prickett
作者
Simon Prickett, Redis 首席开发者倡导者

用户在我们的系统中执行的最常见操作是在某个位置签到。这部分系统需要快速捕获签到信息,并且能够独立于其他组件进行扩展。

我们决定构建一个单独的 Express 应用程序来接收来自用户的签到 POST 请求。这使我们能够独立于处理 GET 请求的其他 API 端点进行扩展。为了使我们的签到接收器尽可能快,我们决定在一个单独的服务中完成处理签到的实际工作。签到处理器服务从流中读取签到信息,并更新 Redis 中的用户和位置哈希。

签到信息在我们的系统中是瞬态数据——只要我们处理完所有签到信息,就不需要永远保存它们。将它们按到达系统的顺序存储也有意义。

使用 Redis 流来存储我们的签到数据非常适合这种情况。流充当生产者和消费者组件之间的缓冲区。在 Redis 流中,流中的每个条目都将被赋予一个时间戳 ID,并且流按这些 ID 排序。

在我们的应用程序中,签到接收器服务是生产者,签到处理器是消费者。我们可以用下图表示:

使用流可以让这些组件以不同的速度运行,并且彼此之间没有了解。签到接收器只需为它从用户那里收到的每个签到信息在流中添加一个新条目,而签到处理器则以自己的速度读取流并更新用户和位置哈希。

还可以读取 Redis 流来查找在指定时间段内添加的条目,时间段在开始 ID 和结束 ID 之间。由于我们的 ID 是时间戳,这意味着我们可以请求在给定时间段内添加的数据。我们在 API 服务器组件中使用了此功能,在本模块的编码练习中,您将有机会使用新功能扩展此功能。

动手练习#

花点时间运行签到处理器组件,该组件从流中读取签到信息并更新用户和位置哈希。

示例数据包含 5000 个未处理的签到信息,签到处理器将消耗这些信息。签到处理器通过在 Redis 中存储最后一个处理的签到的 ID 来跟踪它在流中的位置。这样,当它停止并重新启动时,它将从停止的地方继续处理。

在一个终端窗口中,cd 到您克隆的 GitHub 存储库的 node-js-crash-course 文件夹,然后启动签到处理器

$ npm run checkinprocessor delay

添加delay为每个签到信息引入了人工随机处理时间。这会减慢签到处理器的速度,以便您可以更轻松地检查其输出。您应该会看到它启动并开始从流的开头处理签到信息,ID 为 0,这是可能的最低流条目 ID:

$ npm run checkinprocessor delay

> js-crash-course@0.0.1 checkinprocessor
> node ./src/checkinprocessor.js -- "delay"

info: Reading stream from last ID 0.
debug: Updating user ncc:users:789 and location ncc:locations:171.
info: Processed checkin 1609602085397-0.
debug: Updating user ncc:users:455 and location ncc:locations:181.
info: Processed checkin 1609603711960-0.
debug: Updating user ncc:users:752 and location ncc:locations:100.
info: Processed checkin 1609604227545-0.
debug: Updating user ncc:users:102 and location ncc:locations:144.
info: Processed checkin 1609604778689-0.
debug: Updating user ncc:users:180 and location ncc:locations:13.
info: Processed checkin 1609605397408-0.
...

在它处理了一些签到信息后,使用 Ctrl-C 停止签到处理器。记下最后一个处理的签到的 ID(在上例中为 1609605397408-0)。还要记下最后一个处理的签到的用户 ID 和位置 ID(在上例中为用户 180,位置 13)。

验证签到处理器是否将此 ID 存储在 Redis 中,以便它知道在重新启动时从哪里开始。使用 redis-cli 或 RedisInsight 检查键 ncc:checkinprocessor:lastid 的内容

127.0.0.1:6379> get ncc:checkinprocessor:lastid
"1609605397408-0"

该值应该与最后一个处理的签到 ID 相匹配。

最后,让我们验证签到处理器是否使用该签到信息更新了用户的哈希。使用 RedisInsight 或 redis-cli 中的 HGETALL 命令查看键为 ncc:users:<user-id> 的哈希,用您之前记下的用户 ID 替换 <user-id>

因此,以我的示例为例,让我们看看用户 180

127.0.0.1:6379> hgetall ncc:users:180
 1) "id"
 2) "180"
 3) "firstName"
 4) "Sophia"
 5) "lastName"
 6) "Marshall"
 7) "email"
 8) "sophia.marshall@example.com"
 9) "password"
10) "$2b$05$DPSHjaW44H4fn9sudfz/5.f1WcuZMrA0OZIp0CALQf0MH8zH1SSda"
11) "numCheckins"
12) "2332"
13) "lastCheckin"
14) "1609605397408"
15) "lastSeenAt"
16) "13"

验证 lastCheckin 的值是否为最后一个处理的签到的 ID 的时间戳(在我的例子中为 1609605397408),以及 lastSeenAt 的值是否为最后一个处理的签到的位置 ID(在我的例子中为 13)。

编码练习#

在本练习中,您将在 API 服务器组件中实现一条新路由。此路由将只返回签到流中的最新签到信息。您将使用 XREVRANGE 命令 来实现此功能。

首先,确保 API 服务器正在运行

$ npm run dev

(请记住,这将使用 nodemon 启动服务器,因此,当您修改代码并保存更改时,它将自动重启并运行新代码)。

使用您的 IDE 打开 node-js-crash-course 文件夹,然后打开文件 src/routes/checkin_routes.js。找到处理 /checkins/latest 路由的函数。

XREVRANGE 文档 为参考,修改以下行以调用 XREVRANGE,使其只返回最新签到信息

const latestCheckin = await redisClient.xrevrange(checkinStreamKey, 'TODO');

请记住:使用 ioredis 时,Redis 命令的每个参数都需要作为单独的值传递。

通过访问 https://localhost:8081/checkins/latest 进行测试——您应该会看到签到信息的 JSON 表示形式。

为了确保您的代码返回最新签到信息,您需要使用 Postman 发送签到信息。在一个新的终端窗口中启动签到接收器组件

$ npm run checkinreceiver

然后使用 Postman 发送签到信息。在 Postman 中,打开一个新的请求,按如下所示配置它,然后按发送

现在,当您在浏览器中刷新 https://localhost:8081/checkins/latest 时,显示的值应该与您在 Postman 中提供的值匹配。

外部资源#

在我们的示例应用程序中,我们使用 ioredis 默认返回的数组表示形式在 Node.js 中处理流条目。在此视频中,我介绍了如何使用 ioredis 的高级功能使其返回 JavaScript 对象而不是数组。

在此视频中,Justin 使用了一个与我们在本课程中构建的应用程序非常相似的示例应用程序介绍了 Redis 流