dot 快速的未来正在您所在城市的一个活动中来临。

加入我们在 Redis 发布会上

使用 ByteWax 进行点击流聚合的 Redis 驱动的 Dataflow

利用 Bytewax 构建 Redis PubSub 以简化数据聚合

了解如何将 Redis PubSub 与 Bytewax 结合使用,以实现有效的实时数据处理。本指南深入探讨了创建自定义输入源和高效聚合点击流数据的见解。

在本指南中,我们将展示如何为 bytewax 编写一个自定义输入源,该源从一个 Redis pubsub 通道读取数据。所有代码都可以在 bytewax/example-redis 存储库中找到。

让我们首先看一下 Redis PUBSUB 通道。本质上,它是一种消息传递范式,而 Redis 以缓存系统而闻名,其功能远远不止于此(PUBSUB 通道只是您在数据基础设施中使用 Redis 集群的众多方式之一)。PUBSUB 通道功能自 2.8.0 版(于 2013 年 11 月发布)就已存在。Redis Streams 是一个较新的补充,于 2018 年在 Redis 5.0 中推出。这两者之间的主要区别在于传递保证。Streams 提供 最多一次 和 至少一次 语义,而 PUBSUB 只支持 最多一次。我发现 PUBSUB 是一个更普遍的功能,但在这篇指南之后,适应 Streams 应该很顺利。

简单的 redis-py 用法

假设我们从两个 Python 脚本开始 - 一个发布者和一个侦听器(可能由我们团队中的另一位开发人员传给我们)。这两个脚本都使用官方的 redis-py 库进行基本操作:连接和交换消息。

假设我们的消息是某种点击流。为简单起见,我们将从文件中读取这些消息,消息看起来像这样

{"click_id": "811320f1-3bc2-42b9-a841-5a1e5a812f2d", "os": "iPad; CPU iPad OS 4_2_1 like Mac OS X", "os_name": "iOS", "os_timezone": "Europe/Berlin", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "aperry@yahoo.com", "user_domain_id": "1ab06c9f-0e2e-4f46-9b6c-91a0e9a86a4d"}
{"click_id": "dd5a2b23-3357-4bba-857a-1afdb45f9144", "os": "Android 1.5", "os_name": "Android", "os_timezone": "America/Sao_Paulo", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "hessjuan@gmail.com", "user_domain_id": "e7a09617-3635-44b5-b119-8c1034865a9f"}
{"click_id": "811320f1-3bc2-42b9-a841-5a1e5a812f2d", "os": "iPad; CPU iPad OS 4_2_1 like Mac OS X", "os_name": "iOS", "os_timezone": "Europe/Berlin", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "aperry@yahoo.com", "user_domain_id": "1ab06c9f-0e2e-4f46-9b6c-91a0e9a86a4d"}
...

您可以在 存储库 中检查 events 文件

发布者脚本

下面的发布者脚本读取一个 .jsonl 文件,并将内容发布到 Redis 通道

# redis_publisher.py
import os
import pathlib

import redis  # pip install redis

# Configuration
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
CHANNEL_NAME = os.getenv('REDIS_CHANNEL_NAME', 'device_events')
JSONL_FILE = 'events.jsonl'

# Connect to Redis
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

# Read the .jsonl file and publish it to the Redis channel
with pathlib.Path(JSONL_FILE).open() as file:
    for line in file:
        r.publish(CHANNEL_NAME, line)

print("👍 events published")

运行 Redis 实例的 docker 命令

docker run -d --rm -p 6379:6379 redis:latest

发布消息

python redis_publisher.py

当使用 JSONL 文件格式时,文件的每一行都是一个 json 对象,因此我们不需要在发布端进行任何处理,只需“按原样”发送消息。如果我们使用提供的命令运行容器,我们将使用默认连接设置,因此我们不需要提供任何环境变量,只需运行发布者脚本,不带任何参数(假设 events.jsonl 位于同一目录中)

侦听器脚本

侦听器脚本侦听一个通道并记录传入的消息。除了轮询机制之外,它相当简单。以下是此类脚本的示例

# redis_listener.py
import logging
import os

import redis

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
CHANNEL_NAME = os.getenv('REDIS_CHANNEL_NAME', 'device_events')

# Connect to Redis
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT)

# Function to handle the messages from the channel
def message_handler(message):
    data = message['data'].decode('utf-8')
    logger.info(f'Received data: {data}')

# Subscribe to the channel
pubsub = r.pubsub()
pubsub.subscribe(**{CHANNEL_NAME: message_handler})

# Listen for incoming messages
logger.info(f'Listening to channel: {CHANNEL_NAME}')
pubsub.run_in_thread(sleep_time=0.001)

运行此脚本,然后将消息发布到通道,应该输出类似这样的内容

INFO:__main__:Listening to channel: device_events
INFO:__main__:Received data: {"click_id": "811320f1-3bc2-42b9-a841-5a1e5a812f2d", "os": "iPad; CPU iPad OS 4_2_1 like Mac OS X", "os_name": "iOS", "os_timezone": "Europe/Berlin", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "aperry@yahoo.com", "user_domain_id": "1ab06c9f-0e2e-4f46-9b6c-91a0e9a86a4d"}

INFO:__main__:Received data: {"click_id": "dd5a2b23-3357-4bba-857a-1afdb45f9144", "os": "Android 1.5", "os_name": "Android", "os_timezone": "America/Sao_Paulo", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "hessjuan@gmail.com", "user_domain_id": "e7a09617-3635-44b5-b119-8c1034865a9f"}

...

这种方法虽然有效,但看起来很笨拙,尤其是在复杂性增加时。如果您有一些编写 Python 的经验,您可能已经发现关于此脚本的一些事情可能会让您感到困惑。消息处理程序使用 kwargs 参数注册,使用一个笨拙的 通道名称 -> 函数 字典,然后我们使用线程来运行侦听器。我不知道您怎么样,但对于我来说,使用 Python 中的线程从来都不是一件愉快的事情。此外,我总是会在我的代码中看到一个 sleep 时,开始质疑自己。而这一切仅仅是为了一个简单的回声式管道:如果我们要添加更多步骤,它可能会很容易地变成一个混乱的高度循环复杂回调函数。

值得一提的是,使用 redis-py 设置 pub/sub 有不同的方法, 其中一些比其他方法更优雅。根据您的上下文,您可以找到更合适的方案,但您仍然需要管理处理的复杂性。因此,为了获得更结构化和可扩展的管道,我们将转向 bytewax。

简单的 redis-py 管道概述

在继续之前,让我们回顾一下我们当前的小型回声管道的外观,并就一些术语达成一致

redis_simple_pipeline_overview_

我们将把“Redis 左侧的所有内容”称为 “生产者”,而“Redis 右侧的所有内容”称为 “消费者”。这只是一个约定,我们本可以选择“写入器”/“读取器”或“发布者”/“订阅者”。从技术上讲,如果您想吹毛求疵,这些术语之间有一些区别,但在本文的背景下,它们无关紧要。使用这些术语,我们应该补充说

  • 对于 生产者, events.jsonl 是 
  • 对于 生产者,Redis pubsub 通道“device_events”是 输出
  • 对于 消费者,Redis pubsub 通道“device_events”是 ,而标准输出是 输出

现在,当我们继续使用 bytewax 重写消费者时,左侧的所有内容将保持不变。当然,bytewax 也可用于生产者侧,但这将是另一篇博文的主题 🙂

Bytewax 管道

在使用新工具进行实验时,从小处着手总是好的。因此,我们将从使用 bytewax 复制我们的回声管道开始。一旦验证了基本功能,我们就可以始终添加更多计算步骤。幸运的是,由于我们已经拥有了功能正常的侦听器脚本,因此任务主要涉及以正确的方式将其集成到 bytewax 中。

创建自己的连接器的绝佳参考是内置的 KafkaSource。我们将命名为 RedisPubSubSource,它应该是 FixedPartitionedSource 的子类,确保每个分区都维护一个不同的 StatefulSourcePartition 对象,我们实际上还需要编写它。让我们深入研究一下。

Bytewax 的 Redis PubSub 源

消息处理的核心逻辑位于 StatefulSourcePartition 中,我们将在这里放置我们的通道处理代码

  • 源需要使用提供的连接属性连接到 Redis 集群
  • 然后,源订阅 pubsub 通道
  • 源的主要功能是以适合数据流的方式处理消息
  • 我们应该在执行流中的 Partition 对象之前或之后安排任何健全性检查。这有助于保持代码简单和流线型
class RedisPubSubPartition(StatefulSourcePartition):
    def __init__(self, redis_host, redis_port, channel):
        r = redis.Redis(host=redis_host, port=redis_port)
        self.pubsub = r.pubsub(ignore_subscribe_messages=True)
        self.pubsub.subscribe(channel)

    def next_batch(self, _sched):
        message = self.pubsub.get_message()
        if message is None:
            return []
        data = message['data']
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        return [data]

    def snapshot(self):
        return None

    def close(self):
        self.pubsub.close()

请记住,这不是生产就绪的代码。在实际应用中,您可能希望为连接提供更多设置,并且需要解决其他细微差别。但对于我们的目的,这已经足够了。

让我们仔细看一下 next_batch 函数

    def next_batch(self, _sched):
        message = self.pubsub.get_message()
        if message is None:
            return []
        data = message['data']
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        return [data]

这里,它首先从订阅的频道中检索消息。如果没有消息,它将返回一个空列表。当收到消息时,我们对其进行解码。这在 next_batch 中进行并不必要,但它减少了后续管道中的冗余。但是,您可能已经注意到,反序列化没有在这里包含。这是因为我们的目标是灵活地更改适合管道需求的序列化格式,而不使代码过于复杂。当然,这并不严格限制,对于具有稳定模式的源,在此时包含反序列化步骤可能很实用。或者,对类进行子类化或将序列化函数作为构造函数参数传递也是实现此目的的一种方式。

让我们概述为创建您自己的 bytewax StatefulSourcePartition 的两个指导原则。

  1. next_batch 方法中整合例行消息处理。
  1. 避免使此类的代码复杂化或过度专门化。此类任务应推迟到以后的“计算”步骤进行。

此外,除了 next_batch 之外,我们还添加了两个与快照机制和资源生命周期相关的辅助方法。虽然 KafkaSourcePartition 使用偏移量来创建快照,但 Redis PubSub 频道没有,因此我们在这里使用了一个占位符函数,它只返回 None。关闭连接需要调用 PubSub 的 .close() 方法。请注意,我们没有使用任何线程,总体而言,与我们原始侦听器的代码相比,到目前为止的代码更加简洁,更符合 Python 风格。

接下来,我们将概述 Source 类,它使用 Partition 并为我们的数据流创建一个输入(或者可以称之为“数据入口点”)。

用于 bytewax 的 Redis PubSub 源

Input 的主要目的是初始化我们之前编写的 PubSub 源,并使用来自环境的必要连接详细信息对其进行初始化。如果有必要进行完整性检查,将它们整合到此 Input 类的构造函数中将是最合乎逻辑的选择。

import os
import redis
from bytewax.inputs import FixedPartitionedSource, StatefulSourcePartition


class RedisPubSubPartition(StatefulSourcePartition):
    ...


class RedisPubSubSource(FixedPartitionedSource):
    def __init__(self):
        self.redis_host = os.getenv('REDIS_HOST', 'localhost')
        self.redis_port = int(os.getenv('REDIS_PORT', '6379'))
        self.channel_name = os.getenv('REDIS_CHANNEL_NAME', 'device_events')

    def list_parts(self):
        return ['single-part']

    def build_part(self, now, for_key, resume_state):
        return RedisPubSubPartition(
            self.redis_host,
            self.redis_port,
            self.channel_name,
        )

将 Input 代码与我们之前简单的侦听器脚本进行比较,可以明显看出,此部分与原始脚本的配置部分密切相关。至于消息处理,我们没有使用之前使用的直接回调,因为处理已委托给 RedisPubSubPartition。另外两个方法,list_partsbuild_part 值得多说几句。

Bytewax 的底层架构将分区输入视为一组独立的、独立的流。理想情况下,在构建管道时,您应该知道输入源中存在多少个数据分区,这对于任何数据处理管道都是有效的。对于大型数据处理,通常有多个分区,并且 bytewax 被设计为适应各种数据需求,因此它需要具有提供这种灵活性的 API。对于您作为开发人员而言,这基本上意味着您需要实现这两个方法,list_parts 用于获取分区列表,build_part 用于为每个分区构建一个独立的 Partition 对象。

因此,bytewax 的灵活性是以执行此额外仪式为代价的,即使您只处理一个分区也是如此。这也简化了几个内部考虑因素,例如快照恢复和恢复。将此仪式变为强制的决定是经过深思熟虑的,因为它以一种避免任何可能混淆用户(“显式优于隐式”)的“神奇自动化”的方式平衡了提供的 API。幸运的是,我们当前的方案只使用一个单通道分区,这使得我们的实现相对简单。

用于 Redis PubSub 频道的回声数据流

所有组件就位后,我们现在可以将它们组合起来,有效地重现我们之前的解决方案。

# An echo dataflow would consist of an input and an output to stdout
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutSink
from bytewax import operators as op

# ... (definitions of RedisPubSubPartition, RedisPubSubSource)

flow = Dataflow('redis_echo')

stream = op.input('inp', RedisPubSubSource())
op.output('out', stream, StdOutSink())

完整的代码可在 示例存储库 中找到。我们目前的实现如下所示。

redis_dataflow_echo_overivew_69f222a6f2

从结构上讲,这反映了我们之前的实现,管道左侧保持不变。

您需要像往常一样运行数据流,使用 bytewax.run 帮助程序。

python -m bytewax.run ./src/dataflow_echo.py:flow

输出将保持不变,这很好,因为这是我们期望的结果。

{"click_id": "811320f1-3bc2-42b9-a841-5a1e5a812f2d", "os": "iPad; CPU iPad OS 4_2_1 like Mac OS X", "os_name": "iOS", "os_timezone": "Europe/Berlin", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "aperry@yahoo.com", "user_domain_id": "1ab06c9f-0e2e-4f46-9b6c-91a0e9a86a4d"}

{"click_id": "dd5a2b23-3357-4bba-857a-1afdb45f9144", "os": "Android 1.5", "os_name": "Android", "os_timezone": "America/Sao_Paulo", "device_type": "Mobile", "device_is_mobile": true, "user_custom_id": "hessjuan@gmail.com", "user_domain_id": "e7a09617-3635-44b5-b119-8c1034865a9f"}

将其容器化

在继续之前,让我们快速将设置容器化。这确保我们在隔离的环境中工作,也使重新运行管道的发布者部分变得更容易。

组合文件将包含我们的管道、生产者脚本和 bytewax 数据流。

version: '3.8'

services:
  redis:
    image: "redis:7.2.1"
    ports:
      - "6379:6379"
      
  simple-publisher:
    build:
      context: .
      dockerfile: docker/redis-simple.Dockerfile
    command: python /app/redis_publisher.py
    depends_on:
      - redis
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
      REDIS_CHANNEL_NAME: device_events
      EVENTS_JSONL_FILE: /data/events.jsonl
    volumes:
      - ./data:/data
      
  bytewax-echo:
    build:
      context: .
      dockerfile: docker/redis-bytewax.Dockerfile
    command: python -m bytewax.run /app/dataflow_echo.py:flow
    depends_on:
      - redis
    environment:
      REDIS_HOST: redis
      REDIS_PORT: 6379
      REDIS_CHANNEL_NAME: device_events

用于 bytewax 数据流 的 Dockerfile 安装 redis 和 bytewax 并复制必要的文件。用于 发布者脚本 的 Dockerfile 本质上相同,但没有 bytewax。

有了这种结构,我们就可以 100% 准备好开始使我们的数据流更加复杂,所以让我们开始吧!对于那些渴望直接获得最终结果的人来说,此示例的完整源代码可在 这里 找到。只需克隆存储库并执行 docker compose up 即可查看我们将在下一节中构建的数据流。

在 bytewax 数据流中统计移动用户

我们的 RedisPubSubSource 已准备就绪并经过测试,因此我们不再需要关注 Redis 用于任何我们可能感兴趣的数据处理。这使我们能够自由地使用任何预先存在的数据流步骤,只需进行最少的更改,或者当然,从头开始编写我们自己的步骤,将所需的运算串联起来。甚至可以将此 RedisPubSubSource 作为一项初步测试设置,其基础设施开销非常小,然后将管道转换为完整的 Kafka 系统。

为了展示 bytewax 的一些功能,让我们找出不同地区有多少移动用户。为此,我们需要执行以下步骤。

  • 首先,反序列化数据,将字符串转换为 Python 字典。
  • 其次,将数据转换为以 (REGION, 1 if mobile else 0) 表示的成对数据。我们将使用设备时区信息作为位置的代理。
  • 然后,我们将统计计数。一旦所有计数都聚合起来,我们就会对其进行一些调整,并将计数转换为以 {‘timezone’: Str, ‘num_mobile_users’: Int} 结构的字典。
  • 最后,输出将发送到 StdOutSink。

上述每个要点都大致对应于我们的数据流中的一个独立步骤。最终数据流的完整代码如下所示。

import json
from datetime import timedelta, datetime, timezone

from bytewax import operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.operators.window import SystemClockConfig, TumblingWindow
from bytewax.operators import window as window_op

from bytewax_redis_input import RedisPubSubSource


def deserialize(payload):
    try:
        data = json.loads(payload)
    except json.decoder.JSONDecodeError:
        return None
    return data


def initial_count(data):
    if data is None:
        return 'Uknown', 0
    return data['os_timezone'], int(data['device_is_mobile'])


def add(count1, count2):
    return count1 + count2


def jsonify(timezone__mobile_count):
    tz, count = timezone__mobile_count
    return {'timezone': tz, 'num_mobile_users': count}


clock_config = SystemClockConfig()
window_config = TumblingWindow(
    length=timedelta(seconds=5),
    align_to=datetime(2023, 1, 1, tzinfo=timezone.utc),
)


## build the dataflow

flow = Dataflow('dataflow_mobile_counts)

inp = op.input('inp', flow, RedisPubSubSource())

des = op.map('deserialize', inp, deserialize)
init_count = op.map('initial_count', des, initial_count)
reduce = window_op.reduce_window('sum', init_count, clock_config, window_config, add)
serialize = op.map('jsonify', reduce, jsonify)

op.output('out', serialize, StdOutSink())

这可是很多步骤!如果我们运行此数据流,应该会看到以下输出。

{'timezone': 'America/El_Salvador', 'num_mobile_users': 0}
{'timezone': 'Asia/Bangkok', 'num_mobile_users': 0}
{'timezone': 'Europe/Madrid', 'num_mobile_users': 45}
{'timezone': 'America/New_York', 'num_mobile_users': 37}
{'timezone': 'Europe/Berlin', 'num_mobile_users': 29}
{'timezone': 'Africa/Lubumbashi', 'num_mobile_users': 6}
...

现在,我们管道的完整图更加复杂了。

redis_dataflow_mobile_counts_overivew_d1536f7727

每个步骤都执行其特定功能,并且足够独立,可以轻松测试和调试。如果你需要了解 reduce_window 的工作原理,我建议你查看我们的 文档,因为这不是这里的重点。那么,你可能想知道,主要关注点是什么?我很高兴你问了!主要重点是突出显示 整个管道现在看起来多么 Pythonic 和简化 !每个步骤都易于阅读和修改;本质上,只有几行代码可以有效地将大量数据整理成一组计数,例如,可以提交到某个外部 BI 平台。

但不要只听我们说,我们鼓励你亲自探索和实验!你可以在 专用仓库中访问完整代码。随意添加更多步骤,调整管道逻辑,甚至考虑重写 RedisPubSubSource 类——无论你选择哪条路径,bytewax 都能满足你的需求。

结论

在这篇文章中,我们概述了如何使用 redis PubSub 通道作为示例编写自己的 bytewax 输入。我们从一个简单的回声示例过渡到一个窗口计算。这个示例可以轻松扩展到其他系统。

一些收获

  • bytewax 框架的目标是在过于神奇和黑盒之间找到最佳平衡点。一个明显的副作用是我们向 Python API 公开了某些机制,但我们努力使其尽可能地对用户友好(或者更确切地说, 对程序员友好)。
  • 当你编写自己的连接器的 Partition 类(在这里我们使用了 StatefulSourcePartition),将所有需要对消息进行的转换放在 next_batch 方法的代码中。将其他所有内容委托给数据流中的自定义步骤
  • 有效处理数据输入意味着要考虑你拥有的分区。 FixedPartitionedSource 类提供了一种简单的方法来处理 list_parts/build_part 组合。即使你最终使用单个分区或想要订阅多个频道并将每个频道视为一个单独的分区,你也需要指定它们。
  • 你可以使用 Redis Pubsub(实际上,也可以使用任何自定义连接器)作为数据流的调试输入。redis 容器非常小,你可以轻松地启动它并开始接受消息。它允许进行比常规文件读取更真实的测试,但它非常简单且灵活,可以在本地设置。由于 bytewax 为你提供的抽象,它使编写与源无关的数据流变得非常容易。

希望这篇文章能激发你编写自己的输入并揭开过程的神秘面纱。如果你有任何意见或建议,请在社区频道中联系我们,或在 bytewax/example-redis 仓库中提出问题。