学习

使用 Redis Streams 处理签到

Simon Prickett
作者
Simon Prickett, Redis 首席开发者推广工程师

用户与我们的系统最常见的交互是在某个地点签到。系统的这一部分需要快速捕获签到信息,并独立于其他组件进行扩展。

我们决定构建一个独立的 Express 应用,专门用于接收用户的签到 POST 请求。这使得我们可以将其独立于处理 GET 请求的其他 API 端点进行扩展。为了让 Checkin Receiver 尽可能快速,我们决定在独立的 service 中进行实际的签到处理工作。Checkin Processor service 从 stream 中读取签到信息,并更新 Redis 中的用户和地点 Hashes。

签到信息在我们的系统中是瞬时数据——只要我们处理完所有签到,就不需要永久保留它们。按照它们进入系统的顺序存储也很有意义。

使用 Redis Stream 存储签到数据非常适合这个用例。Stream 在生产者和消费者组件之间充当缓冲区。使用 Redis Streams,stream 中的每个条目都被赋予一个时间戳 ID,并且 Stream 按照这些 ID 进行排序。

在我们的应用中,Checkin Receiver Service 是 **生产者**  ,Checkin Processor 是 **消费者**  。我们可以用图示表示如下:

使用 Stream 使得这些组件可以在不同的速度下运行,并且相互之间无需了解。Checkin Receiver 简单地为从用户那里接收到的每个签到向 Stream 添加一个新条目,而 Checkin Processor 以自己的速度读取 Stream 并更新用户和地点 Hashes。

还可以读取 Redis Stream 来查找在指定开始和结束 ID 之间时间段内添加的条目。由于我们的 ID 是时间戳,这意味着我们可以请求在给定时间范围内添加的数据。我们在 API Server 组件中使用了此功能,并且在本模块的编码练习中,您将扩展此功能。

实操练习#

花点时间运行 Checkin Processor 组件,该组件从 stream 读取签到信息并更新用户和地点 Hashes。

示例数据包含 5000 个未处理的签到,Checkin Processor 将消费这些签到。Checkin Processor 通过在 Redis 中存储最后处理的签到 ID 来跟踪其在 stream 中的进度。这样,当它停止并重新启动时,可以从上次中断的地方继续。

在终端窗口中,cd  到您克隆 GitHub 仓库的 node-js-crash-course  文件夹,然后启动 Checkin Processor

$ npm run checkinprocessor delay

添加 delay 会为每个签到引入一个人工的随机处理时间。这会减慢 Checkin Processor 的速度,以便您可以更容易地查看其输出。您应该会看到它启动并开始从 stream 的开头(ID 0,这是可能的最低 stream 条目 ID)处理签到:

$ npm run checkinprocessor delay

> [email protected] checkinprocessor
> node ./src/checkinprocessor.js -- "delay"

info: Reading stream from last ID 0.
debug: Updating user ncc:users:789 and location ncc:locations:171.
info: Processed checkin 1609602085397-0.
debug: Updating user ncc:users:455 and location ncc:locations:181.
info: Processed checkin 1609603711960-0.
debug: Updating user ncc:users:752 and location ncc:locations:100.
info: Processed checkin 1609604227545-0.
debug: Updating user ncc:users:102 and location ncc:locations:144.
info: Processed checkin 1609604778689-0.
debug: Updating user ncc:users:180 and location ncc:locations:13.
info: Processed checkin 1609605397408-0.
...

处理完几个签到后,按 Ctrl-C 停止 Checkin Processor。记下最后处理的签到 ID(在上例中是 1609605397408-0 )。还要记下最后处理的签到的用户 ID 和地点 ID(在上例中是用户 180,地点 13)。

验证 Checkin Processor 已将此 ID 存储在 Redis 中,以便在重启时知道从何处开始。使用 redis-cli 或 RedisInsight,查看键 ncc:checkinprocessor:lastid  的内容

127.0.0.1:6379> get ncc:checkinprocessor:lastid
"1609605397408-0"

该值应与最后处理的签到 ID 匹配。

最后,让我们验证 Checkin Processor 是否使用该签到中的详细信息更新了用户的 Hash。使用 RedisInsight 或 redis-cli 中的 HGETALL  命令查看键为 ncc:users:<user-id>  的 hash,将 <user-id>  替换为您之前记下的用户 ID。

所以对于我的示例,我们来看用户 180

127.0.0.1:6379> hgetall ncc:users:180
 1) "id"
 2) "180"
 3) "firstName"
 4) "Sophia"
 5) "lastName"
 6) "Marshall"
 7) "email"
 8) "[email protected]"
 9) "password"
10) "$2b$05$DPSHjaW44H4fn9sudfz/5.f1WcuZMrA0OZIp0CALQf0MH8zH1SSda"
11) "numCheckins"
12) "2332"
13) "lastCheckin"
14) "1609605397408"
15) "lastSeenAt"
16) "13"

验证 lastCheckin  的值是否是最后处理的签到 ID 的时间戳(在我的示例中是 1609605397408  ),并且 lastSeenAt  的值是否是最后处理的签到的地点 ID(在我的示例中是 13  )。

编码练习#

在此练习中,您将在 API Server 组件中实现一个新的路由。此路由将仅返回签到 stream 中最新的签到。您将为此使用 XREVRANGE 命令 。

首先,确保 API Server 正在运行

$ npm run dev

(请记住,这将使用 nodemon 启动服务器,因此当您修改代码并保存更改时,它会自动重启并运行新代码)。

使用您的 IDE 打开 node-js-crash-course  文件夹,然后打开文件 src/routes/checkin_routes.js  。找到处理 /checkins/latest  路由的函数。

使用 XREVRANGE documentation  作为指南,修改以下行以调用 XREVRANGE  ,使其仅返回最新的签到

const latestCheckin = await redisClient.xrevrange(checkinStreamKey, 'TODO');

请记住:使用 ioredis 时,Redis 命令的每个参数需要作为单独的值传递。

访问 http://localhost:8081/checkins/latest  来测试您的代码——您应该会看到签到的 JSON 表示。

为确保您的代码返回最新的签到,您需要使用 Postman POST 一个签到。在新的终端窗口中启动 Checkin Receiver 组件

$ npm run checkinreceiver

然后使用 Postman POST 一个签到。在 Postman 中,打开一个新请求,按图示配置,然后按发送。

现在,当您在浏览器中刷新 http://localhost:8081/checkins/latest  时,显示的值应与您在 Postman 中提供的值匹配。

外部资源#

在我们的示例应用中,我们使用 ioredis 默认返回的数组表示在 Node.js 中处理 stream 条目。在此视频中,我将介绍如何使用 ioredis 的高级功能使其返回 JavaScript 对象。

在此视频中,Justin 介绍了 Redis Streams,并提供了一个示例应用,该应用与我们在本课程中构建的应用非常相似。