从 (Redis Stack) 7.4 版本开始,Redis 触发器和函数(实验性预览功能)目前已标记为弃用
我们建议您探索其他替代方案,例如 RDI (Redis 数据集成)
在本篇关于 Redis 7.2 的 触发器和函数 的综合教程中,您将获得以下方面的见解和实践技能
按需
触发器、 KeySpace
触发器和 Stream
触发器的区别和用例。以下命令用于克隆本教程中使用的应用程序的源代码
git clone --branch v9.2.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
让我们看一下演示应用程序的架构
产品服务
:处理从数据库查询产品并将其返回给前端订单服务
:处理验证和创建订单订单历史服务
:处理查询客户的订单历史记录支付服务
:处理订单支付API 网关
:将服务统一在单个端点下mongodb/ postgresql
:充当写入优化的数据库,用于存储订单、订单历史记录、产品等。您无需在演示应用程序中使用 MongoDB/ Postgresql 作为写入优化的数据库;您也可以使用其他 prisma 支持的数据库 。这只是一个例子。
电子商务微服务应用程序包含一个前端,使用 Next.js 与 TailwindCSS 构建。应用程序后端使用 Node.js。数据存储在 Redis 和 MongoDB 或 PostgreSQL 中,使用 Prisma。以下是展示电子商务应用程序前端的屏幕截图。
仪表板: 显示产品列表,具有不同的搜索功能,可在设置页面中配置。
设置: 可以通过单击仪表板右上角的齿轮图标访问。在这里控制搜索栏、聊天机器人可见性和其他功能。
仪表板(语义文本搜索): 配置为语义文本搜索,搜索栏启用自然语言查询。示例:“纯棉蓝色衬衫”。
购物车: 将产品添加到购物车,并使用“立即购买”按钮结账。
订单历史记录: 购买后,顶部导航栏中的“订单”链接将显示订单状态和历史记录。
管理面板: 可以通过顶部导航栏中的“管理”链接访问。显示购买统计数据和趋势产品。
触发器和函数代表了 Redis 可编程性的一项革命性进步,在 Redis 7.2 中引入。此功能使开发人员能够直接在 Redis 数据库中对数据更改进行编程、存储和执行 JavaScript 代码 ,类似于传统 SQL 数据库中的存储过程或触发器。
此功能允许开发人员定义事件(称为 触发器
)以执行 函数
,更靠近数据。也就是说,开发人员定义了响应数据库事件或命令执行的业务逻辑。这将加快代码和相关交互的速度,因为无需等待将代码从客户端引入数据库。
将触发器和函数集成到 Redis 中,可以利用其著名的实时性能和简单性
Redis 中的触发器和函数可以根据其激活方法分为三种类型
为了说明触发器和函数的应用,让我们考虑一个简化的电子商务数据集。此数据集包括详细的产品信息,我们将在本教程中使用。
const products = [
{
productId: '11000',
price: 3995,
productDisplayName: 'Puma Men Slick 3HD Yellow Black Watches',
variantName: 'Slick 3HD Yellow',
brandName: 'Puma',
ageGroup: 'Adults-Men',
gender: 'Men',
displayCategories: 'Accessories',
masterCategory_typeName: 'Accessories',
subCategory_typeName: 'Watches',
styleImages_default_imageURL:
'https://host.docker.internal:8080/images/11000.jpg',
productDescriptors_description_value: 'Stylish and comfortable, ...',
stockQty: 25,
},
//...
];
Redis 中的按需触发器是显式调用的 JavaScript 函数,用于执行特定任务。
在我们的电子商务演示中,考虑一个需要重置所有产品库存数量的功能。我们将通过点击 UI 仪表板中的 RESET STOCK QTY
按钮来实现这一点,从而触发 resetInventory
函数。
让我们在 OnDemandTriggers
命名空间下创建一个名为 resetInventory
的函数。该函数将重置所有产品的库存(库存数量)到 25。
#!js name=OnDemandTriggers api_version=1.0
redis.registerAsyncFunction('resetInventory', async function (client) {
let cursor = '0';
const DEFAULT_PRODUCT_QTY = 25;
redis.log('resetInventory');
do {
client.block((client) => {
//scan all the product keys in the database
let res = client.call('scan', cursor, 'match', 'products:productId:*');
cursor = res[0];
let keys = res[1];
// loop through all the product keys and set the stockQty to 25
keys.forEach((key) => {
if (!key.match('index:hash')) {
client.call(
'JSON.SET',
key,
'$.stockQty',
DEFAULT_PRODUCT_QTY.toString(),
);
}
});
});
} while (cursor != '0');
return 'resetInventory completed !';
});
我们可以使用多种方法将函数添加到 Redis
redis-cli -x TFUNCTION LOAD < ./on-demand-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./on-demand-trigger.js
2. 使用代码
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';
async function addTriggerToRedis(
fileRelativePath: string,
redisClient: NodeRedisClientType,
) {
const filePath = path.join(__dirname, fileRelativePath);
const fileData = await fs.readFile(filePath);
let jsCode = fileData.toString();
jsCode = jsCode.replace(/\r?\n/g, '\n');
try {
const result = await redisClient.sendCommand([
'TFUNCTION',
'LOAD',
'REPLACE',
jsCode,
]);
console.log(`addTriggersToRedis ${fileRelativePath}`, result);
} catch (err) {
console.log(err);
}
}
addTriggerToRedis('triggers/on-demand-trigger.js', redisClient);
3. 使用 RedisInsight
导航到 RedisInsight 中的“触发器和函数”部分,然后导航到“库”,并使用创建库来粘贴并保存您的函数。
1. 使用 redis-cli
redis-cli TFCALLASYNC OnDemandTriggers.resetInventory 0
2. 使用代码
点击“重置库存数量”按钮会触发 triggerResetInventory API。
POST https://localhost:3000/products/triggerResetInventory
{
}
这将调用 resetInventory
函数
const triggerResetInventory = async () => {
const redisClient = getNodeRedisClient();
//@ts-ignore
const result = await redisClient.sendCommand(
['TFCALLASYNC', 'OnDemandTriggers.resetInventory', '0'],
{
isolated: true,
},
);
console.log(`triggerResetInventory : `, result);
return result;
};
3. 使用 RedisInsight
在 RedisInsight 的工作台中测试该命令并查看结果。
执行完成后,检查每个产品的 stockQty 是否重置为默认值。
键空间触发器允许您在 Redis 数据库中添加/修改与特定模式匹配的键集时执行自定义逻辑。它提供了一种对数据更改做出反应并根据这些更改执行操作的方法。
在我们的电子商务演示中,让我们解决一个常见需求:在下了订单后减少产品库存数量。我们将使用一个 键空间触发器
来实现这一点,该触发器监听 orders:orderId
键,并相应地更新产品库存数量。
我们将开发 updateProductStockQty
函数,该函数位于 KeySpaceTriggers
命名空间下。该函数将负责根据订单详细信息调整库存数量。
#!js name=KeySpaceTriggers api_version=1.0
redis.registerKeySpaceTrigger(
'updateProductStockQty',
'orders:orderId:', // Keys starting with this prefix are monitored
function (client, data) {
const errors = [];
try {
if (
client &&
data?.event == 'json.set' &&
data?.key != 'orders:orderId:index:hash'
) {
const orderId = data.key;
// get the order details from the orderId key
let result = client.call('JSON.GET', orderId);
result = result ? JSON.parse(result) : '';
const order = Array.isArray(result) ? result[0] : result;
if (order?.products?.length && !order.triggerProcessed) {
try {
//create a log stream to log the trigger events and errors
client.call(
'XGROUP',
'CREATE',
'TRIGGER_LOGS_STREAM',
'TRIGGER_LOGS_GROUP',
'$',
'MKSTREAM',
);
} catch (streamConErr) {
// if log stream already exists
}
// reduce stockQty for each product in the order
for (const product of order.products) {
let decreaseQtyBy = (-1 * product.qty).toString();
client.call(
'JSON.NUMINCRBY',
`products:productId:${product.productId}`,
'.stockQty',
decreaseQtyBy,
);
// add log entry
client.call(
'XADD',
'TRIGGER_LOGS_STREAM',
'*',
'message',
`For productId ${product.productId}, stockQty ${decreaseQtyBy}`,
'orderId',
orderId,
'function',
'updateProductStockQty',
);
}
// set triggerProcessed flag to avoid duplicate processing
client.call('JSON.SET', orderId, '.triggerProcessed', '1');
}
}
} catch (generalErr) {
generalErr = JSON.stringify(
generalErr,
Object.getOwnPropertyNames(generalErr),
);
errors.push(generalErr);
}
if (errors.length) {
//log error
client.call(
'XADD',
'TRIGGER_LOGS_STREAM',
'*',
'message',
JSON.stringify(errors),
'orderId',
data.key,
'function',
'updateProductStockQty',
);
}
},
);
在这个脚本中,我们监听 orders:orderId:
键的变化。当检测到新订单时,该函数会检索订单详细信息,并相应地减少订单中每个产品的库存数量。
我们可以使用多种方法将函数添加到 Redis
1. 使用 redis-cli
redis-cli -x TFUNCTION LOAD < ./key-space-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./key-space-trigger.js
2. 使用代码
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';
async function addTriggerToRedis(
fileRelativePath: string,
redisClient: NodeRedisClientType,
) {
const filePath = path.join(__dirname, fileRelativePath);
const fileData = await fs.readFile(filePath);
let jsCode = fileData.toString();
jsCode = jsCode.replace(/\r?\n/g, '\n');
try {
const result = await redisClient.sendCommand([
'TFUNCTION',
'LOAD',
'REPLACE',
jsCode,
]);
console.log(`addTriggersToRedis ${fileRelativePath}`, result);
} catch (err) {
console.log(err);
}
}
addTriggerToRedis('triggers/key-space-trigger.js', redisClient);
3. 使用 RedisInsight
导航到 RedisInsight 中的“触发器和函数”部分,然后导航到“库”,并使用创建库来粘贴并保存您的函数。
在我们的演示中,通过 立即购买
按钮下订单会触发 createOrder
API,该 API 反过来会创建一个新的 orders:orderId:
键,从而激活 updateProductStockQty
函数。
示例 createOrder API 请求
POST http://localhost:3000/orders/createOrder
{
"products": [
{
"productId": "11002",
"qty": 1,
"productPrice": 4950,
},
{
"productId": "11012",
"qty": 2,
"productPrice": 1195,
}
]
}
在 Redis 中创建订单的示例命令
"JSON.SET" "orders:orderId:24b38a47-2b7d-4c5d-ba25-b74749e34c65" "$" "{"products":[{"productId":"10381","qty":1,"productPrice":2499,"productData":{}},{"productId":"11030","qty":1,"productPrice":1099,"productData":{}}],"userId":"USR_f0f00a86-7131-40e1-9d89-765b4cc1927f","orderId":"24b38a47-2b7d-4c5d-ba25-b74749e34c65","orderStatusCode":1}"
创建这个新键会触发 updateProductStockQty
,从而导致库存数量的调整。
在 TRIGGER_LOGS_STREAM
中监控触发器的活动,以查看日志和潜在的错误。
函数执行后,验证每个参与产品的 stockQty
是否已减少。
流触发器允许您监听 Redis 流,并在流中添加新数据时执行函数。它通常用于实时数据处理和事件驱动架构。
在我们的电子商务演示中,让我们考虑一个需要计算产品销售统计数据的功能。我们将使用一个 流触发器
来实现这一点,该触发器监听 TRANSACTION_STREAM
,并相应地更新销售统计数据。
我们将开发 calculateStats
函数,该函数位于 StreamTriggers
命名空间下。该函数将负责根据订单详细信息计算销售统计数据。
#!js name=StreamTriggers api_version=1.0
redis.registerStreamTrigger(
'calculateStats', // trigger name
'TRANSACTION_STREAM', // Detects new data added to the stream
function (client, data) {
var streamEntry = {};
for (let i = 0; i < data.record?.length; i++) {
streamEntry[data.record[i][0]] = data.record[i][1];
}
streamEntry.transactionPipeline = JSON.parse(
streamEntry.transactionPipeline,
);
streamEntry.orderDetails = JSON.parse(streamEntry.orderDetails);
if (
streamEntry.transactionPipeline?.length == 1 &&
streamEntry.transactionPipeline[0] == 'PAYMENT_PROCESSED' &&
streamEntry.orderDetails
) {
//log
client.call(
'XADD',
'TRIGGER_LOGS_STREAM',
'*',
'message',
`${streamEntry.transactionPipeline}`,
'orderId',
`orders:orderId:${streamEntry.orderDetails.orderId}`,
'function',
'calculateStats',
);
const orderAmount = parseInt(streamEntry.orderDetails.orderAmount); //remove decimal
const products = streamEntry.orderDetails.products;
// sales
client.call('INCRBY', 'statsTotalPurchaseAmount', orderAmount.toString());
for (let product of products) {
const totalProductAmount =
parseInt(product.qty) * parseInt(product.productPrice);
// trending products
client.call(
'ZINCRBY',
'statsProductPurchaseQtySet',
product.qty.toString(),
product.productId,
);
// category wise purchase interest
const category = (
product.productData.masterCategory_typeName +
':' +
product.productData.subCategory_typeName
).toLowerCase();
client.call(
'ZINCRBY',
'statsCategoryPurchaseAmountSet',
totalProductAmount.toString(),
category,
);
// largest brand purchases
const brand = product.productData.brandName;
client.call(
'ZINCRBY',
'statsBrandPurchaseAmountSet',
totalProductAmount.toString(),
brand,
);
}
}
},
{
isStreamTrimmed: false, //whether the stream should be trimmed automatically after the data is processed by the consumer.
window: 1,
},
);
在上面的 calculateStats
函数中,我们监听 TRANSACTION_STREAM
并更新不同的销售统计数据,例如:
statsTotalPurchaseAmount
变量存储总购买金额statsProductPurchaseQtySet
是一个排序集,它根据最高的购买数量跟踪趋势产品statsCategoryPurchaseAmountSet
是一个排序集,它跟踪按类别划分的购买兴趣statsBrandPurchaseAmountSet
是一个排序集,它跟踪最大的品牌购买我们可以使用多种方法将函数添加到 Redis
1. 使用 redis-cli
redis-cli -x TFUNCTION LOAD < ./stream-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./stream-trigger.js
2. 使用代码
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';
async function addTriggerToRedis(
fileRelativePath: string,
redisClient: NodeRedisClientType,
) {
const filePath = path.join(__dirname, fileRelativePath);
const fileData = await fs.readFile(filePath);
let jsCode = fileData.toString();
jsCode = jsCode.replace(/\r?\n/g, '\n');
try {
const result = await redisClient.sendCommand([
'TFUNCTION',
'LOAD',
'REPLACE',
jsCode,
]);
console.log(`addTriggersToRedis ${fileRelativePath}`, result);
} catch (err) {
console.log(err);
}
}
addTriggerToRedis('triggers/stream-trigger.js', redisClient);
3. 使用 RedisInsight
导航到 RedisInsight 中的 触发器和函数
部分,然后导航到 库
,并使用创建库来粘贴并保存您的函数。
在我们的演示中,通过 立即购买
按钮下订单会创建一个新订单,其中涉及不同的交易步骤,例如交易风险评估、支付完成等。所有这些步骤以及订单详细信息都记录在 TRANSACTION_STREAM
中。
向流中添加详细信息的示例代码
const addMessageToTransactionStream = async (message) => {
if (message) {
const streamKeyName = 'TRANSACTION_STREAM';
try {
const nodeRedisClient = getNodeRedisClient();
if (nodeRedisClient && message) {
const id = '*'; //* = auto generate
await nodeRedisClient.xAdd(streamKeyName, id, message);
}
} catch (err) {
console.error('addMessageToTransactionStream error !', err);
}
}
};
向流中添加详细信息的示例命令
"XADD" "TRANSACTION_STREAM" "*" "action" "PAYMENT_PROCESSED" "userId" "USR_f0f00a86-7131" "orderDetails" "{'orderId':'bc438c5d-117e-41bd-97fa-943c03be0b1c','products':[],'paymentId':'clrrl8yp50007pf851m7f92u2'}" "transactionPipeline" "['PAYMENT_PROCESSED']"
该 calculateStats
函数监听 TRANSACTION_STREAM
流中的 PAYMENT_PROCESSED
操作,并相应地更新销售统计数据。
检查 RedisInsight 中使用的不同统计变量值,这些变量在触发器函数 calculateStats
中使用。
函数执行后,验证更新后的管理面板。
**管理面板**: 可通过顶部导航中的“admin”链接访问。在 UI 中检查购买统计数据和趋势产品。
我们已经涵盖了 按需
、 键空间
和 流
触发器等关键概念,并将它们应用于真实的电子商务场景中。Redis 的这些高级功能为数据处理和自动化开辟了无限的可能性,使您能够构建不仅速度更快而且更智能的应用程序。
当您继续探索 Redis 及其不断发展的生态系统时,请记住这些 触发器和函数
仅仅是开始。Redis 提供了一套丰富的功能,可以以创造性的方式组合起来,以解决复杂的问题并提供高性能解决方案。
请输入您的文本内容,然后按回车键。