以下是用于克隆本教程中使用的应用程序源代码的命令
git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
在构建微服务应用程序时,人们使用各种选项来进行服务之间的通信。其中
在事件驱动的微服务架构中,您可能有一些服务发布 API,而其他服务只是事件的生产者和消费者,没有外部 API。
考虑以下场景:您有一个电子商务应用程序,其架构分解为不同的微服务,包括 创建订单
, 创建发票
, 处理付款
, 履行订单
等。微服务允许您将这些命令分离到不同的服务中,以便独立扩展,从而使更多客户能够快速并同时地处理他们的订单,这将带来更好的用户体验,更高的销售额,以及更少的脾气暴躁的客户服务人员。
使用微服务时,您需要一个用于服务间通信的工具。最初,您可能考虑使用像 Kafka 这样的产品进行流式传输,但设置起来相当复杂。很多人不知道 Redis 也可以像 Kafka 一样支持流式传输。鉴于您可能已经在使用 Redis 进行缓存,因此也使用它进行流处理是有意义的。为了降低应用程序架构和维护的复杂性, Redis 是服务间通信的绝佳选择。在这里,我们分解了使用 Redis 与流式传输进行服务间通信的过程。
本教程其余部分中讨论的电子商务微服务应用程序使用以下架构
产品服务
:处理从数据库中查询产品并将其返回给前端订单服务
:处理订单的验证和创建订单历史记录服务
:处理查询客户的订单历史记录付款服务
:处理订单付款数字身份服务
:处理数字身份存储和身份分数计算API 网关
:将服务统一到单个端点下MongoDB
:充当主要数据库,存储订单、订单历史记录、产品等Redis
:充当 流处理器 和缓存数据库此图说明了 Redis Streams 如何用作 订单服务
和 付款服务
之间的消息代理
Redis Streams 比使用 Kafka 或其他类似技术更具成本效益。凭借亚毫秒级的延迟和轻量级的 Streams 日志数据结构,Redis 更易于部署、开发和操作。
以下事件流图说明了 订单服务 和 付款服务 如何通过 Redis 与流式传输进行通信
让我们概述下面使用的流和事件
orders service
inserts order data into the database.//sample order data
{
"orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
"products": [
{
"productId": 11000,
"qty": 3,
"productPrice": 3995,
"productData": {
"productDisplayName": "Puma Men Slick 3HD Yellow Black Watches",
"variantName": "Slick 3HD Yellow",
"brandName": "Puma",
"ageGroup": "Adults-Men",
"gender": "Men"
//...
}
},
{
"productId": 11001,
"qty": 2,
"productPrice": 5450,
"productData": {
"productDisplayName": "Puma Men Top Fluctuation Red Black Watches",
"variantName": "Top Fluctuation Red",
"brandName": "Puma",
"ageGroup": "Adults-Men",
"gender": "Men"
//...
}
}
],
"userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"orderStatusCode": 1, //order created
"createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"statusCode": 1
}
2. The orders service
also appends minimal data (orderId, orderAmount, and userId) to the ORDERS_STREAM
to signal new order creation (i.e., it acts as PRODUCER
of the ORDERS_STREAM
).
3. The payments service
listens to the ORDERS_STREAM
and processes payments for new orders, then inserts payment data into the database (i.e, it acts as the CONSUMER
of the ORDERS_STREAM
).
//sample payment data
{
"paymentId": "6403212956a976300afbaac1",
"orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
"orderAmount": 22885,
"paidAmount": 22885,
"orderStatusCode": 3, //payment successful
"userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"createdOn": {
"$date": {
"$numberLong": "1677926697841"
}
},
"createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
"statusCode": 1
}
4. The payments service
appends minimal data (orderId, paymentId, orderStatusCode, and userId) to the PAYMENTS_STREAM
to signal a new payment (i.e., it acts as the PRODUCER
of the PAYMENTS_STREAM
).
5. The orders service
listens to the PAYMENTS_STREAM
and updates the orderStatus and paymentId for orders in the database accordingly as the order payment is fulfilled (i.e., it acts as the CONSUMER
of the PAYMENTS_STREAM
).
{
//order collection update
"orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
"paymentId": "6403212956a976300afbaac1",
"orderStatusCode": 3 //payment success
//...
}
该电子商务微服务应用程序由一个前端组成,该前端使用 Next.js 和 TailwindCSS构建。应用程序后端使用 Node.js。数据存储在 Redis 和 MongoDB 中。您将在下面找到电子商务应用程序前端的屏幕截图
仪表盘
: 显示产品列表以及搜索功能购物车
: 将产品添加到购物车,然后使用“立即购买”按钮结账
订单历史记录
: 订单一旦下达,顶部导航栏中的 订单 链接将显示订单状态和历史记录
以下是用于克隆本教程中使用的应用程序源代码的命令
git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions
我们使用 Redis 来代理在 订单服务 和 支付服务之间发送的事件。
让我们看看订单服务中的一些代码,以了解其工作原理
订单服务
将最小数据追加到 ORDERS_STREAM
以发出新订单创建的信号。const addOrderIdToStream = async (
orderId: string,
orderAmount: number,
userId: string,
) => {
const nodeRedisClient = getNodeRedisClient();
if (orderId && nodeRedisClient) {
const streamKeyName = 'ORDERS_STREAM';
const entry = {
orderId: orderId,
orderAmount: orderAmount.toFixed(2),
userId: userId,
};
const id = '*'; //* = auto generate
//xAdd adds entry to specified stream
await nodeRedisClient.xAdd(streamKeyName, id, entry);
}
};
3. 支付服务
监听 ORDERS_STREAM
// Below is some code for how you would use Redis to listen for the stream events:
async function listenToStream(
onMessage: (message: any, messageId: string) => Promise<void>,
) {
// using node-redis
const redis = getNodeRedisClient();
const streamKeyName = 'ORDERS_STREAM'; //stream name
const groupName = 'ORDERS_CON_GROUP'; // listening consumer group name (custom)
const consumerName = 'PAYMENTS_CON'; // listening consumer name (custom)
const readMaxCount = 100;
// Check if the stream group already exists
if (!(await redis.exists(streamKeyName))) {
const idPosition = '0'; //0 = start, $ = end or any specific id
await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
MKSTREAM: true,
});
}
// setup a loop to listen for stream events
while (true) {
// read set of messages from different streams
const dataArr = await nodeRedisClient.xReadGroup(
commandOptions({
isolated: true,
}),
groupName,
consumerName,
[
{
// you can specify multiple streams in array
key: streamKeyName,
id: '>', // Next entry ID that no consumer in this group has read
},
],
{
COUNT: readMaxCount, // Read n entries at a time
BLOCK: 0, // block for 0 (infinite) seconds if there are none.
},
);
for (let data of dataArr) {
for (let messageItem of data.messages) {
// process the message received (in our case, perform payment)
await onMessage(messageItem.message, messageItem.id);
// acknowledge individual messages after processing
nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
}
}
}
}
// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
onMessage: processPaymentForNewOrders,
});
const processPaymentForNewOrders: IMessageHandler = async (
message,
messageId,
) => {
/*
message = {
orderId: "",
orderAmount: "",
userId: "",
}
*/
// process payment for new orderId and insert "payments" data to database
};
这里有几点需要注意
isolated: true
, 以便在 隔离执行 模式下使用 XREADGROUP
的阻塞版本。4. 支付服务将最小数据追加到 PAYMENTS_STREAM 以发出付款已完成的信号。
const addPaymentIdToStream = async (
orderId: string,
paymentId: string,
orderStatus: number,
userId: string,
) => {
const nodeRedisClient = getNodeRedisClient();
if (orderId && nodeRedisClient) {
const streamKeyName = 'PAYMENTS_STREAM';
const entry = {
orderId: orderId,
paymentId: paymentId,
orderStatusCode: orderStatus.toString(),
userId: userId,
};
const id = '*'; //* = auto generate
//xAdd adds entry to specified stream
await nodeRedisClient.xAdd(streamKeyName, id, entry);
}
};
5. 订单服务监听 PAYMENTS_STREAM 并在付款完成时更新订单。
//Below is some code for how you would use Redis to listen for the stream events:
async function listenToStream(
onMessage: (message: any, messageId: string) => Promise<void>,
) {
// using node-redis
const redis = getNodeRedisClient();
const streamKeyName = 'PAYMENTS_STREAM'; //stream name
const groupName = 'PAYMENTS_CON_GROUP'; //listening consumer group name (custom)
const consumerName = 'ORDERS_CON'; //listening consumer name (custom)
const readMaxCount = 100;
// Check if the stream group already exists
if (!(await redis.exists(streamKeyName))) {
const idPosition = '0'; //0 = start, $ = end or any specific id
await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
MKSTREAM: true,
});
}
// setup a loop to listen for stream events
while (true) {
// read set of messages from different streams
const dataArr = await nodeRedisClient.xReadGroup(
commandOptions({
isolated: true,
}),
groupName,
consumerName,
[
{
// you can specify multiple streams in array
key: streamKeyName,
id: '>', // Next entry ID that no consumer in this group has read
},
],
{
COUNT: readMaxCount, // Read n entries at a time
BLOCK: 0, // block for 0 (infinite) seconds if there are none.
},
);
for (let data of dataArr) {
for (let messageItem of data.messages) {
//process the message received (in our case, updateOrderStatus)
await onMessage(messageItem.message, messageItem.id);
// acknowledge individual messages after processing
nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
}
}
}
}
// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
onMessage: updateOrderStatus,
});
const updateOrderStatus: IMessageHandler = async (message, messageId) => {
/*
message = {
orderId: "",
paymentId: "",
orderStatusCode:"",
userId: "",
}
*/
// updates orderStatus and paymentId in database accordingly for the order which has fulfilled payment
// updateOrderStatusInRedis(orderId,paymentId,orderStatusCode,userId)
// updateOrderStatusInMongoDB(orderId,paymentId,orderStatusCode,userId)
};
验证所有传入消息以确保您可以使用它们,这是一个最佳实践。
出于应用程序的目的,我们在同一服务中调用更新 Redis 和主数据库中的订单状态(为简单起见,我们没有使用数据库之间的任何同步技术,而是专注于如何在 Redis 中存储和访问数据)。另一个常见模式是让您的服务写入一个数据库,然后分别使用 CDC 机制来更新另一个数据库。例如,您可以直接写入 Redis,然后使用 触发器和函数 来处理在后台同步 Redis 和主数据库。
如果您使用 Redis 云,您会发现 Redis Streams 在您已经用于缓存的相同多租户数据平台上可用。Redis 云还具有高可用性、消息持久性、对多个客户端的支持以及主/从数据复制的弹性……所有这些都是内置的。
就这么简单!您现在知道如何将 Redis 用于流式传输,既作为生产者,也作为消费者。希望您可以从本教程中获得一些灵感,并将其应用于您自己的事件流式传输应用程序。有关此主题的更多信息,请查看下面的附加资源