正如您所期望的,实时车辆跟踪绝非易事。从开始到结束,在实时车辆跟踪的优势得以实现之前,您必须考虑各种各样的变量。可以说,最重要的因素是速度。任何通信延迟都会产生过时的数据,而且数据越陈旧,其价值就越低。
这是一个复杂的操作,但 这个 Redis Launchpad 应用程序 使其可行,它结合了 Redis 和 Golang 的能力,以高效和准确地提取数据。这确保了用户从车辆跟踪中获得最大价值。
让我们看看它是如何完成的。(我们还想指出,我们的 Redis Launchpad 中有各种各样令人兴奋的应用程序供您入门,无论您喜欢哪种框架和语言。)
让我们看看如何使用 Redis 创建一个可靠而强大的实时车辆跟踪系统。我们将了解如何使用 Redis 在赫尔辛基创建一个实时跟踪系统,以在 Web UI 上发布公交车的实时位置。
鉴于 HSL 每天发布大约 5000 万次更新,Redis 是首选工具,因为它具有强大的 RedisTimeSeries 模块。它能够快速聚合数万个数据点。
使用此应用程序,用户能够收集有关每辆公交车的实时信息,包括其位置、历史位置和当前速度。
让我们分析每个组件,解开它的功能以及实现它所需的步骤。以下是您需要的系统组件的概述:
为了将所有内容联系在一起,并让您清楚地了解每个功能的实现方式,以下是按时间顺序排列的所有流程的快速摘要。
# Topic - Delivered as Msg Part 1
/hfp/v2/journey/ongoing/vp/bus/0018/00423/2159/2/Matinkylä (M)/09:32/2442201/3/60;24/16/58/67
# Body - Delivered as Msg Part 2
{
"VP": {
"desi": "159",
"dir": "2",
"oper": 6,
"veh": 423,
"tst": "2021-05-15T06:40:28.629Z",
"tsi": 1621060828,
"spd": 21.71,
"hdg": 67,
"lat": 60.156949,
"long": 24.687111,
"acc": 0,
"dl": -21,
"odo": null,
"drst": null,
"oday": "2021-05-15",
"jrn": 202,
"line": 1062,
"start": "09:32",
"loc": "GPS",
"stop": null,
"route": "2159",
"occu": 0
}
}
在我们研究架构的细节之前,重要的是要强调所有组件都托管在具有 GP3 EBS 卷的单个 AWS t3.medium 上。尽管 t3.medium 因其可突发的 CPU 而具有吸引力,但较小的实例可以在其当前状态下处理该应用程序。
GoLang 代理用于将来自赫尔辛基的传入消息处理到 Redis 中。一旦被 GoLang 处理,这些消息就会被发送到许多不同的位置
发送到 Redis Streams 的事件数据随后通过 RedisGears 函数处理并写入持久存储 (PostgreSQL)。
事件数据由 GoLang 发布并发送到 Redis PubSub,然后通过 Web 套接字发送到每个连接的客户端。 这提供了浏览器中实时位置层上位置的实时更新。
每辆公共汽车的当前速度和位置通过 RedisTimeSeries 记录。 时间序列数据被分成不同的系列,用于每个预定行程的位置(GeoHash)和速度。 每隔 15 秒,这些记录使用压缩规则进行标准化,以避免为任何给定的行程存储不同的数据间隔。
先决条件
克隆存储库
git clone https://github.com/redis-developer/expert-garbanzo
构建应用程序
可以使用 docker-compose 在本地启动系统的功能版本。
docker-compose up --build
以上命令将启动(几乎)所有在其自身隔离环境中运行本地演示所需的服务。
打开浏览器并使用 http://localhost:8080/ 访问该应用程序。
如果您有兴趣接收有关交通速度/社区层的定期更新,则可以运行以下命令。 这不是绝对必要的,因为可能需要几个小时才能收集到足够的数据来获得合理数量的数据(并且您仍然需要等待 tilegen 作业来重新填充图层)。
docker exec <name of redis container> \
bash -c "gears-cli run /redis/stream_writebehind.py --requirements /redis/requirements.txt"
Redis PubSub 频道用于实时位置。 它相对简单:代理将事件发布到 Redis PubSub 频道,然后实时位置 API 订阅同一消息。 当客户端连接时,他们会收到通过 Web 套接字传递的相同数据。 MQTT 代理使用 Golang 作为 Redis 客户端,并且还使用以下代码:
// In Golang...
pipe := client.TxPipeline()
ctx := client.Context()
// Stylizing the Actual Message Body for Readme
msg := &hsl.EventHolder{
"acc": 0.1, "speed": 10.6, "route": "foo"
}
pipe.Publish(
ctx, "currentLocationsPS", msg
)
# Using a standard Redis client...
127.0.0.1:6379> PUBLISH currentLocationsPS '{"acc": 0.1, "speed": 10.6, "route": "foo"}'
传入的事件被推送到 Redis Streams。 然后由通过 RedisGears 运行的代码清除和处理。 这是使用以下 Redis Go 客户端编写的
// In Golang...
pipe.XAdd(
ctx, &redis.XAddArgs{
Stream: "events",
Values: []interface{}{
"jid", journeyID,
"lat", e.VP.Lat,
"lng", e.VP.Lng,
"time", e.VP.Timestamp,
"spd", e.VP.Spd,
"acc", e.VP.Acc,
"dl", e.VP.DeltaToSchedule,
},
},
)
# Using a standard Redis client...
127.0.0.1:6379> XADD events * jid journeyhashID lat 60 lng 25 time 1620533624765 speed 10 acc 0.1 dl "00:00"
每次公交车行程的当前速度和位置都记录在 RedisTimeSeries 中。 为每个“行程”(JourneyHash)创建一个唯一标识符,对事件中的某些属性进行哈希处理。 为了清楚起见,代理为每个 JourneyHash 创建速度和位置的时间序列。
重要的是要注意位置和速度系列具有短保留时间,并且被压缩到辅助时间序列。 这个压缩系列具有更长的保留时间(约 2 小时),并被 API 用于向用户显示行程历史记录层。 通过能够有效地聚合单个事件,这种模式使我们能够缓解内存使用。
命令
这些命令是使用 Golang 执行的。 首先,检查是否已看到 JourneyHash。 您可以通过检查它是否包含在集合 (journeyID) 中来执行此操作。 如果以下返回 1,则继续创建序列和规则,否则只需 TS.ADD 数据即可。
SADD journeyID <JOURNEYHASH>
第一个序列是使用以下命令创建的。 为了简化操作,我将其称为时间序列 A。
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:speed
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:gh
聚合序列由“主”时间序列提供,并使用以下命令创建。 同样,为了简化操作,我将其称为时间序列 B。
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:speed:agg RETENTION 7200000 LABELS speed 1 journey <JOURNEYHASH>
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:gh:agg RETENTION 7200000 LABELS gh 1 journey <JOURNEYHASH>
对于控制时间序列 A -> 时间序列 B 的规则,您可以使用以下命令
127.0.0.1:6379> TS.CREATERULE positions:<JOURNEYHASH>:speed positions:<JOURNEYHASH>:speed:agg AGGREGATION LAST 150000
127.0.0.1:6379> TS.CREATERULE positions:<JOURNEYHASH>:gh positions:<JOURNEYHASH>:gh:agg AGGREGATION LAST 150000
如果要将数据添加到时间序列 A,请使用以下命令:
127.0.0.1:6379> TS.ADD positions:<JOURNEYHASH>:speed * 10 RETENTION 60000 CHUNK_SIZE 16 ON_DUPLICATE LAST
127.0.0.1:6379> TS.ADD positions:<JOURNEYHASH>:gh * 123456123456163 RETENTION 60000 ON_DUPLICATE LAST
redislabs/redismod 是一个 Docker 镜像,其中包含所有必需的 Redis 模块。 这被用作此项目的基础镜像。 从流中,此函数每 5 秒/10,000 个事件将数据写入 PostgreSQL/PostGIS。 尽管 Gears 在主线程之外运行,但此组件旨在进行最少的数据处理。
它的主要目的是将 MQTT 数据转储到 PostGIS 中,并允许 PostGIS 和 TileGen 进程将这些事件转换为 MBtiles。
您还应该知道 RedisGears 函数是用 Python 编写的,并且不调用任何 Redis 命令。
PostGIS 和 TileGen 容器对于提供 GTFS 和当前流量层至关重要。 只是为了强调一下,PostGIS 是一个 PostgreSQL 扩展,它支持地理空间操作。
TileGen 是一个 Alpine 容器,其中包含地理空间处理中使用的两个常见实用程序:GDAL 和 tippecanoe(以及 psql,PostgreSQL 客户端)。 此容器是必需的
TilesAPI 是一个简单的 Golang API,用于从磁盘中获取这些切片并将它们发送到前端。
Locations API 中有两个端点:/locations/ 和 /histlocations/。
命令
/locations/ 端点订阅/读取 MQTT 代理部分中定义的 PUB/SUB 通道中的数据。 虽然是用 Go 编写的,但 redis-cli 命令是
127.0.0.1:6379> SUBSCRIBE currentLocationsPS
/histlocations/ 端点需要从多个时间序列收集数据,以便为客户端创建组合响应。 这意味着进行 TS.MRANGE 调用。 因为每个 Timeseries B 都标有其 JourneyHash,所以 TS.MRANGE 通过单个调用收集位置和速度统计信息,并根据 JourneyHash 进行过滤。
127.0.0.1:6379> TS.MRANGE - + FILTER journey=<JOURNEYHASH>
前端 (OpenLayers)
前端使用一个名为 OpenLayers 的 JS 库。 这用于创建地图并显示先前描述的服务创建的图层。 在生产环境中,这是使用 Nginx 而不是 Parcel 的开发模式提供的。
前端还调用 公开可用的 API 来获取底图图像。
技术附录
数据吞吐量
重要的是要强调,该系统并非专门用于处理大量数据。 尽管如此,考虑到这项(相对较小规模的)任务,它表现得还不错。
根据传闻,该系统在订阅对应于所有公交车位置更新的 MQTT 主题时,每天处理约 15GB 的消息。 下面,我们有一些图表来说明周日上午到下午和晚上发生的事件吞吐量增加的情况。
如果仔细观察,您会注意到在一天中的中间时段,事件/秒达到了最高水平,为 500/秒。 这是从清晨的 <10 个事件逐渐增长之后的。
但是,在工作日上午 8 点,我们可以看到系统轻松处理了约 1600 多个事件/秒。 以下是 2021 年 5 月 14 日上午五分钟窗口的一些统计数据。
select
now(), -- UTC
count(1)/300 as eps -- averaged over prev 300s
from statistics.events
where approx_event_time > now() - interval'5 minute';
now | eps
-------------------------------+------
2021-05-14 05:06:28.974982+00 | 1646
内存、CPU 和磁盘使用情况
在本地测试中,最初怀疑 CPU 将是系统中最紧张的部分。 结果证明,实际上是磁盘。 下面是 2021 年 5 月 14 日上午 8 点的 Docker 统计数据捕获。
CONTAINER ID NAME CPU % MEM USAGE / LIMIT MEM % NET I/O
6d0a1d7fab0d redis_hackathon_mqtt_1 24.02% 10.71MiB / 3.786GiB 0.28% 32GB / 60.4GB
833aab4d39a8 redis_hackathon_redis_1 7.02% 862.7MiB / 3.786GiB 22.26% 58.8GB / 38.9GB
通过从 AWS 标准 gp2 EBS 升级到 gp3,我们能够免费获得 3000 IOPs 和 125MB/s 的吞吐量,这使得在容器中托管 PostgreSQL 实例成为可能。 尽管没有强大的磁盘,但该站点仍然可以正常运行,但这需要付出代价——瓦片生成非常慢,可能会滞后 10 多分钟。
由于团队希望扩展此组件以允许 30 分钟、1 小时、2 小时和 6 小时的流量图层,因此能够有效地从磁盘获取历史位置至关重要。
在升级之前,由于来自 Gears(写入磁盘)和瓦片生成(从磁盘)的写后操作,系统负载非常高。 即使在高峰时段和瓦片重新生成期间,%iowait 也保持较低水平,系统负载保持在 <1。
以下是在瓦片重新生成事件期间来自 sar 的结果
CPU %user %system %iowait %idle
20:00:07 all 9.98 9.88 47.08 32.56
# PostgreSQL Aggregations - Disk Heavy --
20:00:22 all 10.22 12.04 41.70 35.22
20:00:37 all 10.46 10.66 61.73 16.95
20:00:52 all 34.89 11.97 34.48 18.56
20:01:07 all 8.00 8.51 55.59 26.97
# Tilegeneration - User Heavy --
20:01:22 all 32.93 8.13 26.42 32.42
20:01:37 all 48.94 10.90 21.29 18.87
# Back to High Idle % --
20:01:47 all 7.19 4.39 5.89 81.24
该应用程序的核心需求是拥有一个能够以精度和速度传输数据的数据库。 每个 Redis 组件都是一个重要的器官,彼此和谐地工作,以产生高效的实时跟踪系统。
但是,通过将 Redis 与 Golang 合并,您就可以以超高的效率和准确性提取数据。 您可以在 Redis Launchpad 上查看完整的应用程序,并务必查看我们提供的 其他出色应用程序。
Dustin Wilson
Dustin 是一名后端工程师,目前在 Numina 工作。 要了解更多关于他的工作以及他在 GitHub 上的活动,您可以点击此处查看他的个人资料。