数据转换管道
了解如何将数据转换为 Redis 类型
Write-behind 的数据转换功能允许用户将数据转换为超出源类型到 Redis 类型的默认转换范围。此转换不涉及编码。相反,它在每张源表对应的一个人类可读的 YAML 文件集中进行描述。
摄取格式和类型因源而异。目前,唯一支持的源是 Debezium。从 Debezium 类型到包含 Redis 类型的原生 JSON 的首次转换会自动完成,无需用户指示。然后,此 JSON 会传递到用户定义的数据转换管道。
每个作业描述了对单个源中的数据执行的转换逻辑。源通常是一个数据库表或集合,并指定为此表/集合的全名。作业可能包含过滤逻辑,以跳过符合特定条件的数据。作业中的其他逻辑步骤会将数据转换为所需的输出,这些输出将作为哈希或 JSON 存储在 Redis 中。

默认作业
在需要对所有摄取记录执行转换但不对特定表创建特定作业的情况下,使用默认作业。与此作业关联的转换将应用于所有缺乏自身明确定义作业的表。默认作业的表名必须是“*”,并且只允许存在一个此类作业实例。
例如,默认作业可以简化任务,例如为 Redis 键添加前缀或后缀,或为新的哈希和 JSON 添加字段,而无需自定义每个源表。
目前,默认作业仅支持摄取管道。
示例
此示例演示了使用 add_field 块向所有缺乏明确定义作业的表添加一个值为 foo
的 app_code
字段的过程。此外,它还为每个生成的哈希键附加一个 aws
前缀和一个 gcp
后缀。
default.yaml
source:
table: "*"
row_format: full
transform:
- uses: add_field
with:
fields:
- field: after.app_code
expression: "`foo`"
language: jmespath
output:
- uses: redis.write
with:
data_type: hash
key:
expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
language: jmespath
作业
每个作业都在单独的 YAML 文件中定义。所有这些文件将使用 deploy
命令上传到 Write-behind。有关更多信息,请参阅部署配置)。如果您使用 scaffold 命令,请将作业文件放在 jobs
文件夹中。
作业 YAML 结构
字段
-
源
:此部分描述作业操作的表
server_name
:逻辑服务器名(可选)。对应于 Debezium Server 的application.properties
配置文件中指定的debezium.source.topic.prefix
属性db
:数据库名(可选)schema
:数据库 schema(可选)table
:数据库表名row_format
:要转换的数据格式:data_only
(默认)- 仅有效载荷,full - 完整变更记录
注意:默认情况下,对属性
server_name
、db
、schema
和table
的任何引用都将不区分大小写。可以通过将case_insensitive
设置为false
来更改此行为。
仅限 Cassandra:在 Cassandra 中,
keyspace
大致相当于其他数据库中的schema
。Write-behind 使用作业文件中声明的schema
属性来匹配传入变更记录的keyspace
属性。
仅限 MongoDB:在 MongoDB 中,
replica set
是包含数据的分片集群,大致可以视为关系数据库中的schema
。MongoDB 的collection
类似于其他数据库中的table
。Write-behind 使用作业文件中声明的schema
和table
属性分别匹配传入变更记录的replica set
和collection
属性。
-
转换
:此部分包含一系列定义如何转换数据的块。有关更多信息,请参阅支持的块和JMESPath 自定义函数。
-
输出
:此部分定义处理后数据的输出目标
- Cassandra
uses
:cassandra.write
:写入 Cassandra 数据存储with
:connection
:连接名称keyspace
:keyspacetable
:目标表keys
:键列数组mapping
:映射列数组opcode_field
:有效载荷中保存此记录在数据库中操作(c - 创建,d - 删除,u - 更新)的字段名称
- Redis
uses
:redis.write
:写入 Redis 数据结构。同一定义作业中允许多个此类块with
:connection
:在config.yaml
中定义的连接名称(默认使用名为 'target' 的连接)data_type
:写入 Redis 时目标数据结构(支持的值有 hash、json、set 和 stream)key
:允许通过应用自定义逻辑覆盖记录的键expression
:要执行的表达式language
:表达式语言,JMESPath 或 SQL
expire
:一个正整数值,表示键的过期秒数。如果未设置,键将永不过期
- SQL
uses
:relational.write
:写入兼容 SQL 的数据存储with
:connection
:连接名称schema
:schematable
:目标表名keys
:键列数组mapping
:映射列数组opcode_field
:有效载荷中保存此记录在数据库中操作(c - 创建,d - 删除,u - 更新)的字段名称
- Cassandra
注意事项
source
是必需的。- 必须指定
transform
、key
或两者。
在转换中使用键
要访问 Redis 键(例如在 write-behind 作业中),您需要执行以下步骤
- 设置
row_format: full
以允许访问作为完整数据条目一部分的键。 - 使用表达式
key.key
以字符串形式获取 Redis 键。
Before 和 after 值
更新事件通常报告 before
和 after
部分,提供对更新前后的数据状态的访问。要显式访问“before”值,您需要
- 设置
row_format: full
以允许访问作为完整数据条目一部分的键。 - 使用
before.<FIELD_NAME>
模式。
示例
此示例演示了如何使用 rename_field
块将表 emp
中的 fname
字段重命名为 first_name
。它还演示了如何设置此记录的键,而不是依赖默认逻辑。
redislabs.dbo.emp.yaml
source:
server_name: redislabs
schema: dbo
table: emp
transform:
- uses: rename_field
with:
from_field: fname
to_field: first_name
output:
- uses: redis.write
with:
connection: target
key:
expression: concat(['emp:fname:',fname,':lname:',lname])
language: jmespath
部署配置
为了将您的作业部署到远程 Write-behind 数据库,请运行
redis-di deploy
在 Kubernetes 上部署配置
如果 Write-behind CLI 作为 pod 部署在 Kubernetes 集群中,请执行以下步骤部署您的作业
-
从
jobs
文件夹中的 YAML 文件创建一个 ConfigMapkubectl create configmap redis-di-jobs --from-file=jobs/
-
部署您的作业
kubectl exec -it pod/redis-di-cli -- redis-di deploy
注意:创建/修改 ConfigMap 与其在
redis-di-cli
pod 中可用之间存在延迟。请等待约 30 秒后再运行redis-di deploy
命令。
您有两种更新 ConfigMap 的选项
-
对于较小的更改,您可以使用此命令直接编辑 ConfigMap
kubectl edit configmap redis-di-jobs
-
对于较大的更改,例如添加另一个作业文件,请编辑本地
jobs
文件夹中的文件,然后运行此命令kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
注意:使用任一选项更新 ConfigMap 后,您需要运行
kubectl exec -it pod/redis-di-cli -- redis-di deploy
。