以下是用于克隆本教程中应用源代码的命令
git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
构建微服务应用时,人们使用多种选项来实现服务之间的通信。其中包括:
在事件驱动的微服务架构中,您可能有一些服务发布 API,而其他服务则只是事件的生产者和消费者,没有外部 API。
考虑以下场景:您有一个电商应用,其架构被分解为不同的微服务,包括 创建订单
、创建发票
、处理支付
、履行订单
等等。微服务允许您将这些命令分离到不同的服务中,实现独立扩展,从而让更多客户能够快速同时处理他们的订单,从而带来更好的用户体验、更高的销量,并减少脾气暴躁的客服人员。
使用微服务时,您需要一个用于服务间通信的工具。最初,您可能会考虑使用 Kafka 等产品进行流处理,但设置起来相当复杂。许多人不知道的是,Redis 也以与 Kafka 相同的方式支持 streams。考虑到您可能已经在将 Redis 用于缓存,将其用于流处理也是有意义的。为了降低应用架构和维护的复杂性,Redis 是服务间通信的绝佳选择。在此,我们分解了使用 Redis 和 streams 进行服务间通信的过程。
本教程其余部分讨论的电商微服务应用使用以下架构:
产品服务
:处理从数据库查询产品并将其返回给前端订单服务
:处理验证和创建订单订单历史服务
:处理查询客户的订单历史记录支付服务
:处理订单支付数字身份服务
:处理存储数字身份和计算身份评分API 网关
:将服务统一到一个端点下mongodb
:作为主数据库,存储订单、订单历史记录、产品等redis
:作为 流处理器 和缓存数据库此图说明了 Redis Streams 如何用作 订单服务
和 支付服务
之间的消息代理
Redis Streams 比使用 Kafka 或其他类似技术更具成本效益。凭借亚毫秒级延迟和轻量级的 Streams 日志数据结构,Redis 更易于部署、开发和运维。
以下事件流图说明了订单服务和支付服务如何通过 Redis 和 streams 进行通信
下面我们概述使用的 streams 和事件:
//sample order data
{
"orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
"products": [
{
"productId": 11000,
"qty": 3,
"productPrice": 3995,
"productData": {
"productDisplayName": "Puma Men Slick 3HD Yellow Black Watches",
"variantName": "Slick 3HD Yellow",
"brandName": "Puma",
"ageGroup": "Adults-Men",
"gender": "Men"
//...
}
},
{
"productId": 11001,
"qty": 2,
"productPrice": 5450,
"productData": {
"productDisplayName": "Puma Men Top Fluctuation Red Black Watches",
"variantName": "Top Fluctuation Red",
"brandName": "Puma",
"ageGroup": "Adults-Men",
"gender": "Men"
//...
}
}
],
"userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"orderStatusCode": 1, //order created
"createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"statusCode": 1
}
2. 订单服务还将最少的数据(orderId、orderAmount 和 userId)附加到 ORDERS_STREAM
中,以表示创建新订单(即,它充当 ORDERS_STREAM
的 生产者
)。
3. 支付服务监听 ORDERS_STREAM
并处理新订单的支付,然后将支付数据插入数据库(即,它充当 ORDERS_STREAM
的 消费者
)。
//sample payment data
{
"paymentId": "6403212956a976300afbaac1",
"orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
"orderAmount": 22885,
"paidAmount": 22885,
"orderStatusCode": 3, //payment successful
"userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"createdOn": {
"$date": {
"$numberLong": "1677926697841"
}
},
"createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"statusCode": 1
}
4. 支付服务将最少的数据(orderId、paymentId、orderStatusCode 和 userId)附加到 PAYMENTS_STREAM
中,以表示新的支付(即,它充当 PAYMENTS_STREAM
的 生产者
)。
5. 订单服务监听 PAYMENTS_STREAM
,并根据订单支付的完成情况相应地更新数据库中订单的 orderStatus 和 paymentId(即,它充当 PAYMENTS_STREAM
的 消费者
)。
{
//order collection update
"orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
"paymentId": "6403212956a976300afbaac1",
"orderStatusCode": 3 //payment success
//...
}
电商微服务应用包含一个前端,使用 Next.js 和 TailwindCSS 构建。应用后端使用 Node.js。数据存储在 Redis 和 MongoDB 中。您将在下方找到电商应用前端的截图:
仪表盘
:显示带有搜索功能的产品列表购物车
:将产品添加到购物车,然后使用“立即购买”按钮结账
订单历史
:下单后,顶部导航栏中的“订单”链接会显示订单状态和历史记录
以下是用于克隆本教程中应用源代码的命令
git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
我们使用 Redis 在订单服务和支付服务之间代理事件。
让我们看看订单服务中的一些代码来了解它是如何工作的:
订单服务
将最少的数据附加到 ORDERS_STREAM
中,以表示创建新订单。const addOrderIdToStream = async (
orderId: string,
orderAmount: number,
userId: string,
) => {
const nodeRedisClient = getNodeRedisClient();
if (orderId && nodeRedisClient) {
const streamKeyName = 'ORDERS_STREAM';
const entry = {
orderId: orderId,
orderAmount: orderAmount.toFixed(2),
userId: userId,
};
const id = '*'; //* = auto generate
//xAdd adds entry to specified stream
await nodeRedisClient.xAdd(streamKeyName, id, entry);
}
};
3. 支付服务监听 ORDERS_STREAM
// Below is some code for how you would use Redis to listen for the stream events:
async function listenToStream(
onMessage: (message: any, messageId: string) => Promise<void>,
) {
// using node-redis
const redis = getNodeRedisClient();
const streamKeyName = 'ORDERS_STREAM'; //stream name
const groupName = 'ORDERS_CON_GROUP'; // listening consumer group name (custom)
const consumerName = 'PAYMENTS_CON'; // listening consumer name (custom)
const readMaxCount = 100;
// Check if the stream group already exists
if (!(await redis.exists(streamKeyName))) {
const idPosition = '0'; //0 = start, $ = end or any specific id
await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
MKSTREAM: true,
});
}
// setup a loop to listen for stream events
while (true) {
// read set of messages from different streams
const dataArr = await nodeRedisClient.xReadGroup(
commandOptions({
isolated: true,
}),
groupName,
consumerName,
[
{
// you can specify multiple streams in array
key: streamKeyName,
id: '>', // Next entry ID that no consumer in this group has read
},
],
{
COUNT: readMaxCount, // Read n entries at a time
BLOCK: 0, // block for 0 (infinite) seconds if there are none.
},
);
for (let data of dataArr) {
for (let messageItem of data.messages) {
// process the message received (in our case, perform payment)
await onMessage(messageItem.message, messageItem.id);
// acknowledge individual messages after processing
nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
}
}
}
}
// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
onMessage: processPaymentForNewOrders,
});
const processPaymentForNewOrders: IMessageHandler = async (
message,
messageId,
) => {
/*
message = {
orderId: "",
orderAmount: "",
userId: "",
}
*/
// process payment for new orderId and insert "payments" data to database
};
这里有几点重要注意事项:
isolated: true
,以便在孤立执行模式下使用 XREADGROUP
的阻塞版本。4. 支付服务将最少的数据附加到 PAYMENTS_STREAM 中,以表示支付已完成。
const addPaymentIdToStream = async (
orderId: string,
paymentId: string,
orderStatus: number,
userId: string,
) => {
const nodeRedisClient = getNodeRedisClient();
if (orderId && nodeRedisClient) {
const streamKeyName = 'PAYMENTS_STREAM';
const entry = {
orderId: orderId,
paymentId: paymentId,
orderStatusCode: orderStatus.toString(),
userId: userId,
};
const id = '*'; //* = auto generate
//xAdd adds entry to specified stream
await nodeRedisClient.xAdd(streamKeyName, id, entry);
}
};
5. 订单服务监听 PAYMENTS_STREAM,并在支付完成后更新订单。
//Below is some code for how you would use Redis to listen for the stream events:
async function listenToStream(
onMessage: (message: any, messageId: string) => Promise<void>,
) {
// using node-redis
const redis = getNodeRedisClient();
const streamKeyName = 'PAYMENTS_STREAM'; //stream name
const groupName = 'PAYMENTS_CON_GROUP'; //listening consumer group name (custom)
const consumerName = 'ORDERS_CON'; //listening consumer name (custom)
const readMaxCount = 100;
// Check if the stream group already exists
if (!(await redis.exists(streamKeyName))) {
const idPosition = '0'; //0 = start, $ = end or any specific id
await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
MKSTREAM: true,
});
}
// setup a loop to listen for stream events
while (true) {
// read set of messages from different streams
const dataArr = await nodeRedisClient.xReadGroup(
commandOptions({
isolated: true,
}),
groupName,
consumerName,
[
{
// you can specify multiple streams in array
key: streamKeyName,
id: '>', // Next entry ID that no consumer in this group has read
},
],
{
COUNT: readMaxCount, // Read n entries at a time
BLOCK: 0, // block for 0 (infinite) seconds if there are none.
},
);
for (let data of dataArr) {
for (let messageItem of data.messages) {
//process the message received (in our case, updateOrderStatus)
await onMessage(messageItem.message, messageItem.id);
// acknowledge individual messages after processing
nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
}
}
}
}
// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
onMessage: updateOrderStatus,
});
const updateOrderStatus: IMessageHandler = async (message, messageId) => {
/*
message = {
orderId: "",
paymentId: "",
orderStatusCode:"",
userId: "",
}
*/
// updates orderStatus and paymentId in database accordingly for the order which has fulfilled payment
// updateOrderStatusInRedis(orderId,paymentId,orderStatusCode,userId)
// updateOrderStatusInMongoDB(orderId,paymentId,orderStatusCode,userId)
};
最佳实践是验证所有接收到的消息,确保可以对其进行处理。
为了本应用的演示目的,我们在同一服务中调用以更新 Redis 和主数据库中的订单状态(为简化起见,我们没有使用数据库之间的任何同步技术,而是专注于数据在 Redis 中的存储和访问方式)。另一种常见模式是让您的服务写入一个数据库,然后单独使用 CDC 机制更新另一个数据库。例如,您可以直接写入 Redis,然后使用 触发器和函数 在后台处理 Redis 和主数据库的同步。
如果您使用 Redis Cloud,您会发现 Redis Streams 可用于您已用于缓存的同一多租户数据平台。Redis Cloud 还内置了高可用性、消息持久性、对多种客户端的支持以及主/备数据复制带来的弹性……所有功能都已集成。
就是这样!现在您知道如何将 Redis 用作生产者和消费者进行流处理了。希望本教程能给您一些启发,并将其应用于您自己的事件流应用。有关此主题的更多信息,请查看下面的其他资源:
Redis Streams
使用 Redis 的微服务
通用