正如您所料,实时车辆跟踪并非易事。从头到尾,您必须在实时车辆跟踪的益处显现之前考虑一系列变量。可以说,最重要的因素是速度。任何通信延迟都会导致数据过时,数据越过时,其价值就越低。
这是一个复杂的操作,但这个 Redis Launchpad 应用使其工作,将 Redis 和 Golang 相结合,以其超高效率和准确性提取数据。这确保了用户从车辆跟踪中获得最大价值。
让我们看看它是如何完成的。(我们还想指出我们Redis Launchpad中各种令人兴奋的应用程序,无论您喜欢哪种框架和语言,都可以帮助您入门。)
让我们看看如何使用 Redis 创建一个可靠且功能强大的实时车辆跟踪系统。我们将揭示如何使用 Redis 在赫尔辛基创建实时跟踪系统,以便在 Web UI 上发布公交车的实时位置。
鉴于 HSL 每天发布约 5000 万个更新,由于RedisTimeSeries 模块的鲁棒性,Redis 是首选工具。它能够快速聚合数万个数据点。
使用此应用程序,用户能够收集有关每辆公交车的实时信息,包括其位置、历史位置和当前速度。
让我们分析每个组件,解包其功能以及其实现步骤。以下是您将需要的系统组件概述:
为了将所有内容整合在一起并让您清楚地了解每个功能的实现方式,以下是所有内容按时间顺序的简要总结。
# Topic - Delivered as Msg Part 1
/hfp/v2/journey/ongoing/vp/bus/0018/00423/2159/2/Matinkylä (M)/09:32/2442201/3/60;24/16/58/67
# Body - Delivered as Msg Part 2
{
"VP": {
"desi": "159",
"dir": "2",
"oper": 6,
"veh": 423,
"tst": "2021-05-15T06:40:28.629Z",
"tsi": 1621060828,
"spd": 21.71,
"hdg": 67,
"lat": 60.156949,
"long": 24.687111,
"acc": 0,
"dl": -21,
"odo": null,
"drst": null,
"oday": "2021-05-15",
"jrn": 202,
"line": 1062,
"start": "09:32",
"loc": "GPS",
"stop": null,
"route": "2159",
"occu": 0
}
}
在我们深入研究架构的具体细节之前,重要的是要强调所有组件都托管在单个 AWS t3.medium 上,并具有 GP3 EBS 卷。虽然 t3.medium 由于其可突发 CPU 而具有吸引力,但更小的实例可以处理其当前状态下的应用程序。
GoLang 代理用于将来自赫尔辛基的传入消息处理到 Redis 中。一旦由 GoLang 处理,消息就会被发送到多个不同的位置
发送到 Redis 流的事件数据随后使用 RedisGears 函数进行处理并写入持久存储(PostgreSQL)。
事件数据由 GoLang 发布并发送到 Redis PubSub,然后通过 Web 套接字发送到每个连接的客户端。这提供了浏览器上实时位置层中位置的实时更新。
每辆公交车的当前速度和位置通过 RedisTimeSeries 记录。时间序列数据被分为不同的序列,用于每个预定行程的位置(GeoHash)和速度。每隔 15 秒,这些记录会使用压缩规则进行标准化,以避免为任何给定行程存储不同间隔的数据。
先决条件
克隆存储库
git clone https://github.com/redis-developer/expert-garbanzo
构建应用程序
可以使用 docker-compose 在本地启动系统的功能版本。
docker-compose up --build
上面的命令将启动(几乎)所有在各自的隔离环境中运行本地演示所需的运行服务。
打开浏览器并使用http://localhost:8080/访问应用程序。
如果您有兴趣接收交通速度/邻域层的定期更新,则可以运行以下命令。这不是绝对必要的,因为可能需要几个小时才能收集到足够的数据来获得合理数量的数据(并且您仍然需要等待 tilegen 作业完成以重新填充图层)。
docker exec <name of redis container> \
bash -c "gears-cli run /redis/stream_writebehind.py --requirements /redis/requirements.txt"
Redis PubSub 通道用于实时位置。它很简单:代理向 Redis PubSub 通道发布一个事件,然后实时位置 API 订阅相同的消息。当客户端连接时,它们会收到通过 Web 套接字传递的相同数据。MQTT 代理使用 Golang 作为 Redis 客户端,还使用以下代码:
// In Golang...
pipe := client.TxPipeline()
ctx := client.Context()
// Stylizing the Actual Message Body for Readme
msg := &hsl.EventHolder{
"acc": 0.1, "speed": 10.6, "route": "foo"
}
pipe.Publish(
ctx, "currentLocationsPS", msg
)
# Using a standard Redis client...
127.0.0.1:6379> PUBLISH currentLocationsPS '{"acc": 0.1, "speed": 10.6, "route": "foo"}'
传入的事件被推送到 Redis 流。然后,它被清除并由通过 RedisGears 运行的代码处理。这是使用以下 Redis Go 客户端编写的
// In Golang...
pipe.XAdd(
ctx, &redis.XAddArgs{
Stream: "events",
Values: []interface{}{
"jid", journeyID,
"lat", e.VP.Lat,
"lng", e.VP.Lng,
"time", e.VP.Timestamp,
"spd", e.VP.Spd,
"acc", e.VP.Acc,
"dl", e.VP.DeltaToSchedule,
},
},
)
# Using a standard Redis client...
127.0.0.1:6379> XADD events * jid journeyhashID lat 60 lng 25 time 1620533624765 speed 10 acc 0.1 dl "00:00"
每次公交车行程的当前速度和位置都在 RedisTimeSeries 中记录。为每个“行程”(JourneyHash)创建了一个唯一的标识符,对事件中的某些属性进行哈希处理。澄清一下,代理会为每个 JourneyHash 创建速度和位置的时间序列。
需要注意的是,位置和速度序列的保留时间很短,并且会压缩到一个辅助时间序列。这个压缩后的序列具有更长的保留时间(约 2 小时),并由 API 用于向用户显示行程历史记录层。通过能够有效地聚合单个事件,这种模式使我们能够减轻内存使用量。
命令
命令使用 Golang 执行。首先,检查 JourneyHash 是否尚未被看到。您可以通过检查其在集合(journeyID)中的包含来做到这一点。如果以下结果为 1,则继续创建序列和规则,否则只进行 TS.ADD 数据。
SADD journeyID <JOURNEYHASH>
第一个序列使用以下命令创建。为了简化,我将这些称为时间序列 A。
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:speed
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:gh
聚合序列由“主”时间序列提供,并使用以下命令创建。同样,为了简化,我将这些称为时间序列 B。
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:speed:agg RETENTION 7200000 LABELS speed 1 journey <JOURNEYHASH>
127.0.0.1:6379> TS.CREATE positions:<JOURNEYHASH>:gh:agg RETENTION 7200000 LABELS gh 1 journey <JOURNEYHASH>
对于管理时间序列 A -> 时间序列 B 的规则,您可以使用以下命令
127.0.0.1:6379> TS.CREATERULE positions:<JOURNEYHASH>:speed positions:<JOURNEYHASH>:speed:agg AGGREGATION LAST 150000
127.0.0.1:6379> TS.CREATERULE positions:<JOURNEYHASH>:gh positions:<JOURNEYHASH>:gh:agg AGGREGATION LAST 150000
如果要将数据添加到时间序列 A,请使用以下方法:
127.0.0.1:6379> TS.ADD positions:<JOURNEYHASH>:speed * 10 RETENTION 60000 CHUNK_SIZE 16 ON_DUPLICATE LAST
127.0.0.1:6379> TS.ADD positions:<JOURNEYHASH>:gh * 123456123456163 RETENTION 60000 ON_DUPLICATE LAST
redislabs/redismod 是一个 Docker 镜像,其中包含所有必要的 Redis 模块。它被用作此项目的基镜像。从流中,此函数每 5 秒/10,000 个事件将数据写入 PostgreSQL/PostGIS。尽管 Gears 从主线程运行,但此组件旨在执行最少的数据处理。
它的主要目的是将 MQTT 数据转储到 PostGIS,并允许 PostGIS 和 TileGen 进程将这些事件转换为 MBtiles。
您还应该知道 RedisGears 函数是用 Python 编写的,不调用任何 Redis 命令。
PostGIS 和 TileGen 容器对于提供 GTFS 和当前流量层至关重要。为了强调,PostGIS 是一个 PostgreSQL 扩展,它允许进行地理空间操作。
TileGen 是一个 Alpine 容器,其中包含两个地理空间处理中常用的实用程序,GDAL 和 tippecanoe(以及 psql,PostgreSQL 客户端)。该容器对于
TilesAPI 是一个简单的 Golang API,用于从磁盘获取这些瓦片并将其发送到前端。
Locations API 中有两个端点:/locations/ 和 /histlocations/。
命令
/locations/ 端点订阅/读取 MQTT 代理部分中定义的 PUB/SUB 通道的數據。虽然是用 Go 编写的,但此处的 redis-cli 命令将是
127.0.0.1:6379> SUBSCRIBE currentLocationsPS
/histlocations/ 端点需要从多个时间序列收集数据以创建组合的客户端响应。这意味着进行 TS.MRANGE 调用。由于每个 时间序列 B 都用其 JourneyHash 标记,因此 TS.MRANGE 使用单个调用收集位置和速度统计信息,并根据 JourneyHash 过滤。
127.0.0.1:6379> TS.MRANGE - + FILTER journey=<JOURNEYHASH>
前端(OpenLayers)
前端使用名为 OpenLayers 的 JS 库。它用于创建地图并显示由之前描述的服务创建的图层。在生产环境中,这是使用 Nginx 提供服务的,而不是 Parcel 的开发模式。
前端还向 公开可用的 API 发出请求,以获取底图影像。
技术附录
数据吞吐量
重要的是要强调,此系统并非专门为处理大量数据而创建。尽管如此,考虑到这个(相对较小的规模)任务,它表现出色。
根据轶事,当订阅与所有公交车位置更新相对应的 MQTT 主题时,系统每天处理约 15 GB 的消息。下面,我们有一些图表来说明周日上午、下午和晚上发生的事件吞吐量增长。
如果您仔细观察,您会注意到,在一天中的中间,事件/秒达到 500/秒的最高水平。这是在从清晨的 <10 个事件逐渐增长之后。
但是,在工作日的早上 8 点,我们可以看到系统轻松地处理约 1600 多个事件/秒。以下是一些来自 2021 年 5 月 14 日上午五分钟窗口的统计数据。
select
now(), -- UTC
count(1)/300 as eps -- averaged over prev 300s
from statistics.events
where approx_event_time > now() - interval'5 minute';
now | eps
-------------------------------+------
2021-05-14 05:06:28.974982+00 | 1646
内存、CPU 和磁盘使用情况
在本地测试中,最初怀疑 CPU 将是系统中最紧张的部分。事实证明,实际上是磁盘。以下是 2021 年 5 月 14 日上午 8 点的 docker 统计数据的捕获。
CONTAINER ID NAME CPU % MEM USAGE / LIMIT MEM % NET I/O
6d0a1d7fab0d redis_hackathon_mqtt_1 24.02% 10.71MiB / 3.786GiB 0.28% 32GB / 60.4GB
833aab4d39a8 redis_hackathon_redis_1 7.02% 862.7MiB / 3.786GiB 22.26% 58.8GB / 38.9GB
通过从 AWS 标准 gp2 EBS 升级到 gp3,我们能够免费获得 3000 IOP 和 125 MB/秒的吞吐量,这使得在容器中托管 PostgreSQL 实例成为可能。尽管没有健壮的磁盘,但该网站仍然可以正常运行,但这付出了代价——瓦片生成非常缓慢,可能会滞后 10 多分钟。
由于团队希望扩展此组件以允许 30 分钟、1 小时、2 小时和 6 小时的交通图层,因此能够有效地从磁盘获取历史位置至关重要。
在升级之前,由于齿轮的写入后处理(写入磁盘)和瓦片生成(从磁盘),系统负载非常高。即使在高峰时段和瓦片生成期间,%iowait 也保持较低,系统负载也保持 <1。
以下是在瓦片生成事件期间从 sar 获得的结果
CPU %user %system %iowait %idle
20:00:07 all 9.98 9.88 47.08 32.56
# PostgreSQL Aggregations - Disk Heavy --
20:00:22 all 10.22 12.04 41.70 35.22
20:00:37 all 10.46 10.66 61.73 16.95
20:00:52 all 34.89 11.97 34.48 18.56
20:01:07 all 8.00 8.51 55.59 26.97
# Tilegeneration - User Heavy --
20:01:22 all 32.93 8.13 26.42 32.42
20:01:37 all 48.94 10.90 21.29 18.87
# Back to High Idle % --
20:01:47 all 7.19 4.39 5.89 81.24
应用程序的核心是需要一个能够快速精确地传输数据的数据库。每个 Redis 组件都是一个重要的器官,它们相互协同工作,形成了一个高效的实时跟踪系统。
但是通过将 Redis 与 Golang 相结合,您可以以超高的效率和准确性提取数据。您可以在 Redis Launchpad 上查看完整的应用程序,并务必查看我们提供的 其他很棒的应用程序。
Dustin Wilson
Dustin 是一位后端工程师,目前在 Numina 工作。要了解更多关于他的工作和他在 GitHub 上的活动的信息,您可以在这里查看他的个人资料。