学习

如何使用 Redis 进行交易风险评分

Will Johnston
作者
Will Johnston, Redis 开发者增长经理
Prasan Kumar
作者
Prasan Kumar, Redis 技术解决方案开发人员
GITHUB 代码

以下命令用于克隆本教程中使用的应用程序的源代码

git clone --branch v3.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

什么是交易风险评分#

"交易风险评分"是一种利用数据科学、机器学习和统计分析来持续监控交易并评估与每个交易相关的相对风险的方法。通过将交易数据与已知欺诈模型进行比较,可以计算出风险得分,并且交易越接近欺诈行为,风险得分就越高。

该分数通常基于对历史交易数据的统计分析,以识别与欺诈活动相关的模式和趋势。然后可以使用该分数触发警报或自动拒绝超过特定风险阈值的交易。它还可以用于对高风险交易触发额外的身份验证步骤。额外的步骤可能包括通过短信、电子邮件或生物识别扫描发送的一次性密码 (OTP)。

提示

交易风险评分通常与其他欺诈检测方法结合在一个系统中,例如 数字身份验证.

为什么要将 Redis 用于交易风险评分#

必须设计一种基于风险的方法来创建无缝流程并 避免减缓 对合法客户的交易体验,同时防止欺诈。如果你的基于风险的方法过于严格,它将 阻止合法交易 并让客户感到沮丧。如果它过于宽松,它将 允许欺诈性交易 通过。

如何使用规则引擎避免误报#

基于规则的自动化欺诈检测系统使用简单的“是或否”逻辑来确定给定交易是否可能是欺诈性的。一个规则示例是“阻止来自有风险地区的 500 美元以上的交易”。使用这样的简单二进制决策,系统可能会阻止许多真正的客户。老练的欺诈者很容易愚弄这样的系统,而欺诈的复杂性意味着简单的“是或否”规则可能不足以准确评估每笔交易的风险。

更准确的风险评分与 AI/ML 解决了这些问题。现代欺诈检测系统使用机器学习模型,这些模型在大量不同数据集(称为“特征”(用户配置文件、交易模式、行为属性等等))上进行训练,以准确识别欺诈性交易。这些模型被设计为灵活的,因此它们可以适应新的欺诈类型。例如,神经网络可以检查可疑活动,例如客户在订购前浏览了多少页面,以及他们是在复制粘贴信息还是手动输入信息,并将客户标记为进一步审查。

这些模型使用历史数据和最新数据为每个客户创建风险概况。通过分析过去的行为,可以创建每个客户的正常行为概况。任何偏离此概况的交易都可能被标记为可疑,从而降低误报的可能性。这些模型对正常行为的变化适应速度也非常快,并且可以快速识别欺诈交易模式。

这正是 Redis Cloud 在交易风险评分中表现出色的地方。

如何在交易风险评分中使用 Redis Cloud#

人们使用 Redis Cloud 作为 内存中 在线特征存储,以在线和 实时访问 交易风险评分系统的一部分的特征数据。通过低延迟提供在线特征,Redis Cloud 使风险评分模型能够实时返回结果,从而使整个系统能够在批准合法在线交易时实现高精度和 即时响应 。

Redis Cloud 在交易风险评分中的另一个非常常见的用途是 交易过滤器。交易过滤器可以实现为一个 布隆 过滤器,该过滤器存储有关用户行为的信息。它可以回答诸如“我们以前是否看到过此用户在此商家处购买?”或“我们以前是否看到过此用户在此商家处以 X 到 Y 的价格范围购买?”。作为一种概率数据结构,Redis 布隆过滤器确实牺牲了一些准确性,但作为回报,它们获得了非常低的内存占用和响应时间。

提示

你可能会问,为什么不使用 Redis 集合来回答上面的一些问题。Redis 集合用于存储无序的唯一字符串(成员)集合。它们非常高效,大多数操作都采用 O(1) 时间复杂度。但是, SMEMBERS 命令是 O(N),其中 N 是集合的基数,并且对于大型集合来说可能非常慢,并且还会占用大量内存。这在单实例存储和地理复制中都存在问题,因为更多数据需要更多时间来移动。这就是 Redis 布隆过滤器是交易过滤器的更好选择的原因。应用程序每天经历数百万次交易,而布隆过滤器在规模上保持快速响应时间。

电子商务应用程序中的微服务架构中的交易风险评分#

本教程其余部分中讨论的电子商务微服务应用程序使用以下架构

  1. 1.products service: 处理从数据库查询产品并将其返回给前端
  2. 2.orders service: 处理订单验证和创建
  3. 3.order history service: 处理查询客户的订单历史记录
  4. 4.payments service: 处理订单付款
  5. 5.digital identity service: 处理存储数字身份和计算身份分数
  6. 6.api gateway: 将服务统一到单个端点下
  7. 7.mongodb/ postgresql: 作为主要数据库,存储订单、订单历史记录、产品等
  8. 8.redis: 作为 流处理器 和缓存数据库
信息

在演示应用程序中,您不需要使用 MongoDB/PostgreSQL 作为您的主要数据库;您可以使用其他prisma 支持的数据库。这只是一个示例。

交易风险评分结账流程#

当用户进入结账页面时,系统需要检查用户的数字身份和资料,以确定交易风险。然后系统可以决定批准交易或触发额外的身份验证步骤。下图显示了电子商务应用程序中交易风险评分的流程

结账流程中执行以下步骤

  1. 1.客户将商品添加到购物车并继续结账。
  2. 2.该 订单服务 接收结账请求并在数据库中创建订单。
  3. 3.该 订单服务 发布一个 CALCULATE_IDENTITY_SCORE 事件到 TRANSACTIONS Redis 流。
  4. 4.该 身份服务 订阅 TRANSACTIONS Redis 流并接收 CALCULATE_IDENTITY_SCORE 事件。
  5. 5.该 身份服务 计算用户的身份评分 并发布一个 CALCULATE_PROFILE_SCORE 事件到 TRANSACTIONS Redis 流。
  6. 6.该 资料服务 订阅 TRANSACTIONS Redis 流并接收 CALCULATE_PROFILE_SCORE 事件。
  7. 7.该 资料服务 通过将购物车中的商品与客户的已知资料进行比较来计算资料评分。
  8. 8.该 资料服务 发布一个 ASSESS_RISK 事件到 TRANSACTIONS Redis 流。
  9. 9.该 订单服务 订阅 TRANSACTIONS Redis 流并接收 ASSESS_RISK 事件。
  10. 10.该 订单服务 根据身份和资料评分确定是否有欺诈可能性。如果有欺诈可能性,则 订单服务 会触发额外的身份验证步骤。如果没有欺诈可能性,则 订单服务 会批准订单并继续处理付款。

使用 Next.js 和 Tailwind 的电子商务应用程序前端#

电子商务微服务应用程序包含一个前端,使用 Next.js 和 TailwindCSS 构建。应用程序后端使用 Node.js。数据存储在 Redis 和 MongoDB/Postgressql 中,使用 Prisma。您将在下面找到电子商务应用程序前端的屏幕截图

  • 仪表板:显示具有搜索功能的商品列表

购物车:将商品添加到购物车,然后使用“立即购买”按钮结账

订单历史:订单一旦下达,顶部导航栏中的“订单”链接就会显示订单状态和历史记录

GITHUB 代码

以下命令用于克隆本教程中使用的应用程序的源代码

git clone --branch v3.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

使用 Redis 进行交易风险评分的编码示例#

现在您已经了解了交易风险评分结账流程中涉及的步骤,让我们看一下 订单服务 和 资料服务 的代码,以促进此流程:

注意

要查看 身份服务 的代码,请查看 数字身份验证 解决方案。

在订单服务中启动结账流程#

当 订单服务 接收到结账请求时,它会在数据库中创建订单并发布一个 CALCULATE_IDENTITY_SCORE 事件到 TRANSACTIONS Redis 流。该事件包含有关订单以及客户的信息,例如浏览器指纹、IP 地址和角色(资料)。该数据将在交易期间由 身份服务 和 资料服务 用于计算身份和资料评分。该 订单服务 还指定了交易管道,这意味着它确定了调用事件的顺序,以便 身份服务 和 资料服务 无需了解彼此。该 订单服务 最终拥有交易。下面的示例代码展示了 createOrder 函数在 订单服务中。下面的代码示例非常简化。有关更多详细信息,请参见上面链接的源代码:

const TransactionPipelines = {
  CHECKOUT: [
    TransactionStreamActions.CALCULATE_IDENTITY_SCORE,
    TransactionStreamActions.CALCULATE_PROFILE_SCORE,
    TransactionStreamActions.ASSESS_RISK,
    TransactionStreamActions.PROCESS_PAYMENT,
    TransactionStreamActions.PAYMENT_PROCESSED,
  ],
};

async function createOrder(
  order: IOrder,
  browserAgent: string,
  ipAddress: string,
  sessionId: string,
  sessionData: ISessionData,
) {
  order = await validateOrder(order);

  const orderId = await addOrderToRedis(order);
  order.orderId = orderId;

  await addOrderToMongoDB(order);

  // Log order creation to the LOGS stream
  await streamLog({
    action: 'CREATE_ORDER',
    message: `[${REDIS_STREAMS.CONSUMERS.ORDERS}] Order created with id ${orderId} for the user ${userId}`,
    metadata: {
      userId: userId,
      persona: sessionData.persona,
      sessionId: sessionId,
    },
  });

  let orderAmount = 0;
  order.products?.forEach((product) => {
    orderAmount += product.productPrice * product.qty;
  });

  const orderDetails: IOrderDetails = {
    orderId: orderId,
    orderAmount: orderAmount.toFixed(2),
    userId: userId,
    sessionId: sessionId,
    orderStatus: order.orderStatusCode,
    products: order.products,
  };

  // Initiate the transaction by adding the order details to the transaction stream and sending the first event
  await addMessageToTransactionStream({
    action: TransactionPipelines.CHECKOUT[0],
    logMessage: `[${REDIS_STREAMS.CONSUMERS.IDENTITY}] Digital identity to be validated/ scored for the user ${userId}`,
    userId: userId,
    persona: sessionData.persona,
    sessionId: sessionId,
    orderDetails: orderDetails ? JSON.stringify(orderDetails) : '',
    transactionPipeline: JSON.stringify(TransactionPipelines.CHECKOUT),

    identityBrowserAgent: browserAgent,
    identityIpAddress: ipAddress,
  });

  return orderId;
}

让我们更详细地看一下 addMessageToTransactionStream 函数

async function addMessageToStream(message, streamKeyName) {
  try {
    const nodeRedisClient = getNodeRedisClient();
    if (nodeRedisClient && message && streamKeyName) {
      const id = '*'; //* = auto generate
      await nodeRedisClient.xAdd(streamKeyName, id, message);
    }
  } catch (err) {
    LoggerCls.error('addMessageToStream error !', err);
    LoggerCls.error(streamKeyName, message);
  }
}

async function addMessageToTransactionStream(
  message: ITransactionStreamMessage,
) {
  if (message) {
    const streamKeyName = REDIS_STREAMS.STREAMS.TRANSACTIONS;
    await addMessageToStream(message, streamKeyName);
  }
}

在资料服务中根据已知资料检查订单#

如您所见,交易管道遵循 CALCULATE_IDENTITY_SCORE -> CALCULATE_PROFILE_SCORE -> ASSESS_RISK。现在让我们看一下 资料服务 如何订阅 TRANSACTIONS Redis 流并接收 CALCULATE_PROFILE_SCORE 事件。当 资料服务 启动时,它会订阅 TRANSACTIONS Redis 流并监听事件。

function listen() {
  listenToStreams({
    streams: [
      {
        streamKeyName: REDIS_STREAMS.STREAMS.TRANSACTIONS,
        eventHandlers: {
          [TransactionStreamActions.CALCULATE_PROFILE_SCORE]:
            calculateProfileScore,
        },
      },
    ],
    groupName: REDIS_STREAMS.GROUPS.PROFILE,
    consumerName: REDIS_STREAMS.CONSUMERS.PROFILE,
  });
}

该 listenToStreams 方法的简化版本如下。它接受一个包含关联对象的流列表,该对象将流上的事件映射到用于处理事件的回调。它还接受一个流组和一个消费者名称。然后它处理对流的订阅,并在事件到来时调用相应的方法:

interface ListenStreamOptions {
  streams: {
    streamKeyName: string;
    eventHandlers: {
      [messageAction: string]: IMessageHandler;
    };
  }[];
  groupName: string;
  consumerName: string;
  maxNoOfEntriesToReadAtTime?: number;
}

const listenToStreams = async (options: ListenStreamOptions) => {
  /*
   (A) create consumer group for the stream
   (B) read set of messages from the stream
   (C) process all messages received
   (D) trigger appropriate action callback for each message
   (E) acknowledge individual messages after processing
  */

  const nodeRedisClient = getNodeRedisClient();
  if (nodeRedisClient) {
    const streams = options.streams;
    const groupName = options.groupName;
    const consumerName = options.consumerName;
    const readMaxCount = options.maxNoOfEntriesToReadAtTime || 100;
    const idInitialPosition = '0'; //0 = start, $ = end or any specific id
    const streamKeyIdArr: {
      key: string;
      id: string;
    }[] = [];

    streams.map(async (stream) => {
      LoggerCls.info(
        `Creating consumer group ${groupName} in stream ${stream.streamKeyName}`,
      );

      try {
        // (A) create consumer group for the stream
        await nodeRedisClient.xGroupCreate(
          stream.streamKeyName,
          groupName,
          idInitialPosition,
          {
            MKSTREAM: true,
          },
        );
      } catch (err) {
        LoggerCls.error(
          `Consumer group ${groupName} already exists in stream ${stream.streamKeyName}!`,
        ); //, err
      }

      streamKeyIdArr.push({
        key: stream.streamKeyName,
        id: '>', // Next entry ID that no consumer in this group has read
      });
    });

    LoggerCls.info(`Starting consumer ${consumerName}.`);

    while (true) {
      try {
        // (B) read set of messages from different streams
        const dataArr = await nodeRedisClient.xReadGroup(
          commandOptions({
            isolated: true,
          }),
          groupName,
          consumerName,
          //can specify multiple streams in array [{key, id}]
          streamKeyIdArr,
          {
            COUNT: readMaxCount, // Read n entries at a time
            BLOCK: 5, //block for 0 (infinite) seconds if there are none.
          },
        );

        // dataArr = [
        //   {
        //     name: 'streamName',
        //     messages: [
        //       {
        //         id: '1642088708425-0',
        //         message: {
        //           key1: 'value1',
        //         },
        //       },
        //     ],
        //   },
        // ];

        //(C) process all messages received
        if (dataArr && dataArr.length) {
          for (let data of dataArr) {
            for (let messageItem of data.messages) {
              const streamKeyName = data.name;

              const stream = streams.find(
                (s) => s.streamKeyName == streamKeyName,
              );

              if (stream && messageItem.message) {
                const streamEventHandlers = stream.eventHandlers;
                const messageAction = messageItem.message.action;
                const messageHandler = streamEventHandlers[messageAction];

                if (messageHandler) {
                  // (D) trigger appropriate action callback for each message
                  await messageHandler(messageItem.message, messageItem.id);
                }
                //(E) acknowledge individual messages after processing
                nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
              }
            }
          }
        } else {
          // LoggerCls.info('No new stream entries.');
        }
      } catch (err) {
        LoggerCls.error('xReadGroup error !', err);
      }
    }
  }
};

当新事件到来时,会调用 processTransactionStream 方法。它会验证事件,确保它是 CALCULATE_PROFILE_SCORE 事件,如果是,则计算资料评分。它使用 Redis 布隆过滤器来检查用户之前是否订购过类似的商品组合。它使用预定义的角色来进行演示,但在实际情况中,您需要随着时间的推移构建用户的资料。在演示应用程序中,每个商品都有一个“主类别”和“子类别”。布隆过滤器为“主类别”以及“主类别+子类别”设置。评分逻辑如下所示

async function calculateProfileScore(
  message: ITransactionStreamMessage,
  messageId,
) {
  LoggerCls.info(`Incoming message in Profile Service ${messageId}`);
  if (!(message.orderDetails && message.persona)) {
    return false;
  }

  await streamLog({
    action: TransactionStreamActions.CALCULATE_PROFILE_SCORE,
    message: `[${REDIS_STREAMS.CONSUMERS.PROFILE}] Calculating profile score for the user ${message.userId}`,
    metadata: message,
  });

  // check profile score
  const { products }: IOrderDetails = JSON.parse(message.orderDetails);
  const persona = message.persona.toLowerCase();
  let score = 0;
  const nodeRedisClient = getNodeRedisClient();

  if (!nodeRedisClient) {
    return false;
  }

  const categories = products.reduce((cat, product) => {
    const masterCategory = product.productData?.masterCategory?.typeName;
    const subCategory = product.productData?.subCategory?.typeName;

    if (masterCategory) {
      cat[`${masterCategory}`.toLowerCase()] = true;

      if (subCategory) {
        cat[`${masterCategory}:${subCategory}`.toLowerCase()] = true;
      }
    }

    return cat;
  }, {} as Record<string, boolean>);

  const categoryKeys = Object.keys(categories);
  const checks = categoryKeys.length;

  LoggerCls.info(
    `Checking ${checks} categories: ${JSON.stringify(categoryKeys)}`,
  );

  await Promise.all(
    categoryKeys.map(async (category) => {
      const exists = await nodeRedisClient.bf.exists(
        `bfprofile:${category}`.toLowerCase(),
        persona,
      );

      if (exists) {
        score += 1;
      }
    }),
  );

  LoggerCls.info(`After ${checks} checks, total score is ${score}`);
  score = score / (checks || 1);

  await streamLog({
    action: TransactionStreamActions.CALCULATE_PROFILE_SCORE,
    message: `[${REDIS_STREAMS.CONSUMERS.PROFILE}] Profile score for the user ${message.userId} is ${score}`,
    metadata: message,
  });

  await nextTransactionStep({
    ...message,
    logMessage: `[${REDIS_STREAMS.CONSUMERS.PROFILE}] Requesting next step in transaction risk scoring for the user ${message.userId}`,
    profileScore: `${score}`,
  });

  return true;
}

该 nextTransactionStep 方法是在计算资料评分后调用的。它使用 transactionPipeline 在 订单服务 中设置来发布 ASSESS_RISK 事件。此逻辑如下

async function nextTransactionStep(message: ITransactionStreamMessage) {
  const transactionPipeline: TransactionStreamActions[] = JSON.parse(
    message.transactionPipeline,
  );
  transactionPipeline.shift();

  if (transactionPipeline.length <= 0) {
    return;
  }

  const streamKeyName = REDIS_STREAMS.STREAMS.TRANSACTIONS;
  await addMessageToStream(
    {
      ...message,
      action: transactionPipeline[0],
      transactionPipeline: JSON.stringify(transactionPipeline),
    },
    streamKeyName,
  );
}

简而言之, nextTransactionStep 方法将当前事件从 transactionPipeline 中弹出,然后发布管道中的下一个事件,在本例中为 ASSESS_RISK 事件。

在订单服务中使用交易风险评分完成订单#

该 订单服务 负责在付款之前完成订单。它监听 ASSESS_RISK 事件,然后检查计算出的评分以确定是否存在潜在欺诈。

注意

演示应用程序将事情保持非常简单,它只在订单上设置一个“potentialFraud”标志。在现实世界中,您需要选择不仅对您的应用程序有意义的评分,还要选择如何处理潜在欺诈。例如,您可能希望从客户那里索取其他信息,例如一次性密码。您可能还想将订单发送给人工审核。这取决于您的业务以及您的风险承受能力和缓解策略。

在 订单服务 中处理和完成订单的逻辑如下

async function checkOrderRiskScore(message: ITransactionStreamMessage) {
  LoggerCls.info(`Incoming message in Order Service`);
  if (!message.orderDetails) {
    return false;
  }

  const orderDetails: IOrderDetails = JSON.parse(message.orderDetails);

  if (!(orderDetails.orderId && orderDetails.userId)) {
    return false;
  }

  LoggerCls.info(
    `Transaction risk scoring for user ${message.userId} and order ${orderDetails.orderId}`,
  );

  const { identityScore, profileScore } = message;
  const identityScoreNumber = Number(identityScore);
  const profileScoreNumber = Number(profileScore);
  let potentialFraud = false;

  if (identityScoreNumber <= 0 || profileScoreNumber < 0.5) {
    LoggerCls.info(
      `Transaction risk score is too low for user ${message.userId} and order ${orderDetails.orderId}`,
    );

    await streamLog({
      action: TransactionStreamActions.ASSESS_RISK,
      message: `[${REDIS_STREAMS.CONSUMERS.ORDERS}] Order failed fraud checks for orderId ${orderDetails.orderId} and user ${message.userId}`,
      metadata: message,
    });

    potentialFraud = true;
  }

  orderDetails.orderStatus = ORDER_STATUS.PENDING;
  orderDetails.potentialFraud = potentialFraud;

  updateOrderStatusInRedis(orderDetails);
  /**
   * In real world scenario : can use RDI/ redis gears/ any other database to database sync strategy for REDIS-> Store of record data transfer.
   * To keep it simple, adding  data to MongoDB manually in the same service
   */
  updateOrderStatusInMongoDB(orderDetails);

  message.orderDetails = JSON.stringify(orderDetails);

  await streamLog({
    action: TransactionStreamActions.ASSESS_RISK,
    message: `[${REDIS_STREAMS.CONSUMERS.ORDERS}] Order status updated after fraud checks for orderId ${orderDetails.orderId} and user ${message.userId}`,
    metadata: message,
  });

  await nextTransactionStep(message);

  return true;
}

在 RedisInsight 中可视化交易风险评分数据和事件管道#

提示

RedisInsight 是用于查看 Redis 中数据的免费 Redis GUI。 点击此处下载。

现在您已经了解了处理交易中涉及的一些代码,让我们在 RedisInsight 中查看一下数据。首先,让我们看一下 TRANSACTION_STREAM 键,它存储了结账交易的流数据:

您可以看到 action 列显示了之前讨论的交易管道。在 RedisInsight 中要查看的另一个东西是布隆过滤器:

这些过滤器在演示应用程序中基于特征存储进行预先填充。Redis 还会存储特征,在本例中是每个角色的配置文件。以下是其中一个配置文件特征的示例

结论#

在这篇文章中,您学习了如何使用 Redis Streams 构建交易风险评分管道。您还学习了如何使用 Redis Cloud 作为特征存储以及 Redis 布隆过滤器来计算配置文件分数。每个应用程序都是独一无二的,因此本教程旨在作为您构建自己的交易风险评分管道的起点。

其他资源#

请点击您的文本内容,然后按回车键。