圆点 快捷的未来正向你的城市靠近。

与我们相约 Redis Released

使用 Redis 和 Apache Kafka 处理时序数据

RedisTimeSeries 是一个为 Redis 提供原生时序数据结构的 Redis 模块。以前基于有序集合(或 Redis 流)构建的时序解决方案现在可以使用 RedisTimeSeries 功能的更多优势,比如大容量插入、低延迟读取、灵活的查询语言、下采样等!

一般来说,时序数据是(相对)简单的。话虽如此,我们还需要考虑其他特征

  • 数据速率:例如,考虑每秒来自数千个设备的数百个指标
  • 容量(大数据):考虑几个月(甚至几年)的数据累积

因此,诸如 RedisTimeSeries 等数据库仅仅是整体解决方案的一部分。还需要考虑如何将所有数据收集(摄取)、处理发送到 RedisTimeSeries。真正需要的是一个可伸缩的数据管道,它可以作为缓冲区以解耦生产者和使用者。

这就是Apache Kafka 的用武之地!除了核心代理之外,它还拥有一个丰富的组件生态系统,包括 Kafka Connect(它是本博客文章提出的解决方案架构的一部分)、多种编程语言的客户端库、Kafka Streams、Mirror Maker 等组件。

kafka map

这篇博客文章提供了一个实用示例,介绍如何将 RedisTimeSeries 与Apache Kafka结合起来分析时序数据。

代码可在该 GitHub 仓库中获取 https://github.com/abhirockzz/redis-timeseries-kafka

让我们首先探索用例。请注意,出于博客文章的目的,已对此用例进行了简化,并在后面的部分中作了进一步解释。

场景:设备监控

想象一下有很多个位置,每个位置都有多个设备,并且你要负责监控设备指标,目前我们考虑温度和压力。这些指标将存储在 RedisTimeSeries(当然!)中,并使用以下命名约定作为键值 —<metric name>:<location>:<device>。例如,位置 5 中设备 1 的温度表示为 temp:5:1。每个时序数据点还将具有以下标签(键值对)—metric, location, device,以便于灵活查询,正如你将在后面的章节中看到的那样。

这里有几个示例,可让你了解如何使用TS.ADD命令添加数据点

# 位置 3 中设备 2 的温度以及标签

TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2

# 位置 3 中设备 2 的压力

TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2

解决方案架构

以下是该解决方案在高层面的样子

Solution architecture

我们来详细分析

源(本地)组件

  • MQTT 代理(Mosquitto): MQTT 是一个物联网用例的事实上的协议。我们要使用的场景是物联网和时间序列的组合,稍后将详细介绍。
  • Kafka Connect: MQTT 源连接器 用于将数据从 MQTT 代理传输到 Kafka 集群。

Azure 服务

  • 适用于 Redis Enterprise 版的 Azure 缓存:Enterprise 版基于 Redis Enterprise,而 Redis Enterprise 是 Redis 中的商业变体。除了 RedisTimeSeries,Enterprise 版还 支持 RediSearch 和 RedisBloom。客户无需担心 Enterprise 版本的许可证获取问题。适用于 Redis 的 Azure 缓存将简化此进程,客户可以通过 Azure 市集产品获取许可证并付费以使用该软件。
  • 适用于 Azure 上的 Confluent Cloud:一项完全托管产品,提供 Apache Kafka 服务,这要归功于 Azure 到 Confluent Cloud 的集成预配层。该产品消减了跨平台管理负担,并提供了在 Azure 基础设施上使用 Confluent Cloud 的整合体验,从而允许你轻松地将 Confluent Cloud 与 Azure 应用程序集成。
  • Azure Spring Cloud:借助 Azure Spring Cloud,在 Azure 中部署 Spring Boot 微服务变得更容易。Azure Spring Cloud 减轻基础设施的顾虑,提供配置管理、服务发现、CI/CD 整合、蓝绿部署等等。该服务做了所有繁重的工作,因此,开发人员可以专注于其代码。

请注意,一些服务仅出于简洁性的考虑在本地托管。在生产级部署中,你需要在 Azure 中运行它们。例如,可以在 Azure Kubernetes Service 中与 MQTT 连接器一起运行 Kafka Connect 集群。

总结一下,以下是端到端流程

  • 脚本生成模拟设备数据,这些数据被发送到本地 MQTT 代理。
  • 此数据被 MQTT Kafka Connect 源连接器获取并发送到在 Azure 中运行的 Confluent Cloud Kafka 集群中的一个主题。
  • 该数据随后由在 Azure Spring Cloud 中托管的 Spring Boot 应用程序进行进一步处理,然后由该程序持久保存到用于 Redis 的 Azure 缓存实例。

是时候开始动手实践了!在那之前,请确保你拥有以下内容。

前提条件

设置基础设施组件

按照文档 预配 Azure 缓存适用于 Redis(企业层) 随附 RedisTimeSeries 模块。

infrastructure components

在 Azure 市场中预配 Confluent Cloud 集群。还需要 创建一个 Kafka 主题 (使用名称 mqtt.device-stats) 与 创建凭据 (API 密钥和密钥) 以便以后安全连接到集群。

kafka cluster

您可以使用 Azure 门户或 使用 Azure CLI预配 Azure Spring Cloud 的实例。

az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>
Southeast Asia

继续之前,请确保克隆 GitHub 存储库

git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka

设置本地服务

组件包括

MQTT 代理

我在 Mac 上本地安装并启动了 mosquitto 代理。

brew install mosquitto
brew services start mosquitto

您可按照与您的操作系统对应的步骤来操作,或随意使用该 Docker 映像

Grafana

我在 Mac 上本地安装并启动了 Grafana。

brew install grafana
brew services start grafana

您可对自己的操作系统执行同样的操作,或随意使用该 Docker 映像

docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana

Kafka Connect

您应该可以在您刚才克隆的存储库中找到 connect-distributed.properties 文件。替换 bootstrap.servers、sasl.jaas.config 等属性的值。

首先,下载并解压 Apache Kafka 到本地。

启动本地 Kafka Connect 集群

export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0>

$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties

若要手动安装 MQTT 源连接器

  • 此链接下载连接器/插件 ZIP 文件,然后
  • 将其解压到 Connect worker 的 plugin.path 配置属性中列出的一个目录中

如果你在本地使用 Confluent Platform,只需使用 Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest

创建 MQTT 源连接器实例

务必检查mqtt-source-config.json文件。确保为kafka.topic输入正确的主题名称,并保留mqtt.topics不变。

curl -X POST -H 'Content-Type: application/json'
https://127.0.0.1:8083/connectors -d @mqtt-source-config.json

# wait for a minute before checking the connector status
curl https://127.0.0.1:8083/connectors/mqtt-source/status

部署设备数据处理程序应用程序

在刚刚克隆的 GitHub 存储库中,查找consumer/src/resources文件夹中的application.yaml文件,并替换以下值:

  • Azure Cache for Redis 主机、端口和主访问密钥
  • Confluent Cloud on Azure API 密钥和密钥

构建应用程序 JAR 文件

cd consumer

export JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home>

mvn clean package

创建一个 Azure Spring Cloud 应用程序,并向其中部署 JAR 文件

az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11

az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar

启动模拟的设备数据生成器

可以使用刚刚克隆的 GitHub 存储库中的脚本

./gen-timeseries-data.sh

注意 - 它所做的只是使用mosquitto_pubCLI 命令来发送数据。

数据将发送到 device-stats MQTT 主题(这不是 Kafka 主题)。你可以使用 CLI 订阅者进行仔细检查

mosquitto_sub -h localhost -t device-stats

在 Confluent Cloud 门户中检查 Kafka 主题。你还应该检查日志以获得 Azure Spring Cloud 中的设备数据处理程序应用

az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>

享受 Grafana 仪表板!

浏览localhost:3000上的 Grafana UI。

Grafana dash

用于 Grafana 的 Redis 数据源插件可与任何 Redis 数据库配合使用,包括 Azure Cache for Redis。按照此博客文章中的说明来配置数据源。

导入 GitHub 代码库中 grafana_dashboards 文件夹中的仪表盘,您已克隆该文件(如果您需要有关如何导入仪表盘的帮助,请参阅 Grafana 文档)。

例如,这是一个显示 位置 1设备 5 的平均压力(超过 30 秒)的仪表盘(使用 TS.MRANGE)。

avg pressure

这是另一个显示 位置 3 中多个设备的最大温度(超过 15 秒)的仪表盘(再次感谢 TS.MRANGE)。

max temp

因此,您想运行一些 RedisTimeSeries 命令?

启动 redis-cli 并连接到 Azure Cache for Redis 实例

redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls

从简单查询开始

# pressure in device 5 for location 1
TS.GET pressure:1:5

# temperature in device 5 for location 4
TS.GET temp:4:5

按位置筛选,并获取所有设备的温度和压力

TS.MGET WITHLABELS FILTER location=3

在特定时间范围内提取一个或多个位置中的所有设备的温度和压力

TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)

– + 指从开始到最新时间戳的所有内容,但您还可以更具体。

MRANGE 就是我们需要的!我们还可以按位置中的特定设备筛选,并按温度或压力进一步细分:

TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp

所有这些都可以与聚合结合使用。

# all the temp data points are not useful. how about an average (or max) instead of every temp data points?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp

也可以创建规则来执行此聚合,并将其存储在不同的时间序列中。

完成后,不要忘记删除资源以避免不必要的成本。

删除资源

在本地计算机上

  • 停止 Kafka Connect 集群
  • 停止 mosquito 代理(例如,brew services stop mosquito)
  • 停止 Grafana 服务(例如,brew services stop grafana)

我们探索了一个数据管道来使用 Redis 和 Kafka 摄取、处理和查询时间序列数据。当您考虑下一步并转向生产级解决方案时,您应该考虑更多一些内容。

其他注意事项

downsampling

优化 RedisTimeSeries

  • 保留策略:考虑这一点,因为您的时间序列数据点默认情况下不会修剪或删除。
  • 降采样和聚合 规则:您不想永远存储数据,对不对?请务必配置适当的规则来处理此问题(例如 TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000)。
  • 重复数据策略:您希望如何处理重复的样本?请确保默认策略(BLOCK)确实是您需要的。如果不是,请考虑 其他选项

这不是一个详尽的列表。对于其他配置选项,请参考 RedisTimeSeries 文档

长期数据保留呢?

数据很宝贵,包括时间序列!您可能希望进一步处理它(例如运行机器学习来提取见解、预测性维护等)。为了实现这一点,您需要将这些数据保留更长时间,并且为了做到既经济高效,您需要使用可扩展的对象存储服务,例如 Azure Data Lake Storage Gen2 (ADLS Gen2)。

data retention

有一个连接器可以做到!您可以通过使用 Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud(一种完全托管的服务)来增强现有的数据管道,该服务用于处理数据并将其存储在 ADLS 中,然后使用 Azure Synapse AnalyticsAzure Databricks 运行机器学习。

可扩展性

您的时间序列数据量只能朝着一个方向发展:上升!让您的解决方案可扩展至关重要

  • 核心基础设施:托管服务允许团队专注于解决方案,而不是设置和维护基础设施,尤其是在数据库和流媒体平台(如 Redis 和 Kafka)等复杂的分布式系统方面。
  • Kafka Connect:就数据管道而言,您可以放心,因为 Kafka Connect 平台本质上是无状态的,具有水平可扩展性。在如何设计和调整 Kafka Connect 辅助进程群集方面,您有很多选择。
  • 自定义应用程序:与该解决方案类似,我们构建了一个自定义应用程序来处理 Kafka 主题中的数据。幸运的是,同样的可扩展性特性也适用于它们。就水平扩展而言,它仅受您拥有的 Kafka 主题分区的数量限制。

集成:不仅限于 Grafana!RedisTimeSeries 也与 Prometheus 和 Telegraf 集成。不过,在撰写此博客文章之时,尚未提供 Kafka 连接器,这将是一项极好的附加功能!

结论

当然,你可以将 Redis 用于(几乎)所有内容,包括时间序列负载!务必仔细考虑数据管道和集成的时间序列数据源到 Redis 及其后续步骤的端到端架构。