学习

如何使用 Redis 进行交易风险评分

Will Johnston
作者
Will Johnston, Redis 的开发者增长经理
Prasan Kumar
作者
Prasan Kumar, Redis 的技术解决方案开发者
GITHUB 代码

以下是克隆本教程中使用的应用源代码的命令

git clone --branch v3.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

什么是交易风险评分#

“交易风险评分”是一种利用数据科学、机器学习和统计分析持续监控交易并评估每笔交易相关相对风险的方法。通过将交易数据与已知欺诈模型进行比较,可以计算出风险评分,交易与欺诈行为越匹配,风险评分越高。

该评分通常基于对历史交易数据的统计分析,以识别与欺诈活动相关的模式和趋势。该评分可用于触发警报或自动拒绝超过特定风险阈值的交易。它还可用于对高风险交易触发额外的身份验证步骤。额外的步骤可能包括通过短信、电子邮件或生物识别扫描发送一次性密码 (OTP)。

提示

交易风险评分通常与数字身份验证等其他欺诈检测方法结合在一个系统中。数字身份验证

为什么应该使用 Redis 进行交易风险评分#

基于风险的方法必须旨在创建无摩擦的流程,并避免减慢合法客户的交易体验,同时防止欺诈。如果您的基于风险的方法过于严格,它将阻止合法交易并让客户感到沮丧。如果过于宽松,它将允许欺诈交易通过。

如何使用规则引擎避免误报#

基于规则的自动化欺诈检测系统基于简单的“是或否”逻辑来确定给定交易是否可能存在欺诈。一个规则的例子是“阻止来自高风险地区的所有超过 500 美元的交易”。像这种简单的二元决策系统很可能会阻止许多真正的客户。复杂的欺诈者很容易骗过这类系统,而且欺诈的复杂性意味着简单的“是或否”规则可能不足以准确评估每笔交易的风险。

使用 AI/ML 进行更准确的风险评分可以解决这些问题。现代欺诈检测系统使用在大量不同数据集(称为“特征”,例如用户画像、交易模式、行为属性等)上训练的机器学习模型来准确识别欺诈交易。这些模型设计灵活,可以适应新型欺诈。例如,神经网络可以检查客户在下单前浏览了多少页面、他们是复制粘贴信息还是手动输入等可疑活动,并将客户标记出来以供进一步审查。

模型使用历史数据以及最新数据为每个客户创建风险画像。通过分析过去的行为,可以为每个客户创建其正常行为的画像。任何偏离此画像的交易都可以被标记为可疑,从而降低误报的可能性。模型也能非常快速地适应正常行为的变化,并能快速识别欺诈交易模式。

这正是 Redis Cloud 在交易风险评分方面的优势所在。

如何使用 Redis Cloud 进行交易风险评分#

人们使用 Redis Cloud 作为在线和 实时访问 特征数据的 内存中 在线特征存储,作为交易风险评分系统的一部分。通过以低延迟服务在线特征,Redis Cloud 使风险评分模型能够实时返回结果,从而使整个系统在批准合法在线交易时实现高准确性和 即时响应 。

Redis Cloud 在交易风险评分中的另一个非常常见的用途是用于 交易过滤器。交易过滤器可以实现为一个 Bloom 过滤器,用于存储用户行为信息。它可以回答诸如“我们以前见过该用户在此商家处购买过吗?”或“我们以前见过该用户在此商家处购买过 X 到 Y 价格范围内的商品吗?”之类的问题。作为一种概率数据结构,Redis Bloom 过滤器确实牺牲了一些准确性,但作为回报,它们具有非常低的内存占用和响应时间。

提示

您可能会问,为什么不使用 Redis Set 来回答上面的一些问题。Redis Sets 用于存储唯一的无序字符串集合(成员)。它们非常高效,大多数操作的时间复杂度为 O(1)。然而,SMEMBERS 命令的时间复杂度为 O(N),其中 N 是集合的基数,对于大型集合可能非常慢,并且也会占用大量内存。这在单实例存储和地理复制方面都存在问题,因为更多的数据需要更多的时间来传输。这就是为什么 Redis Bloom 过滤器是交易过滤器的更好选择。应用每天都会进行数百万笔交易,而 Bloom 过滤器可以在大规模情况下保持快速响应时间。

电子商务应用的微服务架构中的交易风险评分#

本教程其余部分讨论的电子商务微服务应用采用以下架构

  1. 1.products service: 处理从数据库查询产品并将其返回给前端
  2. 2.orders service: 处理订单验证和创建
  3. 3.order history service: 处理客户订单历史查询
  4. 4.payments service: 处理订单支付
  5. 5.digital identity service: 处理数字身份存储和身份评分计算
  6. 6.api gateway: 在单个端点下统一服务
  7. 7.mongodb/ postgresql: 作为主数据库,存储订单、订单历史、产品等。
  8. 8.redis: 作为 流处理器 和缓存数据库
信息

在演示应用中,您不必使用 MongoDB/Postgresql 作为主数据库;您也可以使用其他prisma 支持的数据库。这只是一个示例。

交易风险评分结账流程#

当用户进行结账时,系统需要检查用户的数字身份和画像以确定交易风险。然后系统可以决定是批准交易还是触发额外的身份验证步骤。下图展示了电子商务应用中交易风险评分的流程

结账流程中执行以下步骤

  1. 1.客户将商品添加到购物车并进行结账。
  2. 2.order service 接收结账请求并在数据库中创建订单。
  3. 3.order services 向 TRANSACTIONS Redis 流发布 CALCULATE_IDENTITY_SCORE 事件。
  4. 4.identity service 订阅 TRANSACTIONS Redis 流并接收 CALCULATE_IDENTITY_SCORE 事件。
  5. 5.identity service 计算用户的身份评分 并向 TRANSACTIONS Redis 流发布 CALCULATE_PROFILE_SCORE 事件。
  6. 6.profile service 订阅 TRANSACTIONS Redis 流并接收 CALCULATE_PROFILE_SCORE 事件。
  7. 7.profile service 通过对照客户已知画像检查购物车中的商品来计算画像评分。
  8. 8.profile service 将 ASSESS_RISK 事件发布到 TRANSACTIONS Redis Stream。
  9. 9.order service 订阅 TRANSACTIONS Redis Stream 并接收 ASSESS_RISK 事件。
  10. 10.order service 根据身份和资料评分确定是否存在欺诈的可能性。如果存在欺诈的可能性,order service 会触发额外的身份验证步骤。如果不存在欺诈的可能性,order service 会批准订单并继续处理付款。

使用 Next.js 和 Tailwind 构建的电子商务应用前端#

该电子商务微服务应用包含一个使用 Next.js 和 TailwindCSS 构建的前端。应用程序后端使用 Node.js。数据存储在 Redis 和 MongoDB/Postgressql 中,使用 Prisma。下面是电子商务应用前端的截图:

  • 仪表盘:显示产品列表及搜索功能

购物车:将产品添加到购物车,然后点击“立即购买”按钮结账

订单历史记录:下单后,顶部导航栏中的 Orders 链接会显示订单状态和历史记录

GITHUB 代码

以下是克隆本教程中使用的应用源代码的命令

git clone --branch v3.0.0 https://github.com/redis-developer/redis-microservices-ecommerce-solutions

使用 redis 进行交易风险评分的编码示例#

现在您已经了解了交易风险评分结账过程中涉及的步骤,接下来我们看看 order service 和 profile service 的代码是如何促进这一过程的:

注意

要查看 identity service 的代码,请参阅 数字身份验证 解决方案。

在 order service 中启动结账流程#

当 order service 收到结账请求时,它会在数据库中创建一个订单,并将一个 CALCULATE_IDENTITY_SCORE 事件发布到 TRANSACTIONS Redis Stream。该事件包含有关订单和客户的信息,例如浏览器指纹、IP 地址和身份画像 (profile)。这些数据将在交易过程中由 identity service 和 profile service 用于计算身份和资料评分。order service 还指定了交易管道(transaction pipeline),这意味着它决定了事件调用的顺序,以便 identity service 和 profile service 无需相互了解。order service 最终拥有该交易。下面的示例代码展示了 order service 中的 createOrder 函数。下面的代码示例是高度简化的。有关更多详细信息,请参阅上面链接的源代码:

const TransactionPipelines = {
  CHECKOUT: [
    TransactionStreamActions.CALCULATE_IDENTITY_SCORE,
    TransactionStreamActions.CALCULATE_PROFILE_SCORE,
    TransactionStreamActions.ASSESS_RISK,
    TransactionStreamActions.PROCESS_PAYMENT,
    TransactionStreamActions.PAYMENT_PROCESSED,
  ],
};

async function createOrder(
  order: IOrder,
  browserAgent: string,
  ipAddress: string,
  sessionId: string,
  sessionData: ISessionData,
) {
  order = await validateOrder(order);

  const orderId = await addOrderToRedis(order);
  order.orderId = orderId;

  await addOrderToMongoDB(order);

  // Log order creation to the LOGS stream
  await streamLog({
    action: 'CREATE_ORDER',
    message: `[${REDIS_STREAMS.CONSUMERS.ORDERS}] Order created with id ${orderId} for the user ${userId}`,
    metadata: {
      userId: userId,
      persona: sessionData.persona,
      sessionId: sessionId,
    },
  });

  let orderAmount = 0;
  order.products?.forEach((product) => {
    orderAmount += product.productPrice * product.qty;
  });

  const orderDetails: IOrderDetails = {
    orderId: orderId,
    orderAmount: orderAmount.toFixed(2),
    userId: userId,
    sessionId: sessionId,
    orderStatus: order.orderStatusCode,
    products: order.products,
  };

  // Initiate the transaction by adding the order details to the transaction stream and sending the first event
  await addMessageToTransactionStream({
    action: TransactionPipelines.CHECKOUT[0],
    logMessage: `[${REDIS_STREAMS.CONSUMERS.IDENTITY}] Digital identity to be validated/ scored for the user ${userId}`,
    userId: userId,
    persona: sessionData.persona,
    sessionId: sessionId,
    orderDetails: orderDetails ? JSON.stringify(orderDetails) : '',
    transactionPipeline: JSON.stringify(TransactionPipelines.CHECKOUT),

    identityBrowserAgent: browserAgent,
    identityIpAddress: ipAddress,
  });

  return orderId;
}

让我们更详细地看看 addMessageToTransactionStream 函数

async function addMessageToStream(message, streamKeyName) {
  try {
    const nodeRedisClient = getNodeRedisClient();
    if (nodeRedisClient && message && streamKeyName) {
      const id = '*'; //* = auto generate
      await nodeRedisClient.xAdd(streamKeyName, id, message);
    }
  } catch (err) {
    LoggerCls.error('addMessageToStream error !', err);
    LoggerCls.error(streamKeyName, message);
  }
}

async function addMessageToTransactionStream(
  message: ITransactionStreamMessage,
) {
  if (message) {
    const streamKeyName = REDIS_STREAMS.STREAMS.TRANSACTIONS;
    await addMessageToStream(message, streamKeyName);
  }
}

在 profile service 中对照已知资料检查订单#

因此,正如您在上面看到的,交易管道遵循 CALCULATE_IDENTITY_SCORE -> CALCULATE_PROFILE_SCORE -> ASSESS_RISK。现在我们来看看 profile service 如何订阅 TRANSACTIONS Redis Stream 并接收 CALCULATE_PROFILE_SCORE 事件。profile service 启动时,它会订阅 TRANSACTIONS Redis Stream 并监听事件。

function listen() {
  listenToStreams({
    streams: [
      {
        streamKeyName: REDIS_STREAMS.STREAMS.TRANSACTIONS,
        eventHandlers: {
          [TransactionStreamActions.CALCULATE_PROFILE_SCORE]:
            calculateProfileScore,
        },
      },
    ],
    groupName: REDIS_STREAMS.GROUPS.PROFILE,
    consumerName: REDIS_STREAMS.CONSUMERS.PROFILE,
  });
}

一个高度简化的 listenToStreams 方法如下所示。它接收一个流列表,以及一个关联的对象,该对象将流上的事件映射到用于处理事件的回调函数。它还接收一个流组和一个消费者名称。然后,它处理对流的订阅,并在事件到来时调用适当的方法:

interface ListenStreamOptions {
  streams: {
    streamKeyName: string;
    eventHandlers: {
      [messageAction: string]: IMessageHandler;
    };
  }[];
  groupName: string;
  consumerName: string;
  maxNoOfEntriesToReadAtTime?: number;
}

const listenToStreams = async (options: ListenStreamOptions) => {
  /*
   (A) create consumer group for the stream
   (B) read set of messages from the stream
   (C) process all messages received
   (D) trigger appropriate action callback for each message
   (E) acknowledge individual messages after processing
  */

  const nodeRedisClient = getNodeRedisClient();
  if (nodeRedisClient) {
    const streams = options.streams;
    const groupName = options.groupName;
    const consumerName = options.consumerName;
    const readMaxCount = options.maxNoOfEntriesToReadAtTime || 100;
    const idInitialPosition = '0'; //0 = start, $ = end or any specific id
    const streamKeyIdArr: {
      key: string;
      id: string;
    }[] = [];

    streams.map(async (stream) => {
      LoggerCls.info(
        `Creating consumer group ${groupName} in stream ${stream.streamKeyName}`,
      );

      try {
        // (A) create consumer group for the stream
        await nodeRedisClient.xGroupCreate(
          stream.streamKeyName,
          groupName,
          idInitialPosition,
          {
            MKSTREAM: true,
          },
        );
      } catch (err) {
        LoggerCls.error(
          `Consumer group ${groupName} already exists in stream ${stream.streamKeyName}!`,
        ); //, err
      }

      streamKeyIdArr.push({
        key: stream.streamKeyName,
        id: '>', // Next entry ID that no consumer in this group has read
      });
    });

    LoggerCls.info(`Starting consumer ${consumerName}.`);

    while (true) {
      try {
        // (B) read set of messages from different streams
        const dataArr = await nodeRedisClient.xReadGroup(
          commandOptions({
            isolated: true,
          }),
          groupName,
          consumerName,
          //can specify multiple streams in array [{key, id}]
          streamKeyIdArr,
          {
            COUNT: readMaxCount, // Read n entries at a time
            BLOCK: 5, //block for 0 (infinite) seconds if there are none.
          },
        );

        // dataArr = [
        //   {
        //     name: 'streamName',
        //     messages: [
        //       {
        //         id: '1642088708425-0',
        //         message: {
        //           key1: 'value1',
        //         },
        //       },
        //     ],
        //   },
        // ];

        //(C) process all messages received
        if (dataArr && dataArr.length) {
          for (let data of dataArr) {
            for (let messageItem of data.messages) {
              const streamKeyName = data.name;

              const stream = streams.find(
                (s) => s.streamKeyName == streamKeyName,
              );

              if (stream && messageItem.message) {
                const streamEventHandlers = stream.eventHandlers;
                const messageAction = messageItem.message.action;
                const messageHandler = streamEventHandlers[messageAction];

                if (messageHandler) {
                  // (D) trigger appropriate action callback for each message
                  await messageHandler(messageItem.message, messageItem.id);
                }
                //(E) acknowledge individual messages after processing
                nodeRedisClient.xAck(streamKeyName, groupName, messageItem.id);
              }
            }
          }
        } else {
          // LoggerCls.info('No new stream entries.');
        }
      } catch (err) {
        LoggerCls.error('xReadGroup error !', err);
      }
    }
  }
};

当新事件到来时,会调用 processTransactionStream 方法。它验证事件,确保它是 CALCULATE_PROFILE_SCORE 事件,如果是,则计算资料评分。它使用 Redis Bloom filter 检查用户之前是否订购过类似的产品集。出于演示目的,它使用预定义的身份画像(persona),但实际上您会随着时间建立用户的资料。在演示应用中,每个产品都有一个“主类别(master category)”和“子类别(subcategory)”。为这些主类别以及主类别+子类别设置了 Bloom filters。评分逻辑在下面突出显示:

async function calculateProfileScore(
  message: ITransactionStreamMessage,
  messageId,
) {
  LoggerCls.info(`Incoming message in Profile Service ${messageId}`);
  if (!(message.orderDetails && message.persona)) {
    return false;
  }

  await streamLog({
    action: TransactionStreamActions.CALCULATE_PROFILE_SCORE,
    message: `[${REDIS_STREAMS.CONSUMERS.PROFILE}] Calculating profile score for the user ${message.userId}`,
    metadata: message,
  });

  // check profile score
  const { products }: IOrderDetails = JSON.parse(message.orderDetails);
  const persona = message.persona.toLowerCase();
  let score = 0;
  const nodeRedisClient = getNodeRedisClient();

  if (!nodeRedisClient) {
    return false;
  }

  const categories = products.reduce((cat, product) => {
    const masterCategory = product.productData?.masterCategory?.typeName;
    const subCategory = product.productData?.subCategory?.typeName;

    if (masterCategory) {
      cat[`${masterCategory}`.toLowerCase()] = true;

      if (subCategory) {
        cat[`${masterCategory}:${subCategory}`.toLowerCase()] = true;
      }
    }

    return cat;
  }, {} as Record<string, boolean>);

  const categoryKeys = Object.keys(categories);
  const checks = categoryKeys.length;

  LoggerCls.info(
    `Checking ${checks} categories: ${JSON.stringify(categoryKeys)}`,
  );

  await Promise.all(
    categoryKeys.map(async (category) => {
      const exists = await nodeRedisClient.bf.exists(
        `bfprofile:${category}`.toLowerCase(),
        persona,
      );

      if (exists) {
        score += 1;
      }
    }),
  );

  LoggerCls.info(`After ${checks} checks, total score is ${score}`);
  score = score / (checks || 1);

  await streamLog({
    action: TransactionStreamActions.CALCULATE_PROFILE_SCORE,
    message: `[${REDIS_STREAMS.CONSUMERS.PROFILE}] Profile score for the user ${message.userId} is ${score}`,
    metadata: message,
  });

  await nextTransactionStep({
    ...message,
    logMessage: `[${REDIS_STREAMS.CONSUMERS.PROFILE}] Requesting next step in transaction risk scoring for the user ${message.userId}`,
    profileScore: `${score}`,
  });

  return true;
}

计算资料评分后,会调用 nextTransactionStep 方法。它使用在 order service 中设置的 transactionPipeline 来发布 ASSESS_RISK 事件。其逻辑如下:

async function nextTransactionStep(message: ITransactionStreamMessage) {
  const transactionPipeline: TransactionStreamActions[] = JSON.parse(
    message.transactionPipeline,
  );
  transactionPipeline.shift();

  if (transactionPipeline.length <= 0) {
    return;
  }

  const streamKeyName = REDIS_STREAMS.STREAMS.TRANSACTIONS;
  await addMessageToStream(
    {
      ...message,
      action: transactionPipeline[0],
      transactionPipeline: JSON.stringify(transactionPipeline),
    },
    streamKeyName,
  );
}

简而言之,nextTransactionStep 方法从 transactionPipeline 中弹出当前事件,然后发布管道中的下一个事件,在本例中是 ASSESS_RISK 事件。

在 order service 中通过交易风险评分完成订单#

order service 负责在付款前完成订单。它监听 ASSESS_RISK 事件,然后检查计算出的评分以确定是否存在潜在欺诈。

注意

演示应用程序非常简单,它只在订单上设置一个“potentialFraud”标志。在实际应用中,您不仅需要选择适合您应用的评分方式,还需要考虑如何处理潜在欺诈。例如,您可能需要向客户请求额外信息,如一次性密码。您也可能希望将订单发送给人工进行审核。这取决于您的业务、风险承受能力和缓解策略。

在 order service 中处理和完成订单的逻辑如下:

async function checkOrderRiskScore(message: ITransactionStreamMessage) {
  LoggerCls.info(`Incoming message in Order Service`);
  if (!message.orderDetails) {
    return false;
  }

  const orderDetails: IOrderDetails = JSON.parse(message.orderDetails);

  if (!(orderDetails.orderId && orderDetails.userId)) {
    return false;
  }

  LoggerCls.info(
    `Transaction risk scoring for user ${message.userId} and order ${orderDetails.orderId}`,
  );

  const { identityScore, profileScore } = message;
  const identityScoreNumber = Number(identityScore);
  const profileScoreNumber = Number(profileScore);
  let potentialFraud = false;

  if (identityScoreNumber <= 0 || profileScoreNumber < 0.5) {
    LoggerCls.info(
      `Transaction risk score is too low for user ${message.userId} and order ${orderDetails.orderId}`,
    );

    await streamLog({
      action: TransactionStreamActions.ASSESS_RISK,
      message: `[${REDIS_STREAMS.CONSUMERS.ORDERS}] Order failed fraud checks for orderId ${orderDetails.orderId} and user ${message.userId}`,
      metadata: message,
    });

    potentialFraud = true;
  }

  orderDetails.orderStatus = ORDER_STATUS.PENDING;
  orderDetails.potentialFraud = potentialFraud;

  updateOrderStatusInRedis(orderDetails);
  /**
   * In real world scenario : can use RDI/ redis gears/ any other database to database sync strategy for REDIS-> Store of record data transfer.
   * To keep it simple, adding  data to MongoDB manually in the same service
   */
  updateOrderStatusInMongoDB(orderDetails);

  message.orderDetails = JSON.stringify(orderDetails);

  await streamLog({
    action: TransactionStreamActions.ASSESS_RISK,
    message: `[${REDIS_STREAMS.CONSUMERS.ORDERS}] Order status updated after fraud checks for orderId ${orderDetails.orderId} and user ${message.userId}`,
    metadata: message,
  });

  await nextTransactionStep(message);

  return true;
}

在 RedisInsight 中可视化交易风险评分数据和事件管道#

提示

RedisInsight 是用于在 redis 中查看数据的免费 redis GUI。 点击此处下载。

既然您已经了解了处理事务的一些相关代码,让我们来看看 RedisInsight 中的数据。首先,我们来看看 TRANSACTION_STREAM 键,这里存储了结账事务的流数据:

您可以看到 action 列显示了前面讨论过的交易管道(transaction pipeline)。在 RedisInsight 中要看的另一件事是 Bloom filters:

这些过滤器在演示应用程序中基于一个 feature store 进行了预填充。Redis 也存储了这些 features,在这种情况下,它们是每个身份画像(persona)的资料。下面是一个资料 feature 的示例:

总结#

在这篇文章中,您学习了如何使用 Redis Streams 构建交易风险评分管道。您还学习了如何使用 Redis Cloud 作为 feature store 以及使用 Redis Bloom filters 计算资料评分。每个应用程序都是独特的,因此本教程旨在为您构建自己的交易风险评分管道提供一个起点。

额外资源#

Redis Streams

使用 Redis 检测欺诈

通用

请点击您的文本内容,然后按 Enter。