学习

微服务与Redis Streams的通信

Will Johnston
作者
Will Johnston, Redis 的开发人员增长经理
Prasan Kumar
作者
Prasan Kumar, Redis 的技术解决方案开发人员
GITHUB 代码

以下是用于克隆本教程中使用的应用程序源代码的命令

git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

什么是服务间通信?#

在构建微服务应用程序时,人们使用各种选项来进行服务之间的通信。其中

  1. 1.发布/订阅 模型:在发布/订阅模型(即发即忘)中,发布者生成消息,而当时处于活动状态的订阅者会消费这些消息。处于非活动状态的订阅者不能在以后的时间点接收这些消息。
  2. 2.流式传输:大多数微服务应用程序使用事件流式传输解决方案,因为:
  • 消息持久性:与发布/订阅模型不同,存储在流中的消息可以随时被多个消费者读取;它们会扇出。因此,即使消费者在消息最初附加到流时处于非活动状态,它们也可以在以后的时间点读取消息。
  • 固有的可重放性:即使订阅者在消息处理期间崩溃,它也可以从流中重新读取完全相同未确认的消息。例如,假设崩溃的订阅者从未恢复在线状态;消费者组功能允许消费者在指定时间后处理其他消费者的未确认消息。
  • 关注点分离:生产者可以以 高速 独立地将消息生产到流中,消费者可以独立地以自己的速度处理消息。这种关注点分离解决了“快速生产者 -> 慢速消费者”和“慢速生产者 -> 快速消费者”问题,允许这些服务独立扩展。

在事件驱动的微服务架构中,您可能有一些服务发布 API,而其他服务只是事件的生产者和消费者,没有外部 API。

为什么要将 Redis 用于服务间通信?#

考虑以下场景:您有一个电子商务应用程序,其架构分解为不同的微服务,包括 创建订单, 创建发票, 处理付款, 履行订单等。微服务允许您将这些命令分离到不同的服务中,以便独立扩展,从而使更多客户能够快速并同时地处理他们的订单,这将带来更好的用户体验,更高的销售额,以及更少的脾气暴躁的客户服务人员。

使用微服务时,您需要一个用于服务间通信的工具。最初,您可能考虑使用像 Kafka 这样的产品进行流式传输,但设置起来相当复杂。很多人不知道 Redis 也可以像 Kafka 一样支持流式传输。鉴于您可能已经在使用 Redis 进行缓存,因此也使用它进行流处理是有意义的。为了降低应用程序架构和维护的复杂性, Redis 是服务间通信的绝佳选择。在这里,我们分解了使用 Redis 与流式传输进行服务间通信的过程。

电子商务应用程序的微服务架构#

本教程其余部分中讨论的电子商务微服务应用程序使用以下架构

  1. 1.产品服务:处理从数据库中查询产品并将其返回给前端
  2. 2.订单服务:处理订单的验证和创建
  3. 3.订单历史记录服务:处理查询客户的订单历史记录
  4. 4.付款服务:处理订单付款
  5. 5.数字身份服务:处理数字身份存储和身份分数计算
  6. 6.API 网关:将服务统一到单个端点下
  7. 7.MongoDB:充当主要数据库,存储订单、订单历史记录、产品等
  8. 8.Redis:充当 流处理器 和缓存数据库

此图说明了 Redis Streams 如何用作 订单服务 和 付款服务 之间的消息代理

提示

Redis Streams 比使用 Kafka 或其他类似技术更具成本效益。凭借亚毫秒级的延迟和轻量级的 Streams 日志数据结构,Redis 更易于部署、开发和操作。

在事件驱动的架构中使用 Redis 进行服务间通信#

以下事件流图说明了 订单服务 和 付款服务 如何通过 Redis 与流式传输进行通信

让我们概述下面使用的流和事件

  1. 1.The orders service inserts order data into the database.
//sample order data
{
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "products": [
    {
      "productId": 11000,
      "qty": 3,
      "productPrice": 3995,
      "productData": {
        "productDisplayName": "Puma Men Slick 3HD Yellow Black Watches",
        "variantName": "Slick 3HD Yellow",
        "brandName": "Puma",
        "ageGroup": "Adults-Men",
        "gender": "Men"
        //...
      }
    },
    {
      "productId": 11001,
      "qty": 2,
      "productPrice": 5450,
      "productData": {
        "productDisplayName": "Puma Men Top Fluctuation Red Black Watches",
        "variantName": "Top Fluctuation Red",
        "brandName": "Puma",
        "ageGroup": "Adults-Men",
        "gender": "Men"
        //...
      }
    }
  ],
  "userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "orderStatusCode": 1, //order created
  "createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "statusCode": 1
}

2. The orders service also appends minimal data (orderId, orderAmount, and userId) to the ORDERS_STREAM to signal new order creation (i.e., it acts as PRODUCER of the ORDERS_STREAM).

3. The payments service listens to the ORDERS_STREAM and processes payments for new orders, then inserts payment data into the database (i.e, it acts as the CONSUMER of the ORDERS_STREAM).

//sample payment data
{
  "paymentId": "6403212956a976300afbaac1",
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "orderAmount": 22885,
  "paidAmount": 22885,
  "orderStatusCode": 3, //payment successful
  "userId": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "createdOn": {
    "$date": {
      "$numberLong": "1677926697841"
    }
  },
  "createdBy": "USR_4e7acc44-e91e-4c5c-9112-bdd99d799dd3",
  "statusCode": 1
}

4. The payments service appends minimal data (orderId, paymentId, orderStatusCode, and userId) to the PAYMENTS_STREAM to signal a new payment (i.e., it acts as the PRODUCER of the PAYMENTS_STREAM).

5. The orders service listens to the PAYMENTS_STREAM and updates the orderStatus and paymentId for orders in the database accordingly as the order payment is fulfilled (i.e., it acts as the CONSUMER of the PAYMENTS_STREAM).

{
  //order collection update
  "orderId": "01GTP3K2TZQQCQ0T2G43DSSMTD",
  "paymentId": "6403212956a976300afbaac1",
  "orderStatusCode": 3 //payment success
  //...
}

使用 Next.js 和 Tailwind 的电子商务应用程序前端#

该电子商务微服务应用程序由一个前端组成,该前端使用 Next.js 和 TailwindCSS构建。应用程序后端使用 Node.js。数据存储在 Redis 和 MongoDB 中。您将在下面找到电子商务应用程序前端的屏幕截图

  • 仪表盘: 显示产品列表以及搜索功能

购物车: 将产品添加到购物车,然后使用“立即购买”按钮结账

订单历史记录: 订单一旦下达,顶部导航栏中的 订单 链接将显示订单状态和历史记录

GITHUB 代码

以下是用于克隆本教程中使用的应用程序源代码的命令

git clone --branch v1.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

使用 Redis 构建一个服务间通信应用程序#

我们使用 Redis 来代理在 订单服务 和 支付服务之间发送的事件。

生产者 1 (订单服务)#

让我们看看订单服务中的一些代码,以了解其工作原理

  1. 1.创建订单。
  2. 2.创建订单后, 订单服务 将最小数据追加到 ORDERS_STREAM 以发出新订单创建的信号。
server/src/services/orders/src/service-impl.ts
const addOrderIdToStream = async (
  orderId: string,
  orderAmount: number,
  userId: string,
) => {
  const nodeRedisClient = getNodeRedisClient();
  if (orderId && nodeRedisClient) {
    const streamKeyName = 'ORDERS_STREAM';
    const entry = {
      orderId: orderId,
      orderAmount: orderAmount.toFixed(2),
      userId: userId,
    };
    const id = '*'; //* = auto generate
    //xAdd adds entry to specified stream
    await nodeRedisClient.xAdd(streamKeyName, id, entry);
  }
};

消费者 1 (支付服务)#

3.  支付服务 监听 ORDERS_STREAM

server/src/services/payments/src/service-impl.ts
// Below is some code for how you would use Redis to listen for the stream events:

async function listenToStream(
  onMessage: (message: any, messageId: string) => Promise<void>,
) {
  // using node-redis
  const redis = getNodeRedisClient();
  const streamKeyName = 'ORDERS_STREAM'; //stream name
  const groupName = 'ORDERS_CON_GROUP'; // listening consumer group name (custom)
  const consumerName = 'PAYMENTS_CON'; // listening consumer name (custom)
  const readMaxCount = 100;

  // Check if the stream group already exists
  if (!(await redis.exists(streamKeyName))) {
    const idPosition = '0'; //0 = start, $ = end or any specific id
    await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
      MKSTREAM: true,
    });
  }

  // setup a loop to listen for stream events
  while (true) {
    // read set of messages from different streams
    const dataArr = await nodeRedisClient.xReadGroup(
      commandOptions({
        isolated: true,
      }),
      groupName,
      consumerName,
      [
        {
          // you can specify multiple streams in array
          key: streamKeyName,
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        COUNT: readMaxCount, // Read n entries at a time
        BLOCK: 0, // block for 0 (infinite) seconds if there are none.
      },
    );

    for (let data of dataArr) {
      for (let messageItem of data.messages) {
        // process the message received (in our case, perform payment)
        await onMessage(messageItem.message, messageItem.id);

        // acknowledge individual messages after processing
        nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
      }
    }
  }
}

// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
  onMessage: processPaymentForNewOrders,
});

const processPaymentForNewOrders: IMessageHandler = async (
  message,
  messageId,
) => {
  /*
   message = {
      orderId: "",
      orderAmount: "",
      userId: "",
    }
    */
  // process payment for new orderId and insert "payments" data to database
};
注意

这里有几点需要注意

  1. 1.确保在创建流组之前,该流组不存在。
  2. 2.使用 isolated: true, 以便在 隔离执行 模式下使用 XREADGROUP 的阻塞版本。
  3. 3.在处理完单个消息后确认它们,以将消息从待处理订单队列中删除,并避免重复处理。

生产者 2 (支付服务)#

4. 支付服务将最小数据追加到 PAYMENTS_STREAM 以发出付款已完成的信号。

server/src/services/payments/src/service-impl.ts
const addPaymentIdToStream = async (
  orderId: string,
  paymentId: string,
  orderStatus: number,
  userId: string,
) => {
  const nodeRedisClient = getNodeRedisClient();
  if (orderId && nodeRedisClient) {
    const streamKeyName = 'PAYMENTS_STREAM';
    const entry = {
      orderId: orderId,
      paymentId: paymentId,
      orderStatusCode: orderStatus.toString(),
      userId: userId,
    };
    const id = '*'; //* = auto generate
    //xAdd adds entry to specified stream
    await nodeRedisClient.xAdd(streamKeyName, id, entry);
  }
};

消费者 2 (订单服务)#

5. 订单服务监听 PAYMENTS_STREAM 并在付款完成时更新订单。

server/src/services/orders/src/service-impl.ts
//Below is some code for how you would use Redis to listen for the stream events:

async function listenToStream(
  onMessage: (message: any, messageId: string) => Promise<void>,
) {
  // using node-redis
  const redis = getNodeRedisClient();
  const streamKeyName = 'PAYMENTS_STREAM'; //stream name
  const groupName = 'PAYMENTS_CON_GROUP'; //listening consumer group name (custom)
  const consumerName = 'ORDERS_CON'; //listening consumer name (custom)
  const readMaxCount = 100;

  // Check if the stream group already exists
  if (!(await redis.exists(streamKeyName))) {
    const idPosition = '0'; //0 = start, $ = end or any specific id
    await nodeRedisClient.xGroupCreate(streamKeyName, groupName, idPosition, {
      MKSTREAM: true,
    });
  }

  // setup a loop to listen for stream events
  while (true) {
    // read set of messages from different streams
    const dataArr = await nodeRedisClient.xReadGroup(
      commandOptions({
        isolated: true,
      }),
      groupName,
      consumerName,
      [
        {
          // you can specify multiple streams in array
          key: streamKeyName,
          id: '>', // Next entry ID that no consumer in this group has read
        },
      ],
      {
        COUNT: readMaxCount, // Read n entries at a time
        BLOCK: 0, // block for 0 (infinite) seconds if there are none.
      },
    );

    for (let data of dataArr) {
      for (let messageItem of data.messages) {
        //process the message received (in our case, updateOrderStatus)
        await onMessage(messageItem.message, messageItem.id);

        // acknowledge individual messages after processing
        nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
      }
    }
  }
}

// `listenToStream` listens for events and calls the `onMessage` callback to further handle the events.
listenToStream({
  onMessage: updateOrderStatus,
});

const updateOrderStatus: IMessageHandler = async (message, messageId) => {
  /*
   message = {
      orderId: "",
      paymentId: "",
      orderStatusCode:"",
      userId: "",
    }
    */
  // updates orderStatus and paymentId in database accordingly for the order which has fulfilled payment
  // updateOrderStatusInRedis(orderId,paymentId,orderStatusCode,userId)
  // updateOrderStatusInMongoDB(orderId,paymentId,orderStatusCode,userId)
};
提示

验证所有传入消息以确保您可以使用它们,这是一个最佳实践。

出于应用程序的目的,我们在同一服务中调用更新 Redis 和主数据库中的订单状态(为简单起见,我们没有使用数据库之间的任何同步技术,而是专注于如何在 Redis 中存储和访问数据)。另一个常见模式是让您的服务写入一个数据库,然后分别使用 CDC 机制来更新另一个数据库。例如,您可以直接写入 Redis,然后使用 触发器和函数 来处理在后台同步 Redis 和主数据库。

提示

如果您使用 Redis 云,您会发现 Redis Streams 在您已经用于缓存的相同多租户数据平台上可用。Redis 云还具有高可用性、消息持久性、对多个客户端的支持以及主/从数据复制的弹性……所有这些都是内置的。

准备使用 Redis 进行流式传输?#

就这么简单!您现在知道如何将 Redis 用于流式传输,既作为生产者,也作为消费者。希望您可以从本教程中获得一些灵感,并将其应用于您自己的事件流式传输应用程序。有关此主题的更多信息,请查看下面的附加资源

附加资源#