用户与我们的系统最常见的交互是在某个地点签到。系统的这一部分需要快速捕获签到信息,并独立于其他组件进行扩展。
我们决定构建一个独立的 Express 应用,专门用于接收用户的签到 POST 请求。这使得我们可以将其独立于处理 GET 请求的其他 API 端点进行扩展。为了让 Checkin Receiver 尽可能快速,我们决定在独立的 service 中进行实际的签到处理工作。Checkin Processor service 从 stream 中读取签到信息,并更新 Redis 中的用户和地点 Hashes。
签到信息在我们的系统中是瞬时数据——只要我们处理完所有签到,就不需要永久保留它们。按照它们进入系统的顺序存储也很有意义。
使用 Redis Stream 存储签到数据非常适合这个用例。Stream 在生产者和消费者组件之间充当缓冲区。使用 Redis Streams,stream 中的每个条目都被赋予一个时间戳 ID,并且 Stream 按照这些 ID 进行排序。
在我们的应用中,Checkin Receiver Service 是 **生产者** ,Checkin Processor 是 **消费者** 。我们可以用图示表示如下:
使用 Stream 使得这些组件可以在不同的速度下运行,并且相互之间无需了解。Checkin Receiver 简单地为从用户那里接收到的每个签到向 Stream 添加一个新条目,而 Checkin Processor 以自己的速度读取 Stream 并更新用户和地点 Hashes。
还可以读取 Redis Stream 来查找在指定开始和结束 ID 之间时间段内添加的条目。由于我们的 ID 是时间戳,这意味着我们可以请求在给定时间范围内添加的数据。我们在 API Server 组件中使用了此功能,并且在本模块的编码练习中,您将扩展此功能。
花点时间运行 Checkin Processor 组件,该组件从 stream 读取签到信息并更新用户和地点 Hashes。
示例数据包含 5000 个未处理的签到,Checkin Processor 将消费这些签到。Checkin Processor 通过在 Redis 中存储最后处理的签到 ID 来跟踪其在 stream 中的进度。这样,当它停止并重新启动时,可以从上次中断的地方继续。
在终端窗口中,cd
到您克隆 GitHub 仓库的 node-js-crash-course
文件夹,然后启动 Checkin Processor
$ npm run checkinprocessor delay
添加 delay
会为每个签到引入一个人工的随机处理时间。这会减慢 Checkin Processor 的速度,以便您可以更容易地查看其输出。您应该会看到它启动并开始从 stream 的开头(ID 0,这是可能的最低 stream 条目 ID)处理签到:
$ npm run checkinprocessor delay
> [email protected] checkinprocessor
> node ./src/checkinprocessor.js -- "delay"
info: Reading stream from last ID 0.
debug: Updating user ncc:users:789 and location ncc:locations:171.
info: Processed checkin 1609602085397-0.
debug: Updating user ncc:users:455 and location ncc:locations:181.
info: Processed checkin 1609603711960-0.
debug: Updating user ncc:users:752 and location ncc:locations:100.
info: Processed checkin 1609604227545-0.
debug: Updating user ncc:users:102 and location ncc:locations:144.
info: Processed checkin 1609604778689-0.
debug: Updating user ncc:users:180 and location ncc:locations:13.
info: Processed checkin 1609605397408-0.
...
处理完几个签到后,按 Ctrl-C 停止 Checkin Processor。记下最后处理的签到 ID(在上例中是 1609605397408-
0 )。还要记下最后处理的签到的用户 ID 和地点 ID(在上例中是用户 180,地点 13)。
验证 Checkin Processor 已将此 ID 存储在 Redis 中,以便在重启时知道从何处开始。使用 redis-cli 或 RedisInsight,查看键 ncc:checkinprocessor:lastid
的内容
127.0.0.1:6379> get ncc:checkinprocessor:lastid
"1609605397408-0"
该值应与最后处理的签到 ID 匹配。
最后,让我们验证 Checkin Processor 是否使用该签到中的详细信息更新了用户的 Hash。使用 RedisInsight 或 redis-cli 中的 HGETALL
命令查看键为 ncc:users:<user-id>
的 hash,将 <user-id>
替换为您之前记下的用户 ID。
所以对于我的示例,我们来看用户 180
127.0.0.1:6379> hgetall ncc:users:180
1) "id"
2) "180"
3) "firstName"
4) "Sophia"
5) "lastName"
6) "Marshall"
7) "email"
8) "[email protected]"
9) "password"
10) "$2b$05$DPSHjaW44H4fn9sudfz/5.f1WcuZMrA0OZIp0CALQf0MH8zH1SSda"
11) "numCheckins"
12) "2332"
13) "lastCheckin"
14) "1609605397408"
15) "lastSeenAt"
16) "13"
验证 lastCheckin
的值是否是最后处理的签到 ID 的时间戳(在我的示例中是 1609605397408
),并且 lastSeenAt
的值是否是最后处理的签到的地点 ID(在我的示例中是 13
)。
在此练习中,您将在 API Server 组件中实现一个新的路由。此路由将仅返回签到 stream 中最新的签到。您将为此使用 XREVRANGE 命令 。
首先,确保 API Server 正在运行
$ npm run dev
(请记住,这将使用 nodemon 启动服务器,因此当您修改代码并保存更改时,它会自动重启并运行新代码)。
使用您的 IDE 打开 node-js-crash-course
文件夹,然后打开文件 src/routes/checkin_routes.js
。找到处理 /checkins/latest
路由的函数。
使用 XREVRANGE documentation 作为指南,修改以下行以调用 XREVRANGE
,使其仅返回最新的签到
const latestCheckin = await redisClient.xrevrange(checkinStreamKey, 'TODO');
请记住:使用 ioredis 时,Redis 命令的每个参数需要作为单独的值传递。
访问 http://localhost:8081/checkins/latest
来测试您的代码——您应该会看到签到的 JSON 表示。
为确保您的代码返回最新的签到,您需要使用 Postman POST 一个签到。在新的终端窗口中启动 Checkin Receiver 组件
$ npm run checkinreceiver
然后使用 Postman POST 一个签到。在 Postman 中,打开一个新请求,按图示配置,然后按发送。
现在,当您在浏览器中刷新 http://localhost:8081/checkins/latest
时,显示的值应与您在 Postman 中提供的值匹配。
在我们的示例应用中,我们使用 ioredis 默认返回的数组表示在 Node.js 中处理 stream 条目。在此视频中,我将介绍如何使用 ioredis 的高级功能使其返回 JavaScript 对象。
在此视频中,Justin 介绍了 Redis Streams,并提供了一个示例应用,该应用与我们在本课程中构建的应用非常相似。