视频

了解更多信息
RedisTimeSeries 是一个为 Redis 提供原生时序数据结构的 Redis 模块。以前基于有序集合(或 Redis 流)构建的时序解决方案现在可以使用 RedisTimeSeries 功能的更多优势,比如大容量插入、低延迟读取、灵活的查询语言、下采样等!
一般来说,时序数据是(相对)简单的。话虽如此,我们还需要考虑其他特征
因此,诸如 RedisTimeSeries 等数据库仅仅是整体解决方案的一部分。还需要考虑如何将所有数据收集(摄取)、处理和发送到 RedisTimeSeries。真正需要的是一个可伸缩的数据管道,它可以作为缓冲区以解耦生产者和使用者。
这就是Apache Kafka 的用武之地!除了核心代理之外,它还拥有一个丰富的组件生态系统,包括 Kafka Connect(它是本博客文章提出的解决方案架构的一部分)、多种编程语言的客户端库、Kafka Streams、Mirror Maker 等组件。
这篇博客文章提供了一个实用示例,介绍如何将 RedisTimeSeries 与Apache Kafka结合起来分析时序数据。
代码可在该 GitHub 仓库中获取 https://github.com/abhirockzz/redis-timeseries-kafka
让我们首先探索用例。请注意,出于博客文章的目的,已对此用例进行了简化,并在后面的部分中作了进一步解释。
想象一下有很多个位置,每个位置都有多个设备,并且你要负责监控设备指标,目前我们考虑温度和压力。这些指标将存储在 RedisTimeSeries(当然!)中,并使用以下命名约定作为键值 —<metric name>:<location>:<device>。例如,位置 5 中设备 1 的温度表示为 temp:5:1。每个时序数据点还将具有以下标签(键值对)—metric, location, device,以便于灵活查询,正如你将在后面的章节中看到的那样。
这里有几个示例,可让你了解如何使用TS.ADD命令添加数据点
# 位置 3 中设备 2 的温度以及标签
TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2
# 位置 3 中设备 2 的压力
TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2
以下是该解决方案在高层面的样子
我们来详细分析
源(本地)组件
Azure 服务
请注意,一些服务仅出于简洁性的考虑在本地托管。在生产级部署中,你需要在 Azure 中运行它们。例如,可以在 Azure Kubernetes Service 中与 MQTT 连接器一起运行 Kafka Connect 集群。
总结一下,以下是端到端流程
是时候开始动手实践了!在那之前,请确保你拥有以下内容。
按照文档 预配 Azure 缓存适用于 Redis(企业层) 随附 RedisTimeSeries 模块。
在 Azure 市场中预配 Confluent Cloud 集群。还需要 创建一个 Kafka 主题 (使用名称 mqtt.device-stats) 与 创建凭据 (API 密钥和密钥) 以便以后安全连接到集群。
您可以使用 Azure 门户或 使用 Azure CLI预配 Azure Spring Cloud 的实例。
az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>
继续之前,请确保克隆 GitHub 存储库
git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka
组件包括
我在 Mac 上本地安装并启动了 mosquitto 代理。
brew install mosquitto
brew services start mosquitto
您可按照与您的操作系统对应的步骤来操作,或随意使用该 Docker 映像。
我在 Mac 上本地安装并启动了 Grafana。
brew install grafana
brew services start grafana
您可对自己的操作系统执行同样的操作,或随意使用该 Docker 映像。
docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana
您应该可以在您刚才克隆的存储库中找到 connect-distributed.properties 文件。替换 bootstrap.servers、sasl.jaas.config 等属性的值。
首先,下载并解压 Apache Kafka 到本地。
启动本地 Kafka Connect 集群
export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0>
$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties
如果你在本地使用 Confluent Platform,只需使用 Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest
创建 MQTT 源连接器实例
务必检查mqtt-source-config.json文件。确保为kafka.topic输入正确的主题名称,并保留mqtt.topics不变。
curl -X POST -H 'Content-Type: application/json'
https://127.0.0.1:8083/connectors -d @mqtt-source-config.json
# wait for a minute before checking the connector status
curl https://127.0.0.1:8083/connectors/mqtt-source/status
在刚刚克隆的 GitHub 存储库中,查找consumer/src/resources文件夹中的application.yaml文件,并替换以下值:
构建应用程序 JAR 文件
cd consumer
export JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home>
mvn clean package
创建一个 Azure Spring Cloud 应用程序,并向其中部署 JAR 文件
az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11
az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar
可以使用刚刚克隆的 GitHub 存储库中的脚本
./gen-timeseries-data.sh
注意 - 它所做的只是使用mosquitto_pubCLI 命令来发送数据。
数据将发送到 device-stats MQTT 主题(这不是 Kafka 主题)。你可以使用 CLI 订阅者进行仔细检查
mosquitto_sub -h localhost -t device-stats
在 Confluent Cloud 门户中检查 Kafka 主题。你还应该检查日志以获得 Azure Spring Cloud 中的设备数据处理程序应用
az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>
浏览localhost:3000上的 Grafana UI。
用于 Grafana 的 Redis 数据源插件可与任何 Redis 数据库配合使用,包括 Azure Cache for Redis。按照此博客文章中的说明来配置数据源。
导入 GitHub 代码库中 grafana_dashboards 文件夹中的仪表盘,您已克隆该文件(如果您需要有关如何导入仪表盘的帮助,请参阅 Grafana 文档)。
例如,这是一个显示 位置 1 中 设备 5 的平均压力(超过 30 秒)的仪表盘(使用 TS.MRANGE)。
这是另一个显示 位置 3 中多个设备的最大温度(超过 15 秒)的仪表盘(再次感谢 TS.MRANGE)。
启动 redis-cli 并连接到 Azure Cache for Redis 实例
redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls
从简单查询开始
# pressure in device 5 for location 1
TS.GET pressure:1:5
# temperature in device 5 for location 4
TS.GET temp:4:5
按位置筛选,并获取所有设备的温度和压力
TS.MGET WITHLABELS FILTER location=3
在特定时间范围内提取一个或多个位置中的所有设备的温度和压力
TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)
– + 指从开始到最新时间戳的所有内容,但您还可以更具体。
MRANGE 就是我们需要的!我们还可以按位置中的特定设备筛选,并按温度或压力进一步细分:
TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp
所有这些都可以与聚合结合使用。
# all the temp data points are not useful. how about an average (or max) instead of every temp data points?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp
也可以创建规则来执行此聚合,并将其存储在不同的时间序列中。
完成后,不要忘记删除资源以避免不必要的成本。
在本地计算机上
我们探索了一个数据管道来使用 Redis 和 Kafka 摄取、处理和查询时间序列数据。当您考虑下一步并转向生产级解决方案时,您应该考虑更多一些内容。
优化 RedisTimeSeries
这不是一个详尽的列表。对于其他配置选项,请参考 RedisTimeSeries 文档
数据很宝贵,包括时间序列!您可能希望进一步处理它(例如运行机器学习来提取见解、预测性维护等)。为了实现这一点,您需要将这些数据保留更长时间,并且为了做到既经济高效,您需要使用可扩展的对象存储服务,例如 Azure Data Lake Storage Gen2 (ADLS Gen2)。
有一个连接器可以做到!您可以通过使用 Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud(一种完全托管的服务)来增强现有的数据管道,该服务用于处理数据并将其存储在 ADLS 中,然后使用 Azure Synapse Analytics 或 Azure Databricks 运行机器学习。
可扩展性
您的时间序列数据量只能朝着一个方向发展:上升!让您的解决方案可扩展至关重要
集成:不仅限于 Grafana!RedisTimeSeries 也与 Prometheus 和 Telegraf 集成。不过,在撰写此博客文章之时,尚未提供 Kafka 连接器,这将是一项极好的附加功能!
当然,你可以将 Redis 用于(几乎)所有内容,包括时间序列负载!务必仔细考虑数据管道和集成的时间序列数据源到 Redis 及其后续步骤的端到端架构。