学习

Redis 中的触发器和函数入门

Will Johnston
作者
Will Johnston, Redis 开发者增长经理
Prasan Kumar
作者
Prasan Kumar, Redis 技术解决方案开发人员

您将在本教程中学到的内容#

弃用警告

从 (Redis Stack) 7.4 版本开始,Redis 触发器和函数(实验性预览功能)目前已标记为弃用

我们建议您探索其他替代方案,例如 RDI (Redis 数据集成)

在本篇关于 Redis 7.2 的 触发器和函数 的综合教程中,您将获得以下方面的见解和实践技能

  • 了解 Redis 触发器和函数:掌握 Redis 新的可编程性功能的基础知识,包括如何使用 JavaScript 代码创建数据驱动的触发器和函数。
  • 应用程序场景:探索电子商务环境中的实际应用程序,例如库存管理和销售统计计算。
  • 触发器类型:学习 按需 触发器、 KeySpace 触发器和 Stream 触发器的区别和用例。
  • 动手实践:通过在模拟的电子商务环境中创建和部署各种触发器和函数,获得实践经验。

电子商务应用程序的微服务架构#

GITHUB 代码

以下命令用于克隆本教程中使用的应用程序的源代码

git clone --branch v9.2.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

让我们看一下演示应用程序的架构

  1. 1.产品服务:处理从数据库查询产品并将其返回给前端
  2. 2.订单服务:处理验证和创建订单
  3. 3.订单历史服务:处理查询客户的订单历史记录
  4. 4.支付服务:处理订单支付
  5. 5.API 网关:将服务统一在单个端点下
  6. 6.mongodb/ postgresql:充当写入优化的数据库,用于存储订单、订单历史记录、产品等。
信息

您无需在演示应用程序中使用 MongoDB/ Postgresql 作为写入优化的数据库;您也可以使用其他 prisma 支持的数据库 。这只是一个例子。

使用 Next.js 和 Tailwind 的电子商务应用程序前端#

电子商务微服务应用程序包含一个前端,使用 Next.js 与 TailwindCSS 构建。应用程序后端使用 Node.js。数据存储在 Redis 和 MongoDB 或 PostgreSQL 中,使用 Prisma。以下是展示电子商务应用程序前端的屏幕截图。

仪表板: 显示产品列表,具有不同的搜索功能,可在设置页面中配置。

设置: 可以通过单击仪表板右上角的齿轮图标访问。在这里控制搜索栏、聊天机器人可见性和其他功能。

仪表板(语义文本搜索): 配置为语义文本搜索,搜索栏启用自然语言查询。示例:“纯棉蓝色衬衫”。

购物车: 将产品添加到购物车,并使用“立即购买”按钮结账。

订单历史记录: 购买后,顶部导航栏中的“订单”链接将显示订单状态和历史记录。

管理面板: 可以通过顶部导航栏中的“管理”链接访问。显示购买统计数据和趋势产品。

什么是触发器和函数?#

触发器和函数代表了 Redis 可编程性的一项革命性进步,在 Redis 7.2 中引入。此功能使开发人员能够直接在 Redis 数据库中对数据更改进行编程、存储和执行 JavaScript 代码 ,类似于传统 SQL 数据库中的存储过程或触发器。

此功能允许开发人员定义事件(称为 触发器)以执行 函数 ,更靠近数据。也就是说,开发人员定义了响应数据库事件或命令执行的业务逻辑。这将加快代码和相关交互的速度,因为无需等待将代码从客户端引入数据库。

优势#

将触发器和函数集成到 Redis 中,可以利用其著名的实时性能和简单性

  • 降低延迟:通过直接在 Redis 中处理任务,您可以最大限度地减少网络开销,节省时间和计算资源。
  • 实时事件处理:触发器在 实时 中执行,并保持命令的 原子性 。这消除了通过应用程序代码应用异步逻辑引入的潜在数据不一致问题。
  • JavaScript:使用专业开发人员最熟知的语言。与早期 Redis Gears 中不太为人知的 Lua 函数相比,这降低了学习曲线。
  • 兼容性:与现有的 Redis Stack 功能和数据结构无缝集成。

触发器和函数类型#

Redis 中的触发器和函数可以根据其激活方法分为三种类型

  • 按需触发器:这些是通过直接调用显式调用的。
  • KeySpace 触发器:由对键的操作(如创建、更新或删除)触发。
  • Stream 触发器:当向 Redis 流添加新条目时激活。

示例产品数据#

为了说明触发器和函数的应用,让我们考虑一个简化的电子商务数据集。此数据集包括详细的产品信息,我们将在本教程中使用。

database/fashion-dataset/001/products/*.json
const products = [
  {
    productId: '11000',
    price: 3995,
    productDisplayName: 'Puma Men Slick 3HD Yellow Black Watches',
    variantName: 'Slick 3HD Yellow',
    brandName: 'Puma',
    ageGroup: 'Adults-Men',
    gender: 'Men',
    displayCategories: 'Accessories',
    masterCategory_typeName: 'Accessories',
    subCategory_typeName: 'Watches',
    styleImages_default_imageURL:
      'https://host.docker.internal:8080/images/11000.jpg',
    productDescriptors_description_value: 'Stylish and comfortable, ...',
    stockQty: 25,
  },
  //...
];

按需触发器#

Redis 中的按需触发器是显式调用的 JavaScript 函数,用于执行特定任务。

应用程序场景:重置库存#

在我们的电子商务演示中,考虑一个需要重置所有产品库存数量的功能。我们将通过点击 UI 仪表板中的 RESET STOCK QTY 按钮来实现这一点,从而触发 resetInventory 函数。

创建函数#

让我们在 OnDemandTriggers 命名空间下创建一个名为 resetInventory 的函数。该函数将重置所有产品的库存(库存数量)到 25。

database/src/triggers/on-demand-trigger.js
#!js name=OnDemandTriggers api_version=1.0

redis.registerAsyncFunction('resetInventory', async function (client) {
  let cursor = '0';
  const DEFAULT_PRODUCT_QTY = 25;

  redis.log('resetInventory');
  do {
    client.block((client) => {
      //scan all the product keys in the database
      let res = client.call('scan', cursor, 'match', 'products:productId:*');
      cursor = res[0];
      let keys = res[1];
      // loop through all the product keys and set the stockQty to 25
      keys.forEach((key) => {
        if (!key.match('index:hash')) {
          client.call(
            'JSON.SET',
            key,
            '$.stockQty',
            DEFAULT_PRODUCT_QTY.toString(),
          );
        }
      });
    });
  } while (cursor != '0');

  return 'resetInventory completed !';
});

将函数添加到 Redis#

我们可以使用多种方法将函数添加到 Redis

  1. 1.使用 redis-cli
redis-cli  -x TFUNCTION LOAD < ./on-demand-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./on-demand-trigger.js

2. 使用代码

database/src/triggers.ts
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';

async function addTriggerToRedis(
  fileRelativePath: string,
  redisClient: NodeRedisClientType,
) {
  const filePath = path.join(__dirname, fileRelativePath);
  const fileData = await fs.readFile(filePath);
  let jsCode = fileData.toString();
  jsCode = jsCode.replace(/\r?\n/g, '\n');

  try {
    const result = await redisClient.sendCommand([
      'TFUNCTION',
      'LOAD',
      'REPLACE',
      jsCode,
    ]);
    console.log(`addTriggersToRedis ${fileRelativePath}`, result);
  } catch (err) {
    console.log(err);
  }
}
addTriggerToRedis('triggers/on-demand-trigger.js', redisClient);

3. 使用 RedisInsight

导航到 RedisInsight 中的“触发器和函数”部分,然后导航到“库”,并使用创建库来粘贴并保存您的函数。

测试函数#

1. 使用 redis-cli

redis-cli TFCALLASYNC OnDemandTriggers.resetInventory 0

2. 使用代码

点击“重置库存数量”按钮会触发 triggerResetInventory API。

POST https://localhost:3000/products/triggerResetInventory
{
}

这将调用 resetInventory 函数

server/src/services/products/src/service-impl.ts
const triggerResetInventory = async () => {
  const redisClient = getNodeRedisClient();

  //@ts-ignore
  const result = await redisClient.sendCommand(
    ['TFCALLASYNC', 'OnDemandTriggers.resetInventory', '0'],
    {
      isolated: true,
    },
  );
  console.log(`triggerResetInventory :  `, result);

  return result;
};

3. 使用 RedisInsight

在 RedisInsight 的工作台中测试该命令并查看结果。

验证数据完整性#

执行完成后,检查每个产品的 stockQty 是否重置为默认值。

键空间触发器#

键空间触发器允许您在 Redis 数据库中添加/修改与特定模式匹配的键集时执行自定义逻辑。它提供了一种对数据更改做出反应并根据这些更改执行操作的方法。

应用场景:管理产品库存数量#

在我们的电子商务演示中,让我们解决一个常见需求:在下了订单后减少产品库存数量。我们将使用一个 键空间触发器 来实现这一点,该触发器监听 orders:orderId 键,并相应地更新产品库存数量。

创建函数#

我们将开发 updateProductStockQty 函数,该函数位于 KeySpaceTriggers 命名空间下。该函数将负责根据订单详细信息调整库存数量。

database/src/triggers/key-space-trigger.js
#!js name=KeySpaceTriggers api_version=1.0
redis.registerKeySpaceTrigger(
  'updateProductStockQty',
  'orders:orderId:', // Keys starting with this prefix are monitored
  function (client, data) {
    const errors = [];

    try {
      if (
        client &&
        data?.event == 'json.set' &&
        data?.key != 'orders:orderId:index:hash'
      ) {
        const orderId = data.key;
        // get the order details from the orderId key
        let result = client.call('JSON.GET', orderId);
        result = result ? JSON.parse(result) : '';
        const order = Array.isArray(result) ? result[0] : result;

        if (order?.products?.length && !order.triggerProcessed) {
          try {
            //create a log stream to log the trigger events and errors
            client.call(
              'XGROUP',
              'CREATE',
              'TRIGGER_LOGS_STREAM',
              'TRIGGER_LOGS_GROUP',
              '$',
              'MKSTREAM',
            );
          } catch (streamConErr) {
            // if log stream already exists
          }

          // reduce stockQty for each product in the order
          for (const product of order.products) {
            let decreaseQtyBy = (-1 * product.qty).toString();
            client.call(
              'JSON.NUMINCRBY',
              `products:productId:${product.productId}`,
              '.stockQty',
              decreaseQtyBy,
            );

            // add log entry
            client.call(
              'XADD',
              'TRIGGER_LOGS_STREAM',
              '*',
              'message',
              `For productId ${product.productId}, stockQty ${decreaseQtyBy}`,
              'orderId',
              orderId,
              'function',
              'updateProductStockQty',
            );
          }

          // set triggerProcessed flag to avoid duplicate processing
          client.call('JSON.SET', orderId, '.triggerProcessed', '1');
        }
      }
    } catch (generalErr) {
      generalErr = JSON.stringify(
        generalErr,
        Object.getOwnPropertyNames(generalErr),
      );
      errors.push(generalErr);
    }

    if (errors.length) {
      //log error
      client.call(
        'XADD',
        'TRIGGER_LOGS_STREAM',
        '*',
        'message',
        JSON.stringify(errors),
        'orderId',
        data.key,
        'function',
        'updateProductStockQty',
      );
    }
  },
);

在这个脚本中,我们监听 orders:orderId: 键的变化。当检测到新订单时,该函数会检索订单详细信息,并相应地减少订单中每个产品的库存数量。

将函数添加到 Redis#

我们可以使用多种方法将函数添加到 Redis

1. 使用 redis-cli

redis-cli  -x TFUNCTION LOAD < ./key-space-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./key-space-trigger.js

2. 使用代码

database/src/triggers.ts
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';

async function addTriggerToRedis(
  fileRelativePath: string,
  redisClient: NodeRedisClientType,
) {
  const filePath = path.join(__dirname, fileRelativePath);
  const fileData = await fs.readFile(filePath);
  let jsCode = fileData.toString();
  jsCode = jsCode.replace(/\r?\n/g, '\n');

  try {
    const result = await redisClient.sendCommand([
      'TFUNCTION',
      'LOAD',
      'REPLACE',
      jsCode,
    ]);
    console.log(`addTriggersToRedis ${fileRelativePath}`, result);
  } catch (err) {
    console.log(err);
  }
}
addTriggerToRedis('triggers/key-space-trigger.js', redisClient);

3. 使用 RedisInsight

导航到 RedisInsight 中的“触发器和函数”部分,然后导航到“库”,并使用创建库来粘贴并保存您的函数。

测试函数#

在我们的演示中,通过 立即购买 按钮下订单会触发 createOrder API,该 API 反过来会创建一个新的 orders:orderId: 键,从而激活 updateProductStockQty 函数。

示例 createOrder API 请求

POST http://localhost:3000/orders/createOrder
{
  "products": [
    {
      "productId": "11002",
      "qty": 1,
      "productPrice": 4950,
    },
    {
      "productId": "11012",
      "qty": 2,
      "productPrice": 1195,
    }
  ]
}

在 Redis 中创建订单的示例命令

"JSON.SET" "orders:orderId:24b38a47-2b7d-4c5d-ba25-b74749e34c65" "$" "{"products":[{"productId":"10381","qty":1,"productPrice":2499,"productData":{}},{"productId":"11030","qty":1,"productPrice":1099,"productData":{}}],"userId":"USR_f0f00a86-7131-40e1-9d89-765b4cc1927f","orderId":"24b38a47-2b7d-4c5d-ba25-b74749e34c65","orderStatusCode":1}"

创建这个新键会触发 updateProductStockQty,从而导致库存数量的调整。

在 TRIGGER_LOGS_STREAM 中监控触发器的活动,以查看日志和潜在的错误。

验证数据完整性#

函数执行后,验证每个参与产品的 stockQty 是否已减少。

流触发器#

流触发器允许您监听 Redis 流,并在流中添加新数据时执行函数。它通常用于实时数据处理和事件驱动架构。

应用场景:计算销售统计数据#

在我们的电子商务演示中,让我们考虑一个需要计算产品销售统计数据的功能。我们将使用一个 流触发器 来实现这一点,该触发器监听 TRANSACTION_STREAM,并相应地更新销售统计数据。

创建函数#

我们将开发 calculateStats 函数,该函数位于 StreamTriggers 命名空间下。该函数将负责根据订单详细信息计算销售统计数据。

database/src/triggers/stream-trigger.js
#!js name=StreamTriggers api_version=1.0

redis.registerStreamTrigger(
  'calculateStats', // trigger name
  'TRANSACTION_STREAM', // Detects new data added to the stream
  function (client, data) {
    var streamEntry = {};
    for (let i = 0; i < data.record?.length; i++) {
      streamEntry[data.record[i][0]] = data.record[i][1];
    }

    streamEntry.transactionPipeline = JSON.parse(
      streamEntry.transactionPipeline,
    );
    streamEntry.orderDetails = JSON.parse(streamEntry.orderDetails);

    if (
      streamEntry.transactionPipeline?.length == 1 &&
      streamEntry.transactionPipeline[0] == 'PAYMENT_PROCESSED' &&
      streamEntry.orderDetails
    ) {
      //log
      client.call(
        'XADD',
        'TRIGGER_LOGS_STREAM',
        '*',
        'message',
        `${streamEntry.transactionPipeline}`,
        'orderId',
        `orders:orderId:${streamEntry.orderDetails.orderId}`,
        'function',
        'calculateStats',
      );

      const orderAmount = parseInt(streamEntry.orderDetails.orderAmount); //remove decimal
      const products = streamEntry.orderDetails.products;

      // sales
      client.call('INCRBY', 'statsTotalPurchaseAmount', orderAmount.toString());

      for (let product of products) {
        const totalProductAmount =
          parseInt(product.qty) * parseInt(product.productPrice);

        // trending products
        client.call(
          'ZINCRBY',
          'statsProductPurchaseQtySet',
          product.qty.toString(),
          product.productId,
        );

        // category wise purchase interest
        const category = (
          product.productData.masterCategory_typeName +
          ':' +
          product.productData.subCategory_typeName
        ).toLowerCase();
        client.call(
          'ZINCRBY',
          'statsCategoryPurchaseAmountSet',
          totalProductAmount.toString(),
          category,
        );

        // largest brand purchases
        const brand = product.productData.brandName;
        client.call(
          'ZINCRBY',
          'statsBrandPurchaseAmountSet',
          totalProductAmount.toString(),
          brand,
        );
      }
    }
  },
  {
    isStreamTrimmed: false, //whether the stream should be trimmed automatically after the data is processed by the consumer.
    window: 1,
  },
);

在上面的 calculateStats 函数中,我们监听 TRANSACTION_STREAM 并更新不同的销售统计数据,例如:

  • statsTotalPurchaseAmount 变量存储总购买金额
  • statsProductPurchaseQtySet 是一个排序集,它根据最高的购买数量跟踪趋势产品
  • statsCategoryPurchaseAmountSet 是一个排序集,它跟踪按类别划分的购买兴趣
  • statsBrandPurchaseAmountSet 是一个排序集,它跟踪最大的品牌购买

将函数添加到 Redis#

我们可以使用多种方法将函数添加到 Redis

1. 使用 redis-cli

redis-cli  -x TFUNCTION LOAD < ./stream-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./stream-trigger.js

2. 使用代码

database/src/triggers.ts
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';

async function addTriggerToRedis(
  fileRelativePath: string,
  redisClient: NodeRedisClientType,
) {
  const filePath = path.join(__dirname, fileRelativePath);
  const fileData = await fs.readFile(filePath);
  let jsCode = fileData.toString();
  jsCode = jsCode.replace(/\r?\n/g, '\n');

  try {
    const result = await redisClient.sendCommand([
      'TFUNCTION',
      'LOAD',
      'REPLACE',
      jsCode,
    ]);
    console.log(`addTriggersToRedis ${fileRelativePath}`, result);
  } catch (err) {
    console.log(err);
  }
}
addTriggerToRedis('triggers/stream-trigger.js', redisClient);

3. 使用 RedisInsight

导航到 RedisInsight 中的 触发器和函数 部分,然后导航到 ,并使用创建库来粘贴并保存您的函数。

测试函数#

在我们的演示中,通过 立即购买 按钮下订单会创建一个新订单,其中涉及不同的交易步骤,例如交易风险评估、支付完成等。所有这些步骤以及订单详细信息都记录在 TRANSACTION_STREAM 中。

向流中添加详细信息的示例代码

const addMessageToTransactionStream = async (message) => {
  if (message) {
    const streamKeyName = 'TRANSACTION_STREAM';
    try {
      const nodeRedisClient = getNodeRedisClient();
      if (nodeRedisClient && message) {
        const id = '*'; //* = auto generate
        await nodeRedisClient.xAdd(streamKeyName, id, message);
      }
    } catch (err) {
      console.error('addMessageToTransactionStream error !', err);
    }
  }
};

向流中添加详细信息的示例命令

"XADD" "TRANSACTION_STREAM" "*" "action" "PAYMENT_PROCESSED" "userId" "USR_f0f00a86-7131"  "orderDetails" "{'orderId':'bc438c5d-117e-41bd-97fa-943c03be0b1c','products':[],'paymentId':'clrrl8yp50007pf851m7f92u2'}" "transactionPipeline" "['PAYMENT_PROCESSED']"

该 calculateStats 函数监听 TRANSACTION_STREAM 流中的 PAYMENT_PROCESSED 操作,并相应地更新销售统计数据。

检查 RedisInsight 中使用的不同统计变量值,这些变量在触发器函数 calculateStats 中使用。

验证数据完整性#

函数执行后,验证更新后的管理面板。

**管理面板**: 可通过顶部导航中的“admin”链接访问。在 UI 中检查购买统计数据和趋势产品。

可用的 Redis 触发器和函数#

我们已经涵盖了 按需、 键空间 和  触发器等关键概念,并将它们应用于真实的电子商务场景中。Redis 的这些高级功能为数据处理和自动化开辟了无限的可能性,使您能够构建不仅速度更快而且更智能的应用程序。

当您继续探索 Redis 及其不断发展的生态系统时,请记住这些 触发器和函数 仅仅是开始。Redis 提供了一套丰富的功能,可以以创造性的方式组合起来,以解决复杂的问题并提供高性能解决方案。

参考资料#

创建函数#

请输入您的文本内容,然后按回车键。