想象一下,您的广告带来了大量流量,但您并未看到预期的广告支出效果。这可能并非巧合——欺诈者经常通过各种复杂的机制窃取数字广告营销预算。伪造点击会使广告看起来像是真实用户在互动,但实际上当这些虚假点击带来安装时,安装成本就落入了欺诈者的口袋。随着企业越来越愿意在数字广告上投入更多资金,广告市场中的欺诈者数量也在增加。
这篇博客文章将演示一个简化的实时欺诈检测用例,以便您可以了解如何领先于欺诈者。
我们使用了以下组件
您可以按照 https://docs.docker.net.cn/get-docker/ 在本地系统上安装 Docker。
您需要在本地机器上启动并运行一个 Redis 服务器。您可以使用以下 CLI 命令启动带有 RedisGears 的 Redis 服务器。
$ docker run -d -p 6379:6379 redislabs/redismod
该命令将从 Redis Docker 仓库拉取镜像,并启动包含所有所需模块的 Redis 服务器,日志将如下结束。
$ git clone https://github.com/redis-developer/redis-datasets.git
将目录切换到 fraud-detection
$ cd redis-datasets/use-cases/fraud-detection
代码位于 use-cases/fraud-detection 中。该应用已使用必要的软件包(包括 Redis 模块的客户端软件包)进行了 Docker 化。
使用命令创建镜像
$ docker build -t redis-fraud:latest .
使用命令创建容器
$ docker run -e REDIS_HOST='<host>' -e REDIS_PORT=6379 -p 5000:5000 -d redis-fraud
您将获得容器 ID,该 ID 可用于查看应用日志。
$ docker logs -f <container-id>
如果您使用 redismod 镜像在本地运行 Redis,请提供主机机器的 IP 地址(而不是 localhost 或 127.0.0.1)。
让我们看看这个项目中是如何管理连接的。
import os
import redis
from redisbloom.client import Client
from singleton_decorator import singleton
@singleton
class RedisConn:
def __init__(self):
host = os.getenv("REDIS_HOST")
port = os.getenv("REDIS_PORT")
if not host or not port:
raise Exception("No Redis Host or Port provided. Please provide Host and Port in docker run command as env")
port = int(port)
self.redis_client = redis.Redis(host=host, port=port)
self.bloom_client = Client(host=host, port=port)
def redis(self):
return self.redis_client
def bloom(self):
return self.bloom_client
在第 2 行,我们导入了 redis 包。所有核心 Redis 命令都在此 Redis 包中可用。
在第 4 行,我们导入了 RedisBloom 包。由于 RedisBloom 是一个模块,用于与其交互的客户端也不同。下面我们将看到更多此类示例。singleton_decorator 确保只创建此连接类的一个实例,并且 os 包用于读取环境变量以建立连接。
现在让我们看看如何使用 Redis 解决点击垃圾邮件和 IP 欺诈问题。
要点: https://gist.github.com/Sachin-Kottarathodi/c3a0647d3fdd0fe8a76425e0594e11c5
def ip_fraud(self, data):
exists = RedisConn().bloom().cfExists(Constants.IP_CUCKOO_FILTER_NAME, data['ip'])
if exists:
data['fraud_type'] = Constants.IP_BLACKLIST
data['status'] = Constants.FRAUD
return exists
def click_spam(self, data):
is_click_spammed = False
count = RedisConn().redis().zcount(data.get('device_id'), data['ts'] - self.click_spam_window_in_sec, data['ts'])
if count >= self.click_spam_threshold:
is_click_spammed = True
data['fraud_type'] = Constants.CLICK_SPAM
data['status'] = Constants.FRAUD
return is_click_spammed
def publish(self, data):
RedisConn().redis().xadd(Constants.STREAM_NAME, data, id='*')
在上面的代码中,使用 Cuckoo Filter 检测列入黑名单的 IP 欺诈。Cuckoo Filter 是 Redis Stack 的一个概率数据结构。使用 bloom 客户端提供的 cfExists 方法检查 IP 是否存在于 Cuckoo Filter 中。
Cuckoo Filter 可能会返回误报。要配置错误率,可以使用 cf.reserve
命令创建过滤器,并提供自定义 bucket 大小。
为了识别点击垃圾邮件,我们使用 redis 包中提供的有序集合的 zcount 方法。使用 zcount,我们可以在预配置的特定窗口内找到来自某个设备的点击次数。如果收到的计数大于某个阈值,我们就将其识别为异常。
最后,使用 xadd 命令将数据推送到 Redistream。id='*' 表示 Redistream 为我们的消息生成一个唯一 ID。
当应用出现时,会注册一个 gear,该 gear 会对我们用于推送数据的流做出反应。
要点:https://gist.github.com/Sachin-Kottarathodi/f9dac7a3342a3643e792e2143a6adf7d
from gearsclient import GearsRemoteBuilder as GearsBuilder
from redistimeseries.client import Client
def stream_handler(item):
data = item['value']
member = json.dumps(
{'device_id': data['device_id'],
'transaction_id': data['transaction_id'],
'ts': data['ts'],
})
redis.Redis().zadd(data.get('device_id'), {member: data['ts']})
Client().incrby(data['fraud_type'], 1)
GearsBuilder(reader='StreamReader', r=redis_conn, requirements=["redis", "redistimeseries"]).foreach(stream_handler).register('data_stream')
如前所述,由于 RedisGears 和 Redis Time Series 是模块,我们需要使用其各自软件包中提供的客户端。
我们使用 GearsRemoteBuilder 类构建 Gear。StreamReader 确保流中的每条新消息都会执行 stream_handler 函数。stream_handler 使用 zadd 将数据添加到有序集合中(此信息在 zcount 中用于识别 click_spam),并使用 Redis Time Series 模块的 incrby 增加正常和欺诈类型的时间序列计数,这随后用于可视化。
Gear 注册也可以在 RedisInsight 上查看。
最后,我们集成了 Flask 应用,该应用暴露了触发器的端点。
要点: https://gist.github.com/Sachin-Kottarathodi/2a6cccb29b4a9fdc7d58086af07aa6eb
from flask import Flask, request
from fraud_checks import FraudChecks
from setup import Setup
app = Flask(__name__)
@app.route('/', methods=['POST'])
def check_fraud():
try:
response = FraudChecks().check_fraud(request.get_json())
code = 200
except Exception as e:
print("Error occurred ", e)
response = str(e)
code = 500
return response, code
if __name__ == '__main__':
Setup().init()
app.run(port=5000, debug=False, host='0.0.0.0')
在这里,应用暴露在端口 5000。在启动服务器之前,会调用我们设置的 init 方法来注册 gear。该端点调用执行欺诈检查并返回响应的函数。
该应用使用 Python 编写,并暴露了一个接受少量参数的端点。使用以下命令调用该应用
$ curl --request POST 'localhost:5000' --header 'Content-Type: application/json' --data-raw '{
"device_id": "111-000-000",
"ip": "1.1.1.1",
"transaction_id": "3e4fad5fs"}'
clean
由于最初 Cuckoo Filter 中没有数据,所有 IP 都将被允许通过。要向 Cuckoo Filter 添加数据,请使用 cli 连接到 Redis 并运行命令
cf.addnx ip_cf 1.1.1.1
再次使用此 IP 运行 post 命令。这次,结果将是 ip_blacklist。
该应用配置为允许同一设备在 10 秒的窗口内发生两次事件。要验证,请在 10 秒内发出两次以上的 curl 请求,结果将是 click_spam
。
可选:可以在“docker run”命令期间配置以下变量。-e CLICK_SPAM_THRESHOLD=3 -e CLICK_SPAM_WINDOW_IN_SEC=10
在 Grafana 中看到欺诈检测结果是令人兴奋的。要实现此功能,请运行以下命令
$ docker run -d -e "GF_INSTALL_PLUGINS=redis-app" -p 3000:3000 grafana/grafana
将浏览器指向 https://<IP_ADDRESS>:3000。
使用用户名“admin”和密码“admin”登录,您可以在首次登录后重置密码。
点击左侧面板(配置)上的齿轮图标,然后选择数据源。
选择“添加数据源”。
搜索 Redis 并选择 Redis Data Source。
将 此处 的原始 JSON 内容复制并粘贴到“通过面板 JSON 导入”框中。点击 Load。
这将创建一个名为“欺诈统计”的仪表板。如果在导入仪表板时出错,请尝试更改仪表板的名称和 UUID。