假设您的广告产生了大量流量,但您没有从广告支出中看到预期的效果。这可能不是巧合——诈骗者经常尝试通过各种复杂的机制窃取数字广告营销预算。伪造点击可以让广告看起来像真实用户与之互动,但实际上当这些伪造点击驱动安装时,安装成本会进入诈骗者的口袋。随着公司愿意在数字广告上投入更多资金,广告市场中的诈骗者数量也在增加。
这篇博文将演示实时欺诈检测工作原理的简化用例——以便您了解如何领先于诈骗者。
以下是我们使用的内容
您可以按照 https://docs.docker.net.cn/get-docker/ 在您的本地系统上安装 Docker。
您需要在本地计算机上运行一个 Redis 服务器。您可以使用以下 CLI 在本地计算机上启动 Redis 服务器并使用 RedisGears。
$ docker run -d -p 6379:6379 redislabs/redismod
该命令将从 redis docker 存储库中拉取镜像并启动 Redis 服务器,其中包含所有必需的模块,并且日志将以如下方式结束。
$ git clone https://github.com/redis-developer/redis-datasets.git
更改目录到 fraud-detection
$ cd redis-datasets/use-cases/fraud-detection
代码位于 use-cases/fraud-detection 中。该应用程序已使用必要的软件包(包括 redis 模块的客户端软件包)进行 docker 化。
使用以下命令创建镜像
$ docker build -t redis-fraud:latest .
使用以下命令创建容器
$ docker run -e REDIS_HOST='<host>' -e REDIS_PORT=6379 -p 5000:5000 -d redis-fraud
您将获得容器 ID,该 ID 可用于尾随应用程序日志。
$ docker logs -f <container-id>
如果您使用 redismod 镜像在本地运行 Redis,请提供主机计算机的 IP(而不是 localhost 或 127.0.0.1)。
让我们看一下本项目中的连接是如何管理的。
import os
import redis
from redisbloom.client import Client
from singleton_decorator import singleton
@singleton
class RedisConn:
def __init__(self):
host = os.getenv("REDIS_HOST")
port = os.getenv("REDIS_PORT")
if not host or not port:
raise Exception("No Redis Host or Port provided. Please provide Host and Port in docker run command as env")
port = int(port)
self.redis_client = redis.Redis(host=host, port=port)
self.bloom_client = Client(host=host, port=port)
def redis(self):
return self.redis_client
def bloom(self):
return self.bloom_client
在第 2 行,我们导入 redis 包以进行打包。所有核心 Redis 命令都可以在此 Redis 包中找到。
在第 4 行,我们导入 RedisBloom 包。由于 RedisBloom 是一个模块,因此与该模块交互的客户端也不同。我们将在下面看到更多这样的示例。singleton_decorator 确保仅创建此连接类的单个实例,并且 os 包用于读取环境变量以形成连接。
现在让我们看一下我们如何使用 Redis 来解决点击垃圾邮件和 IP 欺诈。
代码片段: https://gist.github.com/Sachin-Kottarathodi/c3a0647d3fdd0fe8a76425e0594e11c5
def ip_fraud(self, data):
exists = RedisConn().bloom().cfExists(Constants.IP_CUCKOO_FILTER_NAME, data['ip'])
if exists:
data['fraud_type'] = Constants.IP_BLACKLIST
data['status'] = Constants.FRAUD
return exists
def click_spam(self, data):
is_click_spammed = False
count = RedisConn().redis().zcount(data.get('device_id'), data['ts'] - self.click_spam_window_in_sec, data['ts'])
if count >= self.click_spam_threshold:
is_click_spammed = True
data['fraud_type'] = Constants.CLICK_SPAM
data['status'] = Constants.FRAUD
return is_click_spammed
def publish(self, data):
RedisConn().redis().xadd(Constants.STREAM_NAME, data, id='*')
在上面的代码中,Cuckoo Filter 用于查找被列入黑名单的 IP 欺诈。Cuckoo Filter 是一种概率数据结构,它是 Redis Stack 的一部分。使用 bloom 客户端提供的 cfExists 方法检查 IP 在 Cuckoo Filter 中是否存在。
Cuckoo Filter 会返回假阳性。要配置错误率,可以使用 cf.reserve
命令来创建过滤器,并可以提供自定义桶大小。
为了识别点击垃圾邮件,我们使用 redis 包中提供的排序集的 zcount 方法。使用 zcount,我们可以在预先配置的时间窗口内找到来自设备的点击次数。如果收到的计数超过某个阈值,则我们将它识别为异常。
最后,使用 xadd 命令将数据推送到 Redistream。id=’*’ 指示 Redistream 为我们的消息生成一个唯一的 ID。
当应用程序出现时,会注册一个齿轮,它对我们用于推送数据的流做出反应。
代码片段:https://gist.github.com/Sachin-Kottarathodi/f9dac7a3342a3643e792e2143a6adf7d
from gearsclient import GearsRemoteBuilder as GearsBuilder
from redistimeseries.client import Client
def stream_handler(item):
data = item['value']
member = json.dumps(
{'device_id': data['device_id'],
'transaction_id': data['transaction_id'],
'ts': data['ts'],
})
redis.Redis().zadd(data.get('device_id'), {member: data['ts']})
Client().incrby(data['fraud_type'], 1)
GearsBuilder(reader='StreamReader', r=redis_conn, requirements=["redis", "redistimeseries"]).foreach(stream_handler).register('data_stream')
如前所述,由于 RedisGears 和 Redis 时间序列是模块,因此我们需要使用其各自软件包中提供的客户端。
我们使用 GearsRemoteBuilder 类构建 Gear。StreamReader 确保 stream_handler 函数对来自流的每条新消息执行。stream_handler 使用 zadd 将数据添加到排序集(此信息用于在 zcount 中识别 click_spam),并使用 Redis 时间序列模块的 incrby 增加 clean 和 fraud 类型的计数,该模块稍后用于可视化。
也可以在 RedisInsight 上检查 Gear 注册。
最后,我们将 flask 应用程序纳入其中,该应用程序公开用于触发的端点。
代码片段: https://gist.github.com/Sachin-Kottarathodi/2a6cccb29b4a9fdc7d58086af07aa6eb
from flask import Flask, request
from fraud_checks import FraudChecks
from setup import Setup
app = Flask(__name__)
@app.route('/', methods=['POST'])
def check_fraud():
try:
response = FraudChecks().check_fraud(request.get_json())
code = 200
except Exception as e:
print("Error occurred ", e)
response = str(e)
code = 500
return response, code
if __name__ == '__main__':
Setup().init()
app.run(port=5000, debug=False, host='0.0.0.0')
在这里,应用程序在端口 5000 上公开。在启动服务器之前,会调用我们的 setup 的 init 方法来注册 gear。端点调用执行欺诈检查的函数并返回响应。
该应用程序使用 python 编写,并公开了一个接受一些参数的端点。使用以下命令调用该应用程序
$ curl --request POST 'localhost:5000' --header 'Content-Type: application/json' --data-raw '{
"device_id": "111-000-000",
"ip": "1.1.1.1",
"transaction_id": "3e4fad5fs"}'
clean
由于最初 Cuckoo Filter 中没有数据,因此将允许所有 IP 通过。要将数据添加到 Cuckoo Filter,请使用 cli 连接到 Redis 并运行以下命令
cf.addnx ip_cf 1.1.1.1
再次使用此 IP 运行 post 命令。这次,结果将是 ip_blacklist。
该应用程序配置为允许在 10 秒内从同一设备发出两次事件。为了验证,在 10 秒内进行两次以上的 curl 请求,结果将是 click_spam
.
可选:可以在 ‘docker run’ 命令中配置以下变量。 -e CLICK_SPAM_THRESHOLD=3 -e CLICK_SPAM_WINDOW_IN_SEC=10
在 Grafana 中看到欺诈检测结果真是令人兴奋。要实现这一点,请运行以下命令
$ docker run -d -e "GF_INSTALL_PLUGINS=redis-app" -p 3000:3000 grafana/grafana
将浏览器指向 https://<IP_ADDRESS>:3000。
以“admin”身份登录,密码为“admin”,您可以在首次登录后重置密码。
单击左侧面板(配置)上的齿轮图标,然后选择数据源。
选择“添加数据源”。
搜索 Redis 并选择 Redis 数据源。
从 这里 复制粘贴原始 JSON 内容到“通过面板 JSON 导入”框中。单击“加载”。
这将创建一个名为“欺诈统计”的仪表盘。如果您在导入仪表盘时遇到错误,请尝试更改仪表盘的名称和 UUID。