数据转换管道

了解如何将数据转换为 Redis 类型

Write-behind 的数据转换功能允许用户将数据转换为超出源类型到 Redis 类型的默认转换范围。此转换不涉及编码。相反,它在每张源表对应的一个人类可读的 YAML 文件集中进行描述。

摄取格式和类型因源而异。目前,唯一支持的源是 Debezium。从 Debezium 类型到包含 Redis 类型的原生 JSON 的首次转换会自动完成,无需用户指示。然后,此 JSON 会传递到用户定义的数据转换管道。

每个作业描述了对单个源中的数据执行的转换逻辑。源通常是一个数据库表或集合,并指定为此表/集合的全名。作业可能包含过滤逻辑,以跳过符合特定条件的数据。作业中的其他逻辑步骤会将数据转换为所需的输出,这些输出将作为哈希或 JSON 存储在 Redis 中。

默认作业

在需要对所有摄取记录执行转换但不对特定表创建特定作业的情况下,使用默认作业。与此作业关联的转换将应用于所有缺乏自身明确定义作业的表。默认作业的表名必须是“*”,并且只允许存在一个此类作业实例。

例如,默认作业可以简化任务,例如为 Redis 键添加前缀或后缀,或为新的哈希和 JSON 添加字段,而无需自定义每个源表。

目前,默认作业仅支持摄取管道。

示例

此示例演示了使用 add_field 块向所有缺乏明确定义作业的表添加一个值为 fooapp_code 字段的过程。此外,它还为每个生成的哈希键附加一个 aws 前缀和一个 gcp 后缀。

default.yaml

source:
  table: "*"
  row_format: full
transform:
  - uses: add_field
    with:
      fields:
        - field: after.app_code
          expression: "`foo`"
          language: jmespath
output:
  - uses: redis.write
    with:
      data_type: hash
      key:
        expression: concat(['aws', '#', table, '#', keys(key)[0], '#', values(key)[0], '#gcp'])
        language: jmespath

作业

每个作业都在单独的 YAML 文件中定义。所有这些文件将使用 deploy 命令上传到 Write-behind。有关更多信息,请参阅部署配置)。如果您使用 scaffold 命令,请将作业文件放在 jobs 文件夹中。

作业 YAML 结构

字段

  • :

    此部分描述作业操作的表

    • server_name:逻辑服务器名(可选)。对应于 Debezium Server 的 application.properties 配置文件中指定的 debezium.source.topic.prefix 属性
    • db:数据库名(可选)
    • schema:数据库 schema(可选)
    • table:数据库表名
    • row_format:要转换的数据格式:data_only(默认)- 仅有效载荷,full - 完整变更记录

注意:默认情况下,对属性 server_namedbschematable 的任何引用都将不区分大小写。可以通过将 case_insensitive 设置为 false 来更改此行为。

仅限 Cassandra:在 Cassandra 中,keyspace 大致相当于其他数据库中的 schema。Write-behind 使用作业文件中声明的 schema 属性来匹配传入变更记录的 keyspace 属性。

仅限 MongoDB:在 MongoDB 中,replica set 是包含数据的分片集群,大致可以视为关系数据库中的 schema。MongoDB 的 collection 类似于其他数据库中的 table。Write-behind 使用作业文件中声明的 schematable 属性分别匹配传入变更记录的 replica setcollection 属性。

  • 转换:

    此部分包含一系列定义如何转换数据的块。有关更多信息,请参阅支持的块JMESPath 自定义函数

  • 输出:

    此部分定义处理后数据的输出目标

    • Cassandra
      • usescassandra.write:写入 Cassandra 数据存储
      • with:
        • connection:连接名称
        • keyspace:keyspace
        • table:目标表
        • keys:键列数组
        • mapping:映射列数组
        • opcode_field:有效载荷中保存此记录在数据库中操作(c - 创建,d - 删除,u - 更新)的字段名称
    • Redis
      • usesredis.write:写入 Redis 数据结构。同一定义作业中允许多个此类块
      • with:
        • connection:在 config.yaml 中定义的连接名称(默认使用名为 'target' 的连接)
        • data_type:写入 Redis 时目标数据结构(支持的值有 hash、json、set 和 stream)
        • key:允许通过应用自定义逻辑覆盖记录的键
          • expression:要执行的表达式
          • language:表达式语言,JMESPath 或 SQL
        • expire:一个正整数值,表示键的过期秒数。如果未设置,键将永不过期
    • SQL
      • usesrelational.write:写入兼容 SQL 的数据存储
      • with:
        • connection:连接名称
        • schema:schema
        • table:目标表名
        • keys:键列数组
        • mapping:映射列数组
        • opcode_field:有效载荷中保存此记录在数据库中操作(c - 创建,d - 删除,u - 更新)的字段名称

注意事项

  • source 是必需的。
  • 必须指定 transformkey 或两者。

在转换中使用键

要访问 Redis 键(例如在 write-behind 作业中),您需要执行以下步骤

  • 设置 row_format: full 以允许访问作为完整数据条目一部分的键。
  • 使用表达式 key.key 以字符串形式获取 Redis 键。

Before 和 after 值

更新事件通常报告 beforeafter 部分,提供对更新前后的数据状态的访问。要显式访问“before”值,您需要

  • 设置 row_format: full 以允许访问作为完整数据条目一部分的键。
  • 使用 before.<FIELD_NAME> 模式。

示例

此示例演示了如何使用 rename_field 块将表 emp 中的 fname 字段重命名为 first_name。它还演示了如何设置此记录的键,而不是依赖默认逻辑。

redislabs.dbo.emp.yaml

source:
  server_name: redislabs
  schema: dbo
  table: emp
transform:
  - uses: rename_field
    with:
      from_field: fname
      to_field: first_name
output:
  - uses: redis.write
    with:
      connection: target
      key:
        expression: concat(['emp:fname:',fname,':lname:',lname])
        language: jmespath

部署配置

为了将您的作业部署到远程 Write-behind 数据库,请运行

redis-di deploy

在 Kubernetes 上部署配置

如果 Write-behind CLI 作为 pod 部署在 Kubernetes 集群中,请执行以下步骤部署您的作业

  • jobs 文件夹中的 YAML 文件创建一个 ConfigMap

    kubectl create configmap redis-di-jobs --from-file=jobs/
    
  • 部署您的作业

    kubectl exec -it pod/redis-di-cli -- redis-di deploy
    

注意:创建/修改 ConfigMap 与其在 redis-di-cli pod 中可用之间存在延迟。请等待约 30 秒后再运行 redis-di deploy 命令。

您有两种更新 ConfigMap 的选项

  • 对于较小的更改,您可以使用此命令直接编辑 ConfigMap

    kubectl edit configmap redis-di-jobs
    
  • 对于较大的更改,例如添加另一个作业文件,请编辑本地 jobs 文件夹中的文件,然后运行此命令

    kubectl create configmap redis-di-jobs --from-file=jobs/ --dry-run=client -o yaml | kubectl apply -f -
    

注意:使用任一选项更新 ConfigMap 后,您需要运行 kubectl exec -it pod/redis-di-cli -- redis-di deploy

评价此页
返回顶部 ↑