视频

了解更多
RedisTimeSeries 是一个 Redis 模块,它将原生时序数据结构引入 Redis。早期构建在排序集合(或 Redis Streams)之上的时序解决方案可以从 RedisTimeSeries 的特性中受益,例如高容量插入、低延迟读取、灵活的查询语言、降采样等等!
一般来说,时序数据是(相对)简单的。话虽如此,我们还需要考虑其他特性
因此,像 RedisTimeSeries 这样的数据库只是整个解决方案的一部分。您还需要考虑如何**收集**(摄取)、**处理**和**发送**所有数据到 RedisTimeSeries。您真正需要的是一个可扩展的数据管道,它可以作为生产者和消费者之间的缓冲。
这就是 Apache Kafka 的用武之地!除了核心代理之外,它还拥有丰富的组件生态系统,包括 Kafka Connect(它是本文中提出的解决方案架构的一部分)、多种语言的客户端库、Kafka Streams、Mirror Maker 等。
这篇博文提供了一个实际的示例,说明如何将 RedisTimeSeries 与 Apache Kafka 结合使用来分析时序数据。
代码可以在这个 GitHub 仓库中找到:https://github.com/abhirockzz/redis-timeseries-kafka
让我们首先探讨一下用例。请注意,为了博文的目的,它被简化了,然后在后续章节中进一步解释。
想象一下,有很多位置,每个位置都有多个设备,你的任务是监控设备指标——现在我们将考虑温度和压力。这些指标将存储在 RedisTimeSeries 中(当然!),并使用以下键命名约定——<指标名称>:<位置>:<设备>。例如,位置 5 中设备 1 的温度将表示为 temp:5:1。每个时序数据点也将具有以下 标签(键值对)——metric, location, device。这是为了允许灵活的查询,你将在接下来的章节中看到。
这里有一些例子,让你了解如何使用 TS.ADD 命令添加数据点
# 位置 3 中设备 2 的温度,带有标签
TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2
# 位置 3 中设备 2 的压力
TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2
以下是解决方案的总体概览
让我们来分解一下
源(本地)组件
Azure 服务
请注意,某些服务仅在本地托管,以使事情简单。在生产级部署中,您也希望在 Azure 中运行它们。例如,您可以在 Azure Kubernetes Service 中运行 Kafka Connect 集群以及 MQTT 连接器。
总而言之,这是端到端的流程
现在是开始实践的时候了!在此之前,请确保您拥有以下内容。
按照文档 配置 Azure Cache for Redis(企业版层),该版本带有 RedisTimeSeries 模块。
配置 Azure Marketplace 上的 Confluent Cloud 集群。 还要 创建一个 Kafka 主题(使用名称 mqtt.device-stats)和 创建凭据(API 密钥和密钥),您稍后将使用它们安全地连接到您的集群。
您可以使用 Azure 门户 或 使用 Azure CLI 配置 Azure Spring Cloud 的实例
az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>
在继续之前,请确保克隆 GitHub 仓库
git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka
这些组件包括
我在 Mac 上本地安装并启动了 mosquitto 代理。
brew install mosquitto
brew services start mosquitto
您可以 按照与您的操作系统对应的步骤 或随意使用此 Docker 镜像。
我在 Mac 上本地安装并启动了 Grafana。
brew install grafana
brew services start grafana
您也可以对您的操作系统执行相同的操作,或者随意使用此Docker 镜像。
docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana
您应该能够在您刚刚克隆的 repo 中找到 connect-distributed.properties 文件。 替换诸如 bootstrap.servers、sasl.jaas.config 等属性的值。
首先,在本地下载并解压 Apache Kafka。
启动本地 Kafka Connect 集群
export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0>
$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties
如果您在本地使用 Confluent Platform,只需使用 Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest
创建 MQTT 源连接器实例
请务必检查 mqtt-source-config.json 文件。 确保为 kafka.topic 输入正确的主题名称,并保持 mqtt.topics 不变。
curl -X POST -H 'Content-Type: application/json'
http://localhost:8083/connectors -d @mqtt-source-config.json
# wait for a minute before checking the connector status
curl http://localhost:8083/connectors/mqtt-source/status
在您刚刚克隆的 GitHub repo 中,查找 consumer/src/resources 文件夹中的
application.yaml 文件,并替换以下值:
构建应用程序 JAR 文件
cd consumer
export JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home>
mvn clean package
创建 Azure Spring Cloud 应用程序并将 JAR 文件部署到其中
az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11
az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar
您可以使用您刚刚克隆的 GitHub repo 中的脚本
./gen-timeseries-data.sh
注意 - 它所做的只是使用 mosquitto_pub CLI 命令来发送数据。
数据将发送到 device-stats MQTT 主题(这*不是* Kafka 主题)。 您可以通过使用 CLI 订阅者进行双重检查
mosquitto_sub -h localhost -t device-stats
在 Confluent Cloud 门户中检查 Kafka 主题。您还应该检查日志,以了解 Azure Spring Cloud 中设备数据处理器应用程序的日志
az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>
在 localhost:3000 浏览到 Grafana UI。
用于 Grafana 的 Redis 数据源插件适用于任何 Redis 数据库,包括 Azure Cache for Redis。 按照此博客文章中的说明配置数据源。
导入您已克隆的 GitHub repo 中 grafana_dashboards 文件夹中的仪表板(如果您需要有关如何导入仪表板的帮助,请参阅 Grafana 文档)。
例如,这是一个仪表板,显示 location 1 中 device 5 的平均压力(超过 30 秒)(使用 TS.MRANGE)。
这是另一个仪表板,显示 location 3 中多个设备的最大温度(超过 15 秒)(同样,感谢 TS.MRANGE)。
启动 redis-cli 并连接到 Azure Cache for Redis 实例
redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls
从简单查询开始
# pressure in device 5 for location 1
TS.GET pressure:1:5
# temperature in device 5 for location 4
TS.GET temp:4:5
按位置过滤并获取所有设备的温度和压力
TS.MGET WITHLABELS FILTER location=3
提取特定时间范围内一个或多个位置中所有设备的温度和压力
TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)
– + 指的是从开始到最新时间戳的所有内容,但您可以更具体。
MRANGE是我们所需要的!我们还可以按位置中的特定设备进行过滤,并进一步向下钻取温度或压力:
TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp
所有这些都可以与聚合结合使用。
# all the temp data points are not useful. how about an average (or max) instead of every temp data points?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp
也可以创建一个规则来执行此聚合并将其存储在不同的时间序列中。
完成后,请不要忘记删除资源以避免不必要的费用。
在您的本地计算机上
我们探索了一条使用 Redis 和 Kafka 提取、处理和查询时间序列数据的数据管道。 当您考虑后续步骤并转向生产级解决方案时,您应该考虑更多事项。
优化 RedisTimeSeries
这不是一份详尽的清单。 对于其他配置选项,请参阅RedisTimeSeries 文档
数据是宝贵的,包括时间序列! 您可能希望进一步处理它(例如,运行机器学习来提取见解、预测性维护等)。 为了使这成为可能,您将需要将此数据保留更长的时间,为了使其具有成本效益和效率,您将需要使用可扩展的对象存储服务,例如Azure Data Lake Storage Gen2 (ADLS Gen2)。
有一个连接器可以做到这一点! 您可以使用完全托管的 Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud 来处理现有数据管道,以处理并将数据存储在 ADLS 中,然后使用 Azure Synapse Analytics 或 Azure Databricks 运行机器学习。
可扩展性
您的时间序列数据量只能朝一个方向移动——向上! 对于您的解决方案而言,可扩展性至关重要
集成:不仅仅是 Grafana! RedisTimeSeries 还与 Prometheus 和 Telegraf 集成。 但是,在本博客文章撰写时,没有 Kafka 连接器——这将是一个很棒的附加组件!
当然,您可以将 Redis 用于(几乎)所有内容,包括时间序列工作负载! 确保考虑从时间序列数据源到 Redis 及其他位置的数据管道和集成的端到端架构。