自版本 (Redis Stack) 7.4 起,Redis Triggers and Functions (实验性预览功能) 目前已被标记为废弃。
我们建议探索替代方案,例如 RDI (Redis Data Integration)
在本篇关于 Redis 7.2 的 触发器和函数 的综合教程中,您将获得以下领域的见解和实践技能
On-demand
Triggers)、键空间触发器(KeySpace
Triggers)和流触发器(Stream
Triggers)的区别和用例。以下是用于克隆本教程中使用的应用程序源代码的命令
git clone --branch v9.2.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
让我们看一下演示应用程序的架构
products service
: 处理从数据库查询产品并将其返回给前端的功能orders service
: 处理验证和创建订单的功能order history service
: 处理查询客户订单历史的功能payments service
: 处理订单支付的功能api gateway
: 将服务统一在一个单一端点下mongodb/ postgresql
: 用作存储订单、订单历史、产品等的写优化数据库您无需在演示应用中使用 MongoDB/ Postgresql 作为写优化数据库;您也可以使用其他 prisma 支持的数据库 。这只是一个示例。
该电子商务微服务应用由前端组成,前端使用 Next.js 和 TailwindCSS 构建。应用后端使用 Node.js。数据存储在 Redis 和 MongoDB 或 PostgreSQL 中,使用 Prisma。以下是展示电子商务应用前端的截图。
仪表盘:显示带有不同搜索功能的产品列表,可在设置页面配置。
设置:通过点击仪表盘右上角的齿轮图标访问。在此控制搜索栏、聊天机器人可见性以及其他功能。
仪表盘(语义文本搜索):配置为语义文本搜索后,搜索栏支持自然语言查询。示例:“纯棉蓝色衬衫”。
购物车:将产品添加到购物车,然后点击“立即购买”按钮结账。
订单历史:购买后,顶部导航栏中的“订单”链接显示订单状态和历史记录。
管理面板:通过顶部导航中的“admin”链接访问。显示购买统计数据和热门产品。
触发器和函数是 Redis 可编程性方面具有革命性的一步,于 Redis 7.2 中引入。此功能使开发者能够直接在 Redis 数据库中编程、存储和执行 JavaScript 代码 以响应数据变化,类似于传统 SQL 数据库中的存储过程或触发器。
此功能使开发者能够定义事件(称为 triggers
),以执行更靠近数据的 functions
。也就是说,开发者定义了响应数据库事件或命令执行的业务逻辑。这加快了代码和相关交互的速度,因为无需等待将代码从客户端带入数据库。
将触发器和函数集成到 Redis 中,利用了其著名的实时性能和简单性
Redis 中的触发器和函数根据其激活方法可分为三种类型
为了说明触发器和函数的应用,我们来看一个简化的电子商务数据集。该数据集包含详细的产品信息,我们将在整个教程中使用它。
const products = [
{
productId: '11000',
price: 3995,
productDisplayName: 'Puma Men Slick 3HD Yellow Black Watches',
variantName: 'Slick 3HD Yellow',
brandName: 'Puma',
ageGroup: 'Adults-Men',
gender: 'Men',
displayCategories: 'Accessories',
masterCategory_typeName: 'Accessories',
subCategory_typeName: 'Watches',
styleImages_default_imageURL:
'http://host.docker.internal:8080/images/11000.jpg',
productDescriptors_description_value: 'Stylish and comfortable, ...',
stockQty: 25,
},
//...
];
Redis 中的按需触发器是显式调用以执行特定任务的 JavaScript 函数。
在我们的电子商务演示中,考虑一个需要重置所有产品库存数量的功能。我们将通过点击 UI 仪表盘中的 RESET STOCK QTY
按钮来实现,这将触发 resetInventory
函数。
让我们创建一个名为 resetInventory
的函数,命名空间为 OnDemandTriggers
。此函数将所有产品的库存(库存数量)重置为 25。
#!js name=OnDemandTriggers api_version=1.0
redis.registerAsyncFunction('resetInventory', async function (client) {
let cursor = '0';
const DEFAULT_PRODUCT_QTY = 25;
redis.log('resetInventory');
do {
client.block((client) => {
//scan all the product keys in the database
let res = client.call('scan', cursor, 'match', 'products:productId:*');
cursor = res[0];
let keys = res[1];
// loop through all the product keys and set the stockQty to 25
keys.forEach((key) => {
if (!key.match('index:hash')) {
client.call(
'JSON.SET',
key,
'$.stockQty',
DEFAULT_PRODUCT_QTY.toString(),
);
}
});
});
} while (cursor != '0');
return 'resetInventory completed !';
});
我们可以使用多种方法将函数添加到 Redis
redis-cli -x TFUNCTION LOAD < ./on-demand-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./on-demand-trigger.js
2. 使用代码
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';
async function addTriggerToRedis(
fileRelativePath: string,
redisClient: NodeRedisClientType,
) {
const filePath = path.join(__dirname, fileRelativePath);
const fileData = await fs.readFile(filePath);
let jsCode = fileData.toString();
jsCode = jsCode.replace(/\r?\n/g, '\n');
try {
const result = await redisClient.sendCommand([
'TFUNCTION',
'LOAD',
'REPLACE',
jsCode,
]);
console.log(`addTriggersToRedis ${fileRelativePath}`, result);
} catch (err) {
console.log(err);
}
}
addTriggerToRedis('triggers/on-demand-trigger.js', redisClient);
3. 使用 RedisInsight
导航到 RedisInsight 中的 触发器和函数 部分,然后到 库,并使用创建库来粘贴和保存您的函数。
1. 使用 redis-cli
redis-cli TFCALLASYNC OnDemandTriggers.resetInventory 0
2. 使用代码
点击“RESET STOCK QTY”按钮触发 triggerResetInventory API。
POST https://:3000/products/triggerResetInventory
{
}
这将调用 resetInventory
函数
const triggerResetInventory = async () => {
const redisClient = getNodeRedisClient();
//@ts-ignore
const result = await redisClient.sendCommand(
['TFCALLASYNC', 'OnDemandTriggers.resetInventory', '0'],
{
isolated: true,
},
);
console.log(`triggerResetInventory : `, result);
return result;
};
3. 使用 RedisInsight
在 RedisInsight 的工作台测试命令并查看结果。
执行后,检查每个产品的 stockQty
是否已重置为默认值。
键空间触发器允许您在 Redis 数据库中添加/修改符合特定模式的键时执行自定义逻辑。它提供了一种响应数据变化并根据这些变化执行操作的方法。
在我们的电子商务演示中,让我们解决一个常见需求:下单后减少产品库存数量。我们将使用一个 KeySpace trigger
来实现,该触发器监听 orders:orderId
键,并相应地更新产品库存数量。
我们将开发 updateProductStockQty
函数,命名空间为 KeySpaceTriggers
。此函数将负责根据订单详情调整库存数量。
#!js name=KeySpaceTriggers api_version=1.0
redis.registerKeySpaceTrigger(
'updateProductStockQty',
'orders:orderId:', // Keys starting with this prefix are monitored
function (client, data) {
const errors = [];
try {
if (
client &&
data?.event == 'json.set' &&
data?.key != 'orders:orderId:index:hash'
) {
const orderId = data.key;
// get the order details from the orderId key
let result = client.call('JSON.GET', orderId);
result = result ? JSON.parse(result) : '';
const order = Array.isArray(result) ? result[0] : result;
if (order?.products?.length && !order.triggerProcessed) {
try {
//create a log stream to log the trigger events and errors
client.call(
'XGROUP',
'CREATE',
'TRIGGER_LOGS_STREAM',
'TRIGGER_LOGS_GROUP',
'$',
'MKSTREAM',
);
} catch (streamConErr) {
// if log stream already exists
}
// reduce stockQty for each product in the order
for (const product of order.products) {
let decreaseQtyBy = (-1 * product.qty).toString();
client.call(
'JSON.NUMINCRBY',
`products:productId:${product.productId}`,
'.stockQty',
decreaseQtyBy,
);
// add log entry
client.call(
'XADD',
'TRIGGER_LOGS_STREAM',
'*',
'message',
`For productId ${product.productId}, stockQty ${decreaseQtyBy}`,
'orderId',
orderId,
'function',
'updateProductStockQty',
);
}
// set triggerProcessed flag to avoid duplicate processing
client.call('JSON.SET', orderId, '.triggerProcessed', '1');
}
}
} catch (generalErr) {
generalErr = JSON.stringify(
generalErr,
Object.getOwnPropertyNames(generalErr),
);
errors.push(generalErr);
}
if (errors.length) {
//log error
client.call(
'XADD',
'TRIGGER_LOGS_STREAM',
'*',
'message',
JSON.stringify(errors),
'orderId',
data.key,
'function',
'updateProductStockQty',
);
}
},
);
在此脚本中,我们监听 orders:orderId:
键的变化。检测到新订单后,函数将检索订单详情并相应地减少订单中每个产品的库存数量。
我们可以使用多种方法将函数添加到 Redis
1. 使用 redis-cli
redis-cli -x TFUNCTION LOAD < ./key-space-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./key-space-trigger.js
2. 使用代码
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';
async function addTriggerToRedis(
fileRelativePath: string,
redisClient: NodeRedisClientType,
) {
const filePath = path.join(__dirname, fileRelativePath);
const fileData = await fs.readFile(filePath);
let jsCode = fileData.toString();
jsCode = jsCode.replace(/\r?\n/g, '\n');
try {
const result = await redisClient.sendCommand([
'TFUNCTION',
'LOAD',
'REPLACE',
jsCode,
]);
console.log(`addTriggersToRedis ${fileRelativePath}`, result);
} catch (err) {
console.log(err);
}
}
addTriggerToRedis('triggers/key-space-trigger.js', redisClient);
3. 使用 RedisInsight
导航到 RedisInsight 中的触发器和函数部分,然后到库,并使用创建库来粘贴和保存您的函数。
在我们的演示中,通过点击 Buy Now
按钮下单会触发 createOrder
API,该 API 进而创建一个新的 orders:orderId:
键,从而激活 updateProductStockQty
函数。
示例 createOrder API 请求
POST http://:3000/orders/createOrder
{
"products": [
{
"productId": "11002",
"qty": 1,
"productPrice": 4950,
},
{
"productId": "11012",
"qty": 2,
"productPrice": 1195,
}
]
}
Redis 中的示例订单创建命令
"JSON.SET" "orders:orderId:24b38a47-2b7d-4c5d-ba25-b74749e34c65" "$" "{"products":[{"productId":"10381","qty":1,"productPrice":2499,"productData":{}},{"productId":"11030","qty":1,"productPrice":1099,"productData":{}}],"userId":"USR_f0f00a86-7131-40e1-9d89-765b4cc1927f","orderId":"24b38a47-2b7d-4c5d-ba25-b74749e34c65","orderStatusCode":1}"
此新键的创建触发 updateProductStockQty
,从而导致库存数量的调整。
在 TRIGGER_LOGS_STREAM
中监控触发器的活动,查看日志和潜在错误。
函数执行后,验证每个相关产品的 stockQty
是否已减少。
流触发器允许您监听 Redis 流,并在数据添加到流时执行函数。它常用于实时数据处理和事件驱动架构。
在我们的电子商务演示中,考虑一个需要计算产品销售统计数据的功能。我们将使用一个 Stream trigger
来实现,该触发器监听 TRANSACTION_STREAM
,并相应地更新销售统计数据。
我们将开发 calculateStats
函数,命名空间为 StreamTriggers
。此函数将负责根据订单详情计算销售统计数据。
#!js name=StreamTriggers api_version=1.0
redis.registerStreamTrigger(
'calculateStats', // trigger name
'TRANSACTION_STREAM', // Detects new data added to the stream
function (client, data) {
var streamEntry = {};
for (let i = 0; i < data.record?.length; i++) {
streamEntry[data.record[i][0]] = data.record[i][1];
}
streamEntry.transactionPipeline = JSON.parse(
streamEntry.transactionPipeline,
);
streamEntry.orderDetails = JSON.parse(streamEntry.orderDetails);
if (
streamEntry.transactionPipeline?.length == 1 &&
streamEntry.transactionPipeline[0] == 'PAYMENT_PROCESSED' &&
streamEntry.orderDetails
) {
//log
client.call(
'XADD',
'TRIGGER_LOGS_STREAM',
'*',
'message',
`${streamEntry.transactionPipeline}`,
'orderId',
`orders:orderId:${streamEntry.orderDetails.orderId}`,
'function',
'calculateStats',
);
const orderAmount = parseInt(streamEntry.orderDetails.orderAmount); //remove decimal
const products = streamEntry.orderDetails.products;
// sales
client.call('INCRBY', 'statsTotalPurchaseAmount', orderAmount.toString());
for (let product of products) {
const totalProductAmount =
parseInt(product.qty) * parseInt(product.productPrice);
// trending products
client.call(
'ZINCRBY',
'statsProductPurchaseQtySet',
product.qty.toString(),
product.productId,
);
// category wise purchase interest
const category = (
product.productData.masterCategory_typeName +
':' +
product.productData.subCategory_typeName
).toLowerCase();
client.call(
'ZINCRBY',
'statsCategoryPurchaseAmountSet',
totalProductAmount.toString(),
category,
);
// largest brand purchases
const brand = product.productData.brandName;
client.call(
'ZINCRBY',
'statsBrandPurchaseAmountSet',
totalProductAmount.toString(),
brand,
);
}
}
},
{
isStreamTrimmed: false, //whether the stream should be trimmed automatically after the data is processed by the consumer.
window: 1,
},
);
在上面的 calculateStats
函数中,我们正在监听 TRANSACTION_STREAM
并更新不同的销售统计数据,例如
statsTotalPurchaseAmount
变量存储总购买金额statsProductPurchaseQtySet
是一个有序集合,用于根据最高购买数量跟踪热门产品statsCategoryPurchaseAmountSet
是一个有序集合,用于跟踪按类别划分的购买兴趣statsBrandPurchaseAmountSet
是一个有序集合,用于跟踪最大品牌购买量我们可以使用多种方法将函数添加到 Redis
1. 使用 redis-cli
redis-cli -x TFUNCTION LOAD < ./stream-trigger.js
# or if you want to replace the function
redis-cli -x TFUNCTION LOAD REPLACE . < ./stream-trigger.js
2. 使用代码
import type { NodeRedisClientType } from './config.js';
import * as path from 'path';
import * as fs from 'fs/promises';
async function addTriggerToRedis(
fileRelativePath: string,
redisClient: NodeRedisClientType,
) {
const filePath = path.join(__dirname, fileRelativePath);
const fileData = await fs.readFile(filePath);
let jsCode = fileData.toString();
jsCode = jsCode.replace(/\r?\n/g, '\n');
try {
const result = await redisClient.sendCommand([
'TFUNCTION',
'LOAD',
'REPLACE',
jsCode,
]);
console.log(`addTriggersToRedis ${fileRelativePath}`, result);
} catch (err) {
console.log(err);
}
}
addTriggerToRedis('triggers/stream-trigger.js', redisClient);
3. 使用 RedisInsight
导航到 RedisInsight 中的 Triggers and Functions
部分,然后到 Libraries
,并使用创建库来粘贴和保存您的函数。
在我们的演示中,通过 Buy Now
按钮下单会创建一个新订单,涉及不同的交易步骤,例如交易风险评估、支付完成等。所有这些步骤以及订单详情都记录在 TRANSACTION_STREAM
中。
向流添加详情的示例代码
const addMessageToTransactionStream = async (message) => {
if (message) {
const streamKeyName = 'TRANSACTION_STREAM';
try {
const nodeRedisClient = getNodeRedisClient();
if (nodeRedisClient && message) {
const id = '*'; //* = auto generate
await nodeRedisClient.xAdd(streamKeyName, id, message);
}
} catch (err) {
console.error('addMessageToTransactionStream error !', err);
}
}
};
向流添加详情的示例命令
"XADD" "TRANSACTION_STREAM" "*" "action" "PAYMENT_PROCESSED" "userId" "USR_f0f00a86-7131" "orderDetails" "{'orderId':'bc438c5d-117e-41bd-97fa-943c03be0b1c','products':[],'paymentId':'clrrl8yp50007pf851m7f92u2'}" "transactionPipeline" "['PAYMENT_PROCESSED']"
calculateStats
函数监听 TRANSACTION_STREAM
流的 PAYMENT_PROCESSED
操作,并相应地更新销售统计数据。
在 RedisInsight 中检查触发器函数 calculateStats
中使用的不同统计变量值。
函数执行后,验证更新的管理仪表盘。
管理面板:通过顶部导航中的“admin”链接访问。在 UI 中查看购买统计数据和热门产品。
我们已经介绍了诸如 On-demand
、KeySpace
和 Stream
触发器等关键概念,并将它们应用于实际的电子商务场景中。Redis 的这些高级功能为数据处理和自动化开辟了无数的可能性,让您能够构建不仅更快而且更智能的应用。
随着你继续探索 Redis 及其不断发展的生态系统,请记住这些 触发器和函数
仅仅是一个开始。Redis 提供了丰富的功能集,可以创造性地组合起来解决复杂问题并提供高性能解决方案。
请点击您的文本内容,然后按回车。