快速入门
开始创建 Write-behind 管道
本指南将引导您创建 Write-behind 管道。
概念
Write-behind 是一种处理管道,用于将 Redis 数据库中的数据与下游数据存储同步。您可以将其视为一个管道,它从 Redis 数据库的变更数据捕获 (CDC) 事件开始,然后过滤、转换并将数据映射到目标数据存储的数据结构。
Write-behind 管道连接并写入数据的目标数据存储。
Write-behind 管道由一个或多个作业组成。每个作业负责捕获 Redis 中一个键模式的变更,并将其映射到下游数据存储中的一个或多个表。每个作业都在 YAML 文件中定义。

支持的数据存储
Write-behind 目前支持以下目标数据存储
数据存储 |
---|
Cassandra |
MariaDB |
MySQL |
Oracle |
PostgreSQL |
Redis Enterprise |
SQL Server |
先决条件
运行 Write-behind 的唯一先决条件是在 Redis Enterprise Cluster 上安装并启用 Redis Gears Python >= 1.2.6,用于您希望镜像到下游数据存储的数据库。更多信息请参阅 RedisGears 安装。
准备 Write-behind 管道
-
在能够连接到您的 Redis Enterprise Cluster 的 Linux 主机上安装 Write-behind CLI。
-
如果您之前未使用过此 Redis 数据库的 Write-behind 功能,请运行
configure
命令,在您的 Redis 数据库上安装 Write-behind Engine。 -
使用您想要使用的数据存储类型运行
scaffold
命令,例如redis-di scaffold --strategy write_behind --dir . --db-type mysql
这将在当前目录下创建一个模板
config.yaml
文件和一个名为jobs
的文件夹。您可以使用--dir
指定任何文件夹名称,或者,如果您的 Write-behind CLI 部署在 Kubernetes (K8s) Pod 中,可以使用--preview config.yaml
选项将config.yaml
模板输出到终端。 -
在
config.yaml
文件的connections
部分添加下游目标所需的连接信息,例如connections: my-postgres: type: postgresql host: 172.17.0.3 port: 5432 database: postgres user: postgres password: postgres #query_args: # sslmode: verify-ca # sslrootcert: /opt/work/ssl/ca.crt # sslkey: /opt/work/ssl/client.key # sslcert: /opt/work/ssl/client.crt my-mysql: type: mysql host: 172.17.0.4 port: 3306 database: test user: test password: test #connect_args: # ssl_ca: /opt/ssl/ca.crt # ssl_cert: /opt/ssl/client.crt # ssl_key: /opt/ssl/client.key
这是
config.yaml
文件的第一部分,通常也是唯一需要编辑的部分。connections
部分旨在包含多个目标连接。在前面的示例中,有两个下游连接,分别命名为my-postgres
和my-mysql
。为了使用 TLS 获得安全连接,您可以根据具体目标数据库的术语,在连接定义中添加更多
connect_args
或query_args
。名称可以是任意名称,只要满足以下条件即可
- 对于此 Write-behind 引擎是唯一的
- 在相应的 YAML 文件中被作业正确引用
为了准备管道,请填写目标数据存储的正确信息。可以通过引用秘密 (见下文) 或指定路径来提供秘密信息。
applier
部分包含有关写入目标数据的批处理大小和频率的信息。
applier
的一些属性,例如 target_data_type
、wait_enabled
和 retry_on_replica_failure
,是 Write-behind 摄取管道特有的,可以忽略。
Write-behind 作业
Write-behind 作业是 Write-behind 管道配置的强制部分。在 jobs
目录(与 config.yaml
同级)下,您应该为每个您想要写入下游数据库表的键模式提供一个 YAML 文件形式的作业定义。
YAML 文件可以使用目标表名称或其他命名约定来命名,但必须具有唯一的名称。
作业定义具有以下结构
source:
redis:
key_pattern: emp:*
trigger: write-behind
exclude_commands: ["json.del"]
transform:
- uses: rename_field
with:
from_field: after.country
to_field: after.my_country
output:
- uses: relational.write
with:
connection: my-connection
schema: my-schema
table: my-table
keys:
- first_name
- last_name
mapping:
- first_name
- last_name
- address
- gender
源部分
source
部分描述了管道中的数据源。
redis
部分对于由 Redis 中的事件(例如应用数据变更)启动的每个管道都是通用的。对于 Write-behind,它包含激活处理数据变更的管道所需的信息。它包括以下属性
-
key_pattern
属性指定要监听的 Redis 键的模式。该模式必须对应于 Hash 或 JSON 类型的键。 -
exclude_commands
属性指定要忽略哪些命令。例如,如果您监听 Hash 值的键模式,您可以排除HDEL
命令,这样数据删除就不会传播到下游数据库。如果您不指定此属性,Write-behind 将作用于所有相关命令。 -
trigger
属性是强制性的,必须设置为write-behind
。 -
row_format
属性可以与值full
一起使用,以接收负载的before
和after
部分。请注意,对于 Write-behind 事件,永远不会提供键的before
值。
注意:Write-behind 不支持
expired
事件。因此,在 Redis 中过期的键不会自动从目标数据库中删除。注意:redis
属性是一个重大更改,取代了keyspace
属性。key_pattern
属性取代了pattern
属性。exclude_commands
属性取代了exclude-commands
属性。如果您升级到 0.105 或更高版本,必须编辑并重新部署现有作业。
输出部分
output
部分至关重要。它指定对 config.yaml
文件的 connections
部分中的连接的引用
-
uses
属性指定 Write-behind 将用来准备并将数据写入目标的写入器类型。在此示例中,它是relational.write
,这是一个将数据转换为具有下游关系数据库特定方言的 SQL 语句的写入器。有关支持的写入器完整列表,请参阅 数据转换块类型。 -
schema
属性指定要使用的模式/数据库(不同的数据库在对象层次结构中对模式使用不同的名称)。 -
table
属性指定要使用的下游表。 -
keys
部分指定表中用作唯一约束的字段。 -
mapping
部分用于将数据库列映射到具有不同名称的 Redis 字段或表达式。映射可以是所有 Redis 数据字段,也可以是其中的一部分。
注意:在
keys
中使用的列将自动包含,因此无需在mapping
部分重复它们。
将过滤器和转换应用于 Write-behind
Write-behind 作业可以在数据写入目标之前对其应用过滤器和转换。在 transform
部分下指定过滤器和转换。
过滤器
使用过滤器跳过部分数据,不将其应用于目标。过滤器可以应用简单或复杂的表达式,这些表达式接受 Redis 条目键、字段,甚至变更操作码(创建、删除、更新等)作为参数。更多信息请参阅 过滤器。
转换
转换通过以下方式之一操作数据
- 重命名字段
- 添加字段
- 移除字段
- 映射源字段以用于输出
要了解有关转换的更多信息,请参阅 数据转换管道。
提供目标的秘密信息
目标的秘密信息(例如 TLS 证书)可以从 Redis 节点的 文件系统 路径中读取。这允许使用从秘密存储注入的秘密信息。
部署 Write-behind 管道
要启动管道,运行 deploy
命令
redis-di deploy
您可以使用 status
命令检查管道是否正在运行、接收和写入数据
redis-di status
监控 Write-behind 管道
Write-behind 管道收集以下指标
指标描述 | Prometheus 中的指标 |
---|---|
按流统计的总传入事件 | 计算为 Prometheus DB 查询: sum(pending, rejected, filtered, inserted, updated, deleted) |
按流统计的创建的传入事件 | rdi_metrics_incoming_entries{data_source:"…",operation="inserted"} |
按流统计的更新的传入事件 | rdi_metrics_incoming_entries{data_source:"…",operation="updated"} |
按流统计的删除的传入事件 | rdi_metrics_incoming_entries{data_source:"…",operation="deleted"} |
按流统计的过滤的传入事件 | rdi_metrics_incoming_entries{data_source:"…",operation="filtered"} |
按流统计的格式错误的传入事件 | rdi_metrics_incoming_entries{data_source:"…",operation="rejected"} |
每流总事件数(快照) | rdi_metrics_stream_size{data_source:""} |
在流中的时间(快照) | rdi_metrics_stream_last_latency_ms{data_source:"…"} |
您可以使用以下方式使用指标
-
运行
status
命令redis-di status
-
使用 Write-behind 的 Prometheus 导出器抓取指标
升级
如果您需要升级 Write-behind,应使用提供零停机升级的 upgrade
命令
redis-di upgrade ...