dot Redis 8 来了——而且是开源的

了解更多

使用 Redis 和 Apache Kafka 处理时序数据

RedisTimeSeries 是一个 Redis 模块,它将原生时序数据结构引入 Redis。早期构建在排序集合(或 Redis Streams)之上的时序解决方案可以从 RedisTimeSeries 的特性中受益,例如高容量插入、低延迟读取、灵活的查询语言、降采样等等!

一般来说,时序数据是(相对)简单的。话虽如此,我们还需要考虑其他特性

  • 数据速度:例如,考虑每秒来自数千个设备的数百个指标
  • 容量(大数据):考虑数月(甚至数年)的数据累积

因此,像 RedisTimeSeries 这样的数据库只是整个解决方案的一部分。您还需要考虑如何**收集**(摄取)、**处理**和**发送**所有数据到 RedisTimeSeries。您真正需要的是一个可扩展的数据管道,它可以作为生产者和消费者之间的缓冲。

这就是 Apache Kafka 的用武之地!除了核心代理之外,它还拥有丰富的组件生态系统,包括 Kafka Connect(它是本文中提出的解决方案架构的一部分)、多种语言的客户端库、Kafka Streams、Mirror Maker 等。

kafka map

这篇博文提供了一个实际的示例,说明如何将 RedisTimeSeries 与 Apache Kafka 结合使用来分析时序数据。

代码可以在这个 GitHub 仓库中找到:https://github.com/abhirockzz/redis-timeseries-kafka

让我们首先探讨一下用例。请注意,为了博文的目的,它被简化了,然后在后续章节中进一步解释。

场景:设备监控

想象一下,有很多位置,每个位置都有多个设备,你的任务是监控设备指标——现在我们将考虑温度和压力。这些指标将存储在 RedisTimeSeries 中(当然!),并使用以下键命名约定——<指标名称>:<位置>:<设备>。例如,位置 5 中设备 1 的温度将表示为 temp:5:1。每个时序数据点也将具有以下 标签(键值对)——metric, location, device。这是为了允许灵活的查询,你将在接下来的章节中看到。

这里有一些例子,让你了解如何使用 TS.ADD 命令添加数据点

# 位置 3 中设备 2 的温度,带有标签

TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2

# 位置 3 中设备 2 的压力

TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2

解决方案架构

以下是解决方案的总体概览

Solution architecture

让我们来分解一下

源(本地)组件

  • MQTT 代理 (mosquitto): MQTT 是物联网用例的事实协议。我们将使用的场景是物联网和时序的组合——稍后会详细介绍。
  • Kafka Connect:MQTT 源连接器 用于将数据从 MQTT 代理传输到 Kafka 集群。

Azure 服务

  • Azure Cache for Redis 企业版层:企业版层基于 Redis Enterprise,这是 Redis 的商业版本。除了 RedisTimeSeries 之外,企业版层还支持 RediSearchRedisBloom。客户无需担心企业版层的许可证获取。Azure Cache for Redis 将促进此过程,客户可以通过 Azure Marketplace 产品获取和支付此软件的许可证。
  • Azure 上的 Confluent Cloud:一个完全托管的产品,提供 Apache Kafka 作为服务,这要归功于从 Azure 到 Confluent Cloud 的集成预配层。它减轻了跨平台管理的负担,并为在 Azure 基础设施上使用 Confluent Cloud 提供了整合的体验,从而允许您轻松地将 Confluent Cloud 与您的 Azure 应用程序集成。
  • Azure Spring Cloud:得益于 Azure Spring Cloud,将 Spring Boot 微服务部署到 Azure 变得更加容易。Azure Spring Cloud 减轻了基础设施的担忧,提供配置管理、服务发现、CI/CD 集成、蓝绿部署等。该服务完成了所有的繁重工作,因此开发人员可以专注于他们的代码。

请注意,某些服务仅在本地托管,以使事情简单。在生产级部署中,您也希望在 Azure 中运行它们。例如,您可以在 Azure Kubernetes Service 中运行 Kafka Connect 集群以及 MQTT 连接器。

总而言之,这是端到端的流程

  • 一个脚本生成模拟设备数据,该数据被发送到本地 MQTT 代理。
  • 此数据由 MQTT Kafka Connect 源连接器拾取并发送到在 Azure 中运行的 Confluent Cloud Kafka 集群中的主题。
  • 它由托管在 Azure Spring Cloud 中的 Spring Boot 应用程序进一步处理,然后将其持久化到 Azure Cache for Redis 实例。

现在是开始实践的时候了!在此之前,请确保您拥有以下内容。

先决条件

设置基础设施组件

按照文档 配置 Azure Cache for Redis(企业版层),该版本带有 RedisTimeSeries 模块。

infrastructure components

配置 Azure Marketplace 上的 Confluent Cloud 集群。 还要 创建一个 Kafka 主题(使用名称 mqtt.device-stats)和 创建凭据(API 密钥和密钥),您稍后将使用它们安全地连接到您的集群。

kafka cluster

您可以使用 Azure 门户使用 Azure CLI 配置 Azure Spring Cloud 的实例

az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>
Southeast Asia

在继续之前,请确保克隆 GitHub 仓库

git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka

设置本地服务

这些组件包括

MQTT 代理

我在 Mac 上本地安装并启动了 mosquitto 代理。

brew install mosquitto
brew services start mosquitto

您可以 按照与您的操作系统对应的步骤 或随意使用此 Docker 镜像

Grafana

我在 Mac 上本地安装并启动了 Grafana。

brew install grafana
brew services start grafana

您也可以对您的操作系统执行相同的操作,或者随意使用此Docker 镜像

docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana

Kafka Connect

您应该能够在您刚刚克隆的 repo 中找到 connect-distributed.properties 文件。 替换诸如 bootstrap.servers、sasl.jaas.config 等属性的值。

首先,在本地下载并解压 Apache Kafka

启动本地 Kafka Connect 集群

export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0>

$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties

手动安装 MQTT 源连接器

  • 从该链接下载连接器/插件 ZIP 文件,并且
  • 将其提取到 Connect worker 的 plugin.path 配置属性上列出的目录之一中

如果您在本地使用 Confluent Platform,只需使用 Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest

创建 MQTT 源连接器实例

请务必检查 mqtt-source-config.json 文件。 确保为 kafka.topic 输入正确的主题名称,并保持 mqtt.topics 不变。

curl -X POST -H 'Content-Type: application/json'
http://localhost:8083/connectors -d @mqtt-source-config.json

# wait for a minute before checking the connector status
curl http://localhost:8083/connectors/mqtt-source/status

部署设备数据处理器应用程序

在您刚刚克隆的 GitHub repo 中,查找 consumer/src/resources 文件夹中的 application.yaml 文件,并替换以下值:

  • 用于 Redis 的 Azure 缓存的主机、端口和主访问密钥
  • Azure 上 Confluent Cloud 的 API 密钥和密码

构建应用程序 JAR 文件

cd consumer

export JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home>

mvn clean package

创建 Azure Spring Cloud 应用程序并将 JAR 文件部署到其中

az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11

az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar

启动模拟设备数据生成器

您可以使用您刚刚克隆的 GitHub repo 中的脚本

./gen-timeseries-data.sh

注意 - 它所做的只是使用 mosquitto_pub CLI 命令来发送数据。

数据将发送到 device-stats MQTT 主题(这*不是* Kafka 主题)。 您可以通过使用 CLI 订阅者进行双重检查

mosquitto_sub -h localhost -t device-stats

在 Confluent Cloud 门户中检查 Kafka 主题。您还应该检查日志,以了解 Azure Spring Cloud 中设备数据处理器应用程序的日志

az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>

尽情享用 Grafana 仪表板!

localhost:3000 浏览到 Grafana UI。

Grafana dash

用于 Grafana 的 Redis 数据源插件适用于任何 Redis 数据库,包括 Azure Cache for Redis。 按照此博客文章中的说明配置数据源。

导入您已克隆的 GitHub repo 中 grafana_dashboards 文件夹中的仪表板(如果您需要有关如何导入仪表板的帮助,请参阅 Grafana 文档)。

例如,这是一个仪表板,显示 location 1device 5 的平均压力(超过 30 秒)(使用 TS.MRANGE)。

avg pressure

这是另一个仪表板,显示 location 3 中多个设备的最大温度(超过 15 秒)(同样,感谢 TS.MRANGE)。

max temp

所以,您想运行一些 RedisTimeSeries 命令?

启动 redis-cli 并连接到 Azure Cache for Redis 实例

redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls

从简单查询开始

# pressure in device 5 for location 1
TS.GET pressure:1:5

# temperature in device 5 for location 4
TS.GET temp:4:5

按位置过滤并获取所有设备的温度和压力

TS.MGET WITHLABELS FILTER location=3

提取特定时间范围内一个或多个位置中所有设备的温度和压力

TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)

– + 指的是从开始到最新时间戳的所有内容,但您可以更具体。

MRANGE是我们所需要的!我们还可以按位置中的特定设备进行过滤,并进一步向下钻取温度或压力:

TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp

所有这些都可以与聚合结合使用。

# all the temp data points are not useful. how about an average (or max) instead of every temp data points?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp

也可以创建一个规则来执行此聚合并将其存储在不同的时间序列中。

完成后,请不要忘记删除资源以避免不必要的费用。

删除资源

在您的本地计算机上

  • 停止 Kafka Connect 集群
  • 停止 mosquito broker (例如 brew services stop mosquito)
  • 停止 Grafana 服务 (例如 brew services stop grafana)

我们探索了一条使用 Redis 和 Kafka 提取、处理和查询时间序列数据的数据管道。 当您考虑后续步骤并转向生产级解决方案时,您应该考虑更多事项。

其他注意事项

downsampling

优化 RedisTimeSeries

  • 保留策略:考虑一下这一点,因为您的时间序列数据点默认情况下不会被修剪或删除。
  • 降采样和聚合规则:您不想永远存储数据,对吧? 确保配置适当的规则来处理此问题(例如,TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000)。
  • 重复数据策略:您想如何处理重复样本? 确保默认策略 (BLOCK) 确实是您所需要的。 如果不是,请考虑其他选项

这不是一份详尽的清单。 对于其他配置选项,请参阅RedisTimeSeries 文档

长期数据保留怎么样?

数据是宝贵的,包括时间序列! 您可能希望进一步处理它(例如,运行机器学习来提取见解、预测性维护等)。 为了使这成为可能,您将需要将此数据保留更长的时间,为了使其具有成本效益和效率,您将需要使用可扩展的对象存储服务,例如Azure Data Lake Storage Gen2 (ADLS Gen2)。

data retention

有一个连接器可以做到这一点! 您可以使用完全托管的 Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud 来处理现有数据管道,以处理并将数据存储在 ADLS 中,然后使用 Azure Synapse AnalyticsAzure Databricks 运行机器学习。

可扩展性

您的时间序列数据量只能朝一个方向移动——向上! 对于您的解决方案而言,可扩展性至关重要

  • 核心基础设施:托管服务使团队能够专注于解决方案,而不是设置和维护基础设施,尤其是在涉及复杂的分布式系统(例如数据库和流媒体平台,例如 Redis 和 Kafka)时。
  • Kafka Connect:就数据管道而言,Kafka Connect 平台本质上是无状态的并且可以水平扩展,因此您的情况很好。 在您想要如何构建和调整 Kafka Connect worker 集群的大小时,您有很多选择。
  • 自定义应用程序:正如本解决方案中的情况一样,我们构建了一个自定义应用程序来处理 Kafka 主题中的数据。 幸运的是,相同的可扩展性特征也适用于它们。 就水平扩展而言,它仅受您拥有的 Kafka 主题分区数量的限制。

集成:不仅仅是 Grafana! RedisTimeSeries 还与 Prometheus 和 Telegraf 集成。 但是,在本博客文章撰写时,没有 Kafka 连接器——这将是一个很棒的附加组件!

结论

当然,您可以将 Redis 用于(几乎)所有内容,包括时间序列工作负载! 确保考虑从时间序列数据源到 Redis 及其他位置的数据管道和集成的端到端架构。