学习

Redis 触发器和函数入门

Will Johnston
作者
Will Johnston, Redis 开发者增长经理
Prasan Kumar
作者
Prasan Kumar, Redis 技术解决方案开发者

您将在此教程中学到什么#

已废弃警告

自版本 (Redis Stack) 7.4 起,Redis Triggers and Functions (实验性预览功能) 目前已被标记为废弃。

我们建议探索替代方案,例如 RDI (Redis Data Integration)

在本篇关于 Redis 7.2 的 触发器和函数 的综合教程中,您将获得以下领域的见解和实践技能

  • 理解 Redis 触发器和函数: 掌握 Redis 新的可编程性功能的基础知识,包括如何使用 JavaScript 代码来实现数据驱动的触发器和函数。
  • 应用场景: 探索电子商务环境中的实际应用,例如库存管理和销售统计计算。
  • 触发器类型: 了解按需触发器(On-demand Triggers)、键空间触发器(KeySpace Triggers)和流触发器(Stream Triggers)的区别和用例。
  • 动手实践: 通过在模拟的电子商务环境中创建和部署各种触发器和函数来获得实践经验。

电子商务应用的微服务架构#

GITHUB 代码

以下是用于克隆本教程中使用的应用程序源代码的命令

git clone --branch v9.2.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

让我们看一下演示应用程序的架构

  1. 1.products service: 处理从数据库查询产品并将其返回给前端的功能
  2. 2.orders service: 处理验证和创建订单的功能
  3. 3.order history service: 处理查询客户订单历史的功能
  4. 4.payments service: 处理订单支付的功能
  5. 5.api gateway: 将服务统一在一个单一端点下
  6. 6.mongodb/ postgresql: 用作存储订单、订单历史、产品等的写优化数据库
信息

您无需在演示应用中使用 MongoDB/ Postgresql 作为写优化数据库;您也可以使用其他 prisma 支持的数据库 。这只是一个示例。

使用 Next.js 和 Tailwind 的电子商务应用前端#

该电子商务微服务应用由前端组成,前端使用 Next.js 和 TailwindCSS 构建。应用后端使用 Node.js。数据存储在 Redis 和 MongoDB 或 PostgreSQL 中,使用 Prisma。以下是展示电子商务应用前端的截图。

仪表盘:显示带有不同搜索功能的产品列表,可在设置页面配置。

设置通过点击仪表盘右上角的齿轮图标访问。在此控制搜索栏、聊天机器人可见性以及其他功能。

仪表盘(语义文本搜索)配置为语义文本搜索后,搜索栏支持自然语言查询。示例:“纯棉蓝色衬衫”。

购物车将产品添加到购物车,然后点击“立即购买”按钮结账。

订单历史购买后,顶部导航栏中的“订单”链接显示订单状态和历史记录。

管理面板通过顶部导航中的“admin”链接访问。显示购买统计数据和热门产品。

什么是触发器和函数?#

触发器和函数是 Redis 可编程性方面具有革命性的一步,于 Redis 7.2 中引入。此功能使开发者能够直接在 Redis 数据库中编程、存储和执行 JavaScript 代码 以响应数据变化,类似于传统 SQL 数据库中的存储过程或触发器。

此功能使开发者能够定义事件(称为 triggers),以执行更靠近数据的 functions。也就是说,开发者定义了响应数据库事件或命令执行的业务逻辑。这加快了代码和相关交互的速度,因为无需等待将代码从客户端带入数据库。

优势#

将触发器和函数集成到 Redis 中,利用了其著名的实时性能和简单性

  • 降低延迟: 通过直接在 Redis 中处理任务,可以最大程度地减少网络开销,节省时间和计算资源。
  • 实时事件处理: 触发器以 实时 方式执行,并保持命令的 原子性。这消除了通过应用代码应用异步逻辑可能引入的数据不一致性。
  • JavaScript: 使用专业开发者最熟悉的语言。与早期 Redis Gears 中不太为人所知的 Lua 函数相比,这降低了学习曲线。
  • 兼容性: 与现有的 Redis Stack 功能和数据结构无缝集成。

触发器和函数类型#

Redis 中的触发器和函数根据其激活方法可分为三种类型

  • 按需触发器: 这些通过直接调用它们来显式触发。
  • 键空间触发器: 由对键的操作触发,例如创建、更新或删除。
  • 流触发器: 当新的条目添加到 Redis 流时激活。

示例产品数据#

为了说明触发器和函数的应用,我们来看一个简化的电子商务数据集。该数据集包含详细的产品信息,我们将在整个教程中使用它。

database/fashion-dataset/001/products/*.json
const products = [
  {
    productId: '11000',
    price: 3995,
    productDisplayName: 'Puma Men Slick 3HD Yellow Black Watches',
    variantName: 'Slick 3HD Yellow',
    brandName: 'Puma',
    ageGroup: 'Adults-Men',
    gender: 'Men',
    displayCategories: 'Accessories',
    masterCategory_typeName: 'Accessories',
    subCategory_typeName: 'Watches',
    styleImages_default_imageURL:
      'http://host.docker.internal:8080/images/11000.jpg',
    productDescriptors_description_value: 'Stylish and comfortable, ...',
    stockQty: 25,
  },
  //...
];

按需触发器#

Redis 中的按需触发器是显式调用以执行特定任务的 JavaScript 函数。

应用场景:重置库存#

在我们的电子商务演示中,考虑一个需要重置所有产品库存数量的功能。我们将通过点击 UI 仪表盘中的 RESET STOCK QTY 按钮来实现,这将触发 resetInventory 函数。

创建函数#

让我们创建一个名为 resetInventory 的函数,命名空间为 OnDemandTriggers。此函数将所有产品的库存(库存数量)重置为 25。

database/src/triggers/on-demand-trigger.js
#!js name=OnDemandTriggers api_version=1.0

redis.registerAsyncFunction('resetInventory', async function (client) {
  let cursor = '0';
  const DEFAULT_PRODUCT_QTY = 25;

  redis.log('resetInventory');
  do {
    client.block((client) => {
      //scan all the product keys in the database
      let res = client.call('scan', cursor, 'match', 'products:productId:*');
      cursor = res[0];
      let keys = res[1];
      // loop through all the product keys and set the stockQty to 25
      keys.forEach((key) => {
        if (!key.match('index:hash')) {
          client.call(
            'JSON.SET',
            key,
            '$.stockQty',
            DEFAULT_PRODUCT_QTY.toString(),
          );
        }
      });
    });
  } while (cursor != '0');

  return 'resetInventory completed !';
});

将函数添加到 Redis#

我们可以使用多种方法将函数添加到 Redis

  1. 1.使用 redis-cli
redis-cli  -x TFUNCTION LOAD < ./on-demand-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./on-demand-trigger.js

2. 使用代码

database/src/triggers.ts
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';

async function addTriggerToRedis(
  fileRelativePath: string,
  redisClient: NodeRedisClientType,
) {
  const filePath = path.join(__dirname, fileRelativePath);
  const fileData = await fs.readFile(filePath);
  let jsCode = fileData.toString();
  jsCode = jsCode.replace(/\r?\n/g, '\n');

  try {
    const result = await redisClient.sendCommand([
      'TFUNCTION',
      'LOAD',
      'REPLACE',
      jsCode,
    ]);
    console.log(`addTriggersToRedis ${fileRelativePath}`, result);
  } catch (err) {
    console.log(err);
  }
}
addTriggerToRedis('triggers/on-demand-trigger.js', redisClient);

3. 使用 RedisInsight

导航到 RedisInsight 中的 触发器和函数 部分,然后到 库,并使用创建库来粘贴和保存您的函数。

测试函数#

1. 使用 redis-cli

redis-cli TFCALLASYNC OnDemandTriggers.resetInventory 0

2. 使用代码

点击“RESET STOCK QTY”按钮触发 triggerResetInventory API。

POST https://:3000/products/triggerResetInventory
{
}

这将调用 resetInventory 函数

server/src/services/products/src/service-impl.ts
const triggerResetInventory = async () => {
  const redisClient = getNodeRedisClient();

  //@ts-ignore
  const result = await redisClient.sendCommand(
    ['TFCALLASYNC', 'OnDemandTriggers.resetInventory', '0'],
    {
      isolated: true,
    },
  );
  console.log(`triggerResetInventory :  `, result);

  return result;
};

3. 使用 RedisInsight

在 RedisInsight 的工作台测试命令并查看结果。

验证数据完整性#

执行后,检查每个产品的 stockQty 是否已重置为默认值。

键空间触发器#

键空间触发器允许您在 Redis 数据库中添加/修改符合特定模式的键时执行自定义逻辑。它提供了一种响应数据变化并根据这些变化执行操作的方法。

应用场景:管理产品库存数量#

在我们的电子商务演示中,让我们解决一个常见需求:下单后减少产品库存数量。我们将使用一个 KeySpace trigger 来实现,该触发器监听 orders:orderId 键,并相应地更新产品库存数量。

创建函数#

我们将开发 updateProductStockQty 函数,命名空间为 KeySpaceTriggers。此函数将负责根据订单详情调整库存数量。

database/src/triggers/key-space-trigger.js
#!js name=KeySpaceTriggers api_version=1.0
redis.registerKeySpaceTrigger(
  'updateProductStockQty',
  'orders:orderId:', // Keys starting with this prefix are monitored
  function (client, data) {
    const errors = [];

    try {
      if (
        client &&
        data?.event == 'json.set' &&
        data?.key != 'orders:orderId:index:hash'
      ) {
        const orderId = data.key;
        // get the order details from the orderId key
        let result = client.call('JSON.GET', orderId);
        result = result ? JSON.parse(result) : '';
        const order = Array.isArray(result) ? result[0] : result;

        if (order?.products?.length && !order.triggerProcessed) {
          try {
            //create a log stream to log the trigger events and errors
            client.call(
              'XGROUP',
              'CREATE',
              'TRIGGER_LOGS_STREAM',
              'TRIGGER_LOGS_GROUP',
              '$',
              'MKSTREAM',
            );
          } catch (streamConErr) {
            // if log stream already exists
          }

          // reduce stockQty for each product in the order
          for (const product of order.products) {
            let decreaseQtyBy = (-1 * product.qty).toString();
            client.call(
              'JSON.NUMINCRBY',
              `products:productId:${product.productId}`,
              '.stockQty',
              decreaseQtyBy,
            );

            // add log entry
            client.call(
              'XADD',
              'TRIGGER_LOGS_STREAM',
              '*',
              'message',
              `For productId ${product.productId}, stockQty ${decreaseQtyBy}`,
              'orderId',
              orderId,
              'function',
              'updateProductStockQty',
            );
          }

          // set triggerProcessed flag to avoid duplicate processing
          client.call('JSON.SET', orderId, '.triggerProcessed', '1');
        }
      }
    } catch (generalErr) {
      generalErr = JSON.stringify(
        generalErr,
        Object.getOwnPropertyNames(generalErr),
      );
      errors.push(generalErr);
    }

    if (errors.length) {
      //log error
      client.call(
        'XADD',
        'TRIGGER_LOGS_STREAM',
        '*',
        'message',
        JSON.stringify(errors),
        'orderId',
        data.key,
        'function',
        'updateProductStockQty',
      );
    }
  },
);

在此脚本中,我们监听 orders:orderId: 键的变化。检测到新订单后,函数将检索订单详情并相应地减少订单中每个产品的库存数量。

将函数添加到 Redis#

我们可以使用多种方法将函数添加到 Redis

1. 使用 redis-cli

redis-cli  -x TFUNCTION LOAD < ./key-space-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./key-space-trigger.js

2. 使用代码

database/src/triggers.ts
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';

async function addTriggerToRedis(
  fileRelativePath: string,
  redisClient: NodeRedisClientType,
) {
  const filePath = path.join(__dirname, fileRelativePath);
  const fileData = await fs.readFile(filePath);
  let jsCode = fileData.toString();
  jsCode = jsCode.replace(/\r?\n/g, '\n');

  try {
    const result = await redisClient.sendCommand([
      'TFUNCTION',
      'LOAD',
      'REPLACE',
      jsCode,
    ]);
    console.log(`addTriggersToRedis ${fileRelativePath}`, result);
  } catch (err) {
    console.log(err);
  }
}
addTriggerToRedis('triggers/key-space-trigger.js', redisClient);

3. 使用 RedisInsight

导航到 RedisInsight 中的触发器和函数部分,然后到库,并使用创建库来粘贴和保存您的函数。

测试函数#

在我们的演示中,通过点击 Buy Now 按钮下单会触发 createOrder API,该 API 进而创建一个新的 orders:orderId: 键,从而激活 updateProductStockQty 函数。

示例 createOrder API 请求

POST http://:3000/orders/createOrder
{
  "products": [
    {
      "productId": "11002",
      "qty": 1,
      "productPrice": 4950,
    },
    {
      "productId": "11012",
      "qty": 2,
      "productPrice": 1195,
    }
  ]
}

Redis 中的示例订单创建命令

"JSON.SET" "orders:orderId:24b38a47-2b7d-4c5d-ba25-b74749e34c65" "$" "{"products":[{"productId":"10381","qty":1,"productPrice":2499,"productData":{}},{"productId":"11030","qty":1,"productPrice":1099,"productData":{}}],"userId":"USR_f0f00a86-7131-40e1-9d89-765b4cc1927f","orderId":"24b38a47-2b7d-4c5d-ba25-b74749e34c65","orderStatusCode":1}"

此新键的创建触发 updateProductStockQty,从而导致库存数量的调整。

在 TRIGGER_LOGS_STREAM 中监控触发器的活动,查看日志和潜在错误。

验证数据完整性#

函数执行后,验证每个相关产品的 stockQty 是否已减少。

流触发器#

流触发器允许您监听 Redis 流,并在数据添加到流时执行函数。它常用于实时数据处理和事件驱动架构。

应用场景:计算销售统计#

在我们的电子商务演示中,考虑一个需要计算产品销售统计数据的功能。我们将使用一个 Stream trigger 来实现,该触发器监听 TRANSACTION_STREAM,并相应地更新销售统计数据。

创建函数#

我们将开发 calculateStats 函数,命名空间为 StreamTriggers。此函数将负责根据订单详情计算销售统计数据。

database/src/triggers/stream-trigger.js
#!js name=StreamTriggers api_version=1.0

redis.registerStreamTrigger(
  'calculateStats', // trigger name
  'TRANSACTION_STREAM', // Detects new data added to the stream
  function (client, data) {
    var streamEntry = {};
    for (let i = 0; i < data.record?.length; i++) {
      streamEntry[data.record[i][0]] = data.record[i][1];
    }

    streamEntry.transactionPipeline = JSON.parse(
      streamEntry.transactionPipeline,
    );
    streamEntry.orderDetails = JSON.parse(streamEntry.orderDetails);

    if (
      streamEntry.transactionPipeline?.length == 1 &&
      streamEntry.transactionPipeline[0] == 'PAYMENT_PROCESSED' &&
      streamEntry.orderDetails
    ) {
      //log
      client.call(
        'XADD',
        'TRIGGER_LOGS_STREAM',
        '*',
        'message',
        `${streamEntry.transactionPipeline}`,
        'orderId',
        `orders:orderId:${streamEntry.orderDetails.orderId}`,
        'function',
        'calculateStats',
      );

      const orderAmount = parseInt(streamEntry.orderDetails.orderAmount); //remove decimal
      const products = streamEntry.orderDetails.products;

      // sales
      client.call('INCRBY', 'statsTotalPurchaseAmount', orderAmount.toString());

      for (let product of products) {
        const totalProductAmount =
          parseInt(product.qty) * parseInt(product.productPrice);

        // trending products
        client.call(
          'ZINCRBY',
          'statsProductPurchaseQtySet',
          product.qty.toString(),
          product.productId,
        );

        // category wise purchase interest
        const category = (
          product.productData.masterCategory_typeName +
          ':' +
          product.productData.subCategory_typeName
        ).toLowerCase();
        client.call(
          'ZINCRBY',
          'statsCategoryPurchaseAmountSet',
          totalProductAmount.toString(),
          category,
        );

        // largest brand purchases
        const brand = product.productData.brandName;
        client.call(
          'ZINCRBY',
          'statsBrandPurchaseAmountSet',
          totalProductAmount.toString(),
          brand,
        );
      }
    }
  },
  {
    isStreamTrimmed: false, //whether the stream should be trimmed automatically after the data is processed by the consumer.
    window: 1,
  },
);

在上面的 calculateStats 函数中,我们正在监听 TRANSACTION_STREAM 并更新不同的销售统计数据,例如

  • statsTotalPurchaseAmount 变量存储总购买金额
  • statsProductPurchaseQtySet 是一个有序集合,用于根据最高购买数量跟踪热门产品
  • statsCategoryPurchaseAmountSet 是一个有序集合,用于跟踪按类别划分的购买兴趣
  • statsBrandPurchaseAmountSet 是一个有序集合,用于跟踪最大品牌购买量

将函数添加到 Redis#

我们可以使用多种方法将函数添加到 Redis

1. 使用 redis-cli

redis-cli  -x TFUNCTION LOAD < ./stream-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./stream-trigger.js

2. 使用代码

database/src/triggers.ts
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';

async function addTriggerToRedis(
  fileRelativePath: string,
  redisClient: NodeRedisClientType,
) {
  const filePath = path.join(__dirname, fileRelativePath);
  const fileData = await fs.readFile(filePath);
  let jsCode = fileData.toString();
  jsCode = jsCode.replace(/\r?\n/g, '\n');

  try {
    const result = await redisClient.sendCommand([
      'TFUNCTION',
      'LOAD',
      'REPLACE',
      jsCode,
    ]);
    console.log(`addTriggersToRedis ${fileRelativePath}`, result);
  } catch (err) {
    console.log(err);
  }
}
addTriggerToRedis('triggers/stream-trigger.js', redisClient);

3. 使用 RedisInsight

导航到 RedisInsight 中的 Triggers and Functions 部分,然后到 Libraries,并使用创建库来粘贴和保存您的函数。

测试函数#

在我们的演示中,通过 Buy Now 按钮下单会创建一个新订单,涉及不同的交易步骤,例如交易风险评估、支付完成等。所有这些步骤以及订单详情都记录在 TRANSACTION_STREAM 中。

向流添加详情的示例代码

const addMessageToTransactionStream = async (message) => {
  if (message) {
    const streamKeyName = 'TRANSACTION_STREAM';
    try {
      const nodeRedisClient = getNodeRedisClient();
      if (nodeRedisClient && message) {
        const id = '*'; //* = auto generate
        await nodeRedisClient.xAdd(streamKeyName, id, message);
      }
    } catch (err) {
      console.error('addMessageToTransactionStream error !', err);
    }
  }
};

向流添加详情的示例命令

"XADD" "TRANSACTION_STREAM" "*" "action" "PAYMENT_PROCESSED" "userId" "USR_f0f00a86-7131"  "orderDetails" "{'orderId':'bc438c5d-117e-41bd-97fa-943c03be0b1c','products':[],'paymentId':'clrrl8yp50007pf851m7f92u2'}" "transactionPipeline" "['PAYMENT_PROCESSED']"

calculateStats 函数监听 TRANSACTION_STREAM 流的 PAYMENT_PROCESSED 操作,并相应地更新销售统计数据。

在 RedisInsight 中检查触发器函数 calculateStats 中使用的不同统计变量值。

验证数据完整性#

函数执行后,验证更新的管理仪表盘。

管理面板:通过顶部导航中的“admin”链接访问。在 UI 中查看购买统计数据和热门产品。

准备好使用 Redis 触发器和函数#

我们已经介绍了诸如 On-demandKeySpace 和 Stream 触发器等关键概念,并将它们应用于实际的电子商务场景中。Redis 的这些高级功能为数据处理和自动化开辟了无数的可能性,让您能够构建不仅更快而且更智能的应用。

随着你继续探索 Redis 及其不断发展的生态系统,请记住这些 触发器和函数 仅仅是一个开始。Redis 提供了丰富的功能集,可以创造性地组合起来解决复杂问题并提供高性能解决方案。

参考资料#

创建函数#

请点击您的文本内容,然后按回车。