学习

微服务与 Redis Streams 通信

Will Johnston
作者
Will Johnston, Redis 开发者增长经理
Prasan Kumar
作者
Prasan Kumar, Redis 技术解决方案开发者
GITHUB 代码

以下是用于克隆本教程中应用源代码的命令

git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

什么是服务间通信?#

构建微服务应用时,人们使用多种选项来实现服务之间的通信。其中包括:

  1. 1.发布/订阅 模型:在发布/订阅模型(即发即弃)中,发布者产生消息,而当时处于活动状态的订阅者消费这些消息。不活跃的订阅者之后无法接收消息。
  2. 2.流处理:大多数微服务应用使用事件流解决方案,原因如下:
  • 消息持久性:与发布/订阅模型不同,存储在 streams 中的消息可以随时由多个消费者读取;它们会扇出。因此,即使消费者在消息最初被附加到 stream 时不活跃,也可以在稍后时间点读取消息。
  • 固有的可重放性:即使订阅者在消息处理过程中崩溃,它也可以从 stream 中重新读取完全相同的未确认消息。例如,假设崩溃的订阅者永远不会重新上线;消费者组功能允许消费者在指定时间后处理其他消费者的未确认消息。
  • 关注点分离:生产者可以高速单独地向 stream 生产消息,而消费者可以按照自己的速度单独处理消息。这种关注点分离解决了“快生产者 -> 慢消费者”和“慢生产者 -> 快消费者”的问题,允许这些服务独立地进行扩展。

在事件驱动的微服务架构中,您可能有一些服务发布 API,而其他服务则只是事件的生产者和消费者,没有外部 API。

为何应使用 Redis 进行服务间通信#

考虑以下场景:您有一个电商应用,其架构被分解为不同的微服务,包括 创建订单创建发票处理支付履行订单 等等。微服务允许您将这些命令分离到不同的服务中,实现独立扩展,从而让更多客户能够快速同时处理他们的订单,从而带来更好的用户体验、更高的销量,并减少脾气暴躁的客服人员。

使用微服务时,您需要一个用于服务间通信的工具。最初,您可能会考虑使用 Kafka 等产品进行流处理,但设置起来相当复杂。许多人不知道的是,Redis 也以与 Kafka 相同的方式支持 streams。考虑到您可能已经在将 Redis 用于缓存,将其用于流处理也是有意义的。为了降低应用架构和维护的复杂性,Redis 是服务间通信的绝佳选择。在此,我们分解了使用 Redis 和 streams 进行服务间通信的过程。

电商应用的微服务架构#

本教程其余部分讨论的电商微服务应用使用以下架构:

  1. 1.产品服务:处理从数据库查询产品并将其返回给前端
  2. 2.订单服务:处理验证和创建订单
  3. 3.订单历史服务:处理查询客户的订单历史记录
  4. 4.支付服务:处理订单支付
  5. 5.数字身份服务:处理存储数字身份和计算身份评分
  6. 6.API 网关:将服务统一到一个端点下
  7. 7.mongodb:作为主数据库,存储订单、订单历史记录、产品等
  8. 8.redis:作为 流处理器 和缓存数据库

此图说明了 Redis Streams 如何用作 订单服务支付服务 之间的消息代理

提示

Redis Streams 比使用 Kafka 或其他类似技术更具成本效益。凭借亚毫秒级延迟和轻量级的 Streams 日志数据结构,Redis 更易于部署、开发和运维。

在事件驱动架构中使用 Redis 进行服务间通信#

以下事件流图说明了订单服务和支付服务如何通过 Redis 和 streams 进行通信

下面我们概述使用的 streams 和事件:

  1. 1.订单服务将订单数据插入数据库。
//sample order data
{
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "products": [
    {
      "productId": 11000,
      "qty": 3,
      "productPrice": 3995,
      "productData": {
        "productDisplayName": "Puma Men Slick 3HD Yellow Black Watches",
        "variantName": "Slick 3HD Yellow",
        "brandName": "Puma",
        "ageGroup": "Adults-Men",
        "gender": "Men"
        //...
      }
    },
    {
      "productId": 11001,
      "qty": 2,
      "productPrice": 5450,
      "productData": {
        "productDisplayName": "Puma Men Top Fluctuation Red Black Watches",
        "variantName": "Top Fluctuation Red",
        "brandName": "Puma",
        "ageGroup": "Adults-Men",
        "gender": "Men"
        //...
      }
    }
  ],
  "userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "orderStatusCode": 1, //order created
  "createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "statusCode": 1
}

2. 订单服务还将最少的数据(orderId、orderAmount 和 userId)附加到 ORDERS_STREAM 中,以表示创建新订单(即,它充当 ORDERS_STREAM生产者)。

3. 支付服务监听 ORDERS_STREAM 并处理新订单的支付,然后将支付数据插入数据库(即,它充当 ORDERS_STREAM消费者)。

//sample payment data
{
  "paymentId": "6403212956a976300afbaac1",
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "orderAmount": 22885,
  "paidAmount": 22885,
  "orderStatusCode": 3, //payment successful
  "userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "createdOn": {
    "$date": {
      "$numberLong": "1677926697841"
    }
  },
  "createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "statusCode": 1
}

4. 支付服务将最少的数据(orderId、paymentId、orderStatusCode 和 userId)附加到 PAYMENTS_STREAM 中,以表示新的支付(即,它充当 PAYMENTS_STREAM生产者)。

5. 订单服务监听 PAYMENTS_STREAM,并根据订单支付的完成情况相应地更新数据库中订单的 orderStatus 和 paymentId(即,它充当 PAYMENTS_STREAM消费者)。

{
  //order collection update
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "paymentId": "6403212956a976300afbaac1",
  "orderStatusCode": 3 //payment success
  //...
}

使用 Next.js 和 Tailwind 的电商应用前端#

电商微服务应用包含一个前端,使用 Next.jsTailwindCSS 构建。应用后端使用 Node.js。数据存储在 Redis 和 MongoDB 中。您将在下方找到电商应用前端的截图:

  • 仪表盘:显示带有搜索功能的产品列表

购物车:将产品添加到购物车,然后使用“立即购买”按钮结账

订单历史:下单后,顶部导航栏中的“订单”链接会显示订单状态和历史记录

GITHUB 代码

以下是用于克隆本教程中应用源代码的命令

git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

使用 Redis 构建服务间通信应用#

我们使用 Redis 在订单服务和支付服务之间代理事件。

生产者 1(订单服务)#

让我们看看订单服务中的一些代码来了解它是如何工作的:

  1. 1.创建订单。
  2. 2.订单创建后,订单服务 将最少的数据附加到 ORDERS_STREAM 中,以表示创建新订单。
server/src/services/orders/src/service-impl.ts
const addOrderIdToStream = async (
  orderId: string,
  orderAmount: number,
  userId: string,
) => {
  const nodeRedisClient = getNodeRedisClient();
  if (orderId && nodeRedisClient) {
    const streamKeyName = 'ORDERS_STREAM';
    const entry = {
      orderId: orderId,
      orderAmount: orderAmount.toFixed(2),
      userId: userId,
    };
    const id = '*'; //* = auto generate
    //xAdd adds entry to specified stream
    await nodeRedisClient.xAdd(streamKeyName, id, entry);
  }
};

消费者 1(支付服务)#

3. 支付服务监听 ORDERS_STREAM

server/src/services/payments/src/service-impl.ts
// Below is some code for how you would use Redis to listen for the stream events:

async function listenToStream(
  onMessage: (message: any, messageId: string) => Promise<void>,
) {
  // using node-redis
  const redis = getNodeRedisClient();
  const streamKeyName = 'ORDERS_STREAM'; //stream name
  const groupName = 'ORDERS_CON_GROUP'; // listening consumer group name (custom)
  const consumerName = 'PAYMENTS_CON'; // listening consumer name (custom)
  const readMaxCount = 100;

  // Check if the stream group already exists
  if (!(await redis.exists(streamKeyName))) {
    const idPosition = '0'; //0 = start, $ = end or any specific id
    await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
      MKSTREAM: true,
    });
  }

  // setup a loop to listen for stream events
  while (true) {
    // read set of messages from different streams
    const dataArr = await nodeRedisClient.xReadGroup(
      commandOptions({
        isolated: true,
      }),
      groupName,
      consumerName,
      [
        {
          // you can specify multiple streams in array
          key: streamKeyName,
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        COUNT: readMaxCount, // Read n entries at a time
        BLOCK: 0, // block for 0 (infinite) seconds if there are none.
      },
    );

    for (let data of dataArr) {
      for (let messageItem of data.messages) {
        // process the message received (in our case, perform payment)
        await onMessage(messageItem.message, messageItem.id);

        // acknowledge individual messages after processing
        nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
      }
    }
  }
}

// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
  onMessage: processPaymentForNewOrders,
});

const processPaymentForNewOrders: IMessageHandler = async (
  message,
  messageId,
) => {
  /*
   message = {
      orderId: "",
      orderAmount: "",
      userId: "",
    }
    */
  // process payment for new orderId and insert "payments" data to database
};
注意

这里有几点重要注意事项:

  1. 1.确保 stream 组在创建之前不存在。
  2. 2.使用 isolated: true,以便在孤立执行模式下使用 XREADGROUP 的阻塞版本。
  3. 3.处理完消息后,确认单个消息,以将其从待处理订单队列中移除,并避免重复处理。

生产者 2(支付服务)#

4. 支付服务将最少的数据附加到 PAYMENTS_STREAM 中,以表示支付已完成。

server/src/services/payments/src/service-impl.ts
const addPaymentIdToStream = async (
  orderId: string,
  paymentId: string,
  orderStatus: number,
  userId: string,
) => {
  const nodeRedisClient = getNodeRedisClient();
  if (orderId && nodeRedisClient) {
    const streamKeyName = 'PAYMENTS_STREAM';
    const entry = {
      orderId: orderId,
      paymentId: paymentId,
      orderStatusCode: orderStatus.toString(),
      userId: userId,
    };
    const id = '*'; //* = auto generate
    //xAdd adds entry to specified stream
    await nodeRedisClient.xAdd(streamKeyName, id, entry);
  }
};

消费者 2(订单服务)#

5. 订单服务监听 PAYMENTS_STREAM,并在支付完成后更新订单。

server/src/services/orders/src/service-impl.ts
//Below is some code for how you would use Redis to listen for the stream events:

async function listenToStream(
  onMessage: (message: any, messageId: string) => Promise<void>,
) {
  // using node-redis
  const redis = getNodeRedisClient();
  const streamKeyName = 'PAYMENTS_STREAM'; //stream name
  const groupName = 'PAYMENTS_CON_GROUP'; //listening consumer group name (custom)
  const consumerName = 'ORDERS_CON'; //listening consumer name (custom)
  const readMaxCount = 100;

  // Check if the stream group already exists
  if (!(await redis.exists(streamKeyName))) {
    const idPosition = '0'; //0 = start, $ = end or any specific id
    await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
      MKSTREAM: true,
    });
  }

  // setup a loop to listen for stream events
  while (true) {
    // read set of messages from different streams
    const dataArr = await nodeRedisClient.xReadGroup(
      commandOptions({
        isolated: true,
      }),
      groupName,
      consumerName,
      [
        {
          // you can specify multiple streams in array
          key: streamKeyName,
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        COUNT: readMaxCount, // Read n entries at a time
        BLOCK: 0, // block for 0 (infinite) seconds if there are none.
      },
    );

    for (let data of dataArr) {
      for (let messageItem of data.messages) {
        //process the message received (in our case, updateOrderStatus)
        await onMessage(messageItem.message, messageItem.id);

        // acknowledge individual messages after processing
        nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
      }
    }
  }
}

// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
  onMessage: updateOrderStatus,
});

const updateOrderStatus: IMessageHandler = async (message, messageId) => {
  /*
   message = {
      orderId: "",
      paymentId: "",
      orderStatusCode:"",
      userId: "",
    }
    */
  // updates orderStatus and paymentId in database accordingly for the order which has fulfilled payment
  // updateOrderStatusInRedis(orderId,paymentId,orderStatusCode,userId)
  // updateOrderStatusInMongoDB(orderId,paymentId,orderStatusCode,userId)
};
提示

最佳实践是验证所有接收到的消息,确保可以对其进行处理。

为了本应用的演示目的,我们在同一服务中调用以更新 Redis 和主数据库中的订单状态(为简化起见,我们没有使用数据库之间的任何同步技术,而是专注于数据在 Redis 中的存储和访问方式)。另一种常见模式是让您的服务写入一个数据库,然后单独使用 CDC 机制更新另一个数据库。例如,您可以直接写入 Redis,然后使用 触发器和函数 在后台处理 Redis 和主数据库的同步。

提示

如果您使用 Redis Cloud,您会发现 Redis Streams 可用于您已用于缓存的同一多租户数据平台。Redis Cloud 还内置了高可用性、消息持久性、对多种客户端的支持以及主/备数据复制带来的弹性……所有功能都已集成。

准备好使用 Redis 进行流处理了吗?#

就是这样!现在您知道如何将 Redis 用作生产者和消费者进行流处理了。希望本教程能给您一些启发,并将其应用于您自己的事件流应用。有关此主题的更多信息,请查看下面的其他资源:

更多资源#

Redis Streams

使用 Redis 的微服务

通用