dot Redis 8 已发布——而且它是开源的

了解更多

你可能对 Redis Streams 的理解有误

立即下载教程:如何使用 Redis Streams 构建应用

Redis Streams 简介

Redis Streams 是一种数据类型,它提供了超快的内存抽象,相当于一个只追加日志。Redis Streams 的主要优势在于其高效的消费者组,允许一组消费者独特地从同一消息流的不同部分进行消费,以及阻塞操作,允许消费者等待直到新的数据被添加到流中。随着版本 5.0 的发布,Redis 推出了一种创新方式来管理大量数据收集时的流——Redis Streams。Redis Streams 是一种数据结构,除了其他功能外,可以有效地管理数据消费,在消费者离线时通过数据故障保护持久化数据,并在多个生产者和消费者之间创建数据通道。它允许用户扩展使用应用的消费者数量,实现生产者和消费者之间的异步通信,并高效地使用主内存。最终,Redis Streams 的设计旨在满足消费者多样化的需求,从实时数据处理到历史数据访问,同时保持易于管理。

https://www.youtube.com/embed/JpeHIbzmGP4

我个人犯过错误,用错误的方式描述了 Streams——我将其定义为“一系列类似 hashmap 的元素,按时间排序,都在一个键下”。这是不正确的。关于时间和键的后半部分是正确的,但前半部分完全错了。 

让我们来看看为什么 Streams 会被误解以及它们实际的行为方式。我们将评估这种误解的利弊,以及它可能如何影响你的软件。最后,我们将探讨一些利用 Redis Streams 不为人知的属性的非显而易见的模式。 

Redis Streams 教程

数据处理近年来发生了革命性的变化,这些变化带来了巨大的可能性。例如,如果我们考虑各种用例——从物联网和人工智能到用户活动监控、欺诈检测和金融科技——这些用例有什么共同点?它们都收集和处理大量数据,并且这些数据以高速度到达。处理这些数据后,这些技术再将它们交付给所有适当的数据消费者。

Redis Streams 为用户提供了多种可能性,包括将这种新的数据结构集成到各种应用中。为了让用户更容易开始使用 Redis Streams,我们编写了一些教程来帮助您入门

  1. 如何使用 Redis Streams:在本文中,我们将引导您了解使用 Redis Streams 的基础知识。我们将介绍如何向流中添加数据,以及如何读取这些数据(一次全部读取、异步读取、到达时读取等),以满足不同的消费者用例。我们希望本教程能帮助您了解 Redis Streams 中的数据流,以及如何从流中消费或分区数据。
  2. 如何在 Redis Streams 中使用消费者组:在本文中,我们解释了如何在 Redis Streams 中使用消费者组。消费者组是一种在多个客户端之间分发消息流的方法,以加快处理速度或减轻较慢消费者的负载;其目的是扩展您的数据消费过程。本教程不仅可以帮助您理解消费者组的使用,还可以帮助您了解如何读取、管理和消费 Redis Streams 数据,从应用故障中恢复,以及从待处理条目列表中删除已处理的消息。
  3. 如何构建一个 Redis Streams 应用:在本文中,我们将演示一个使用 Redis Streams 开发数据流处理应用的用例。本教程将引导您了解 Redis Streams 应用的推荐技术和设计组件,概述如何运行和测试此类应用,并解释如何验证流中的数据。

XADD 命令

首先,让我们看看 XADD 命令,这是误解的开始。在官方 redis.io 文档中,该命令的签名如下所示

XADD key ID field value [field value ...]

key 不言自明。id 是新条目的时间戳/序列组合,但实际上,它几乎总是 * 表示自动生成。这种混淆的根源始于 field 和 value。如果您查看 HSET 命令的签名,您会看到一个非常相似的模式

HSET key field value [field value ...]

这两个命令的签名只差一个参数,而且 XADD 中的那个参数几乎总是单个 *。看起来很相似,使用了相同的术语,所以肯定是一样的,对吧?

好的。为了继续讨论这个问题,让我们放下 Redis,看看编程语言如何处理字段-值对。在大多数情况下,无论何种语言,表达字段-值对最具代表性的方式是使用一组字段(无重复)与值相关联——有些语言会保留字段的顺序,有些则不会。让我们通过一个小型的跨语言比较来深入了解

Language	Structure	Ordered fields	Allows repeats
JavaScript & JSON	Object	✗	✗
JavaScript	Map	✓	✗
Python*	Dict	✓	✗
PHP	Array	✓	✗
Java	HashMap	✗	✗
C#	Dictionary	✗	✗
* The CPython implementation has maintained order for dict key-value pairs since 3.6 and prior to this OrderedDict has been available.

这与 Redis 的 hash 很好地对应——所有这些都可以表达 hash 的属性,即无序且无重复。PHP 数组、Python 字典和 JavaScript Map 可以定义字段顺序,但如果您在 Redis 中使用 hash,那么这一点无关紧要,您只需要知道在应用层面不能依赖这种顺序。

对大多数人来说,自然的结论是,既然 HSET 和 XADD 的命令签名有关联,那么它们的返回也可能有关联。事实上,在 RESP2 的协议层面,两者都作为交错的RESP Arrays返回。在早期版本的 RESP3 中,HGETALL 和 XREAD 的响应都是 map(稍后会详细介绍),这一点也得到了延续。

一个 bug 改变了我的想法

通常,我使用 JavaScript 编程,偶尔使用 Python。作为一个传达 Redis 信息的人,我需要触及尽可能多的人,而这两种语言被很好地理解,所以用其中任何一种进行演示或示例代码都会被很大比例的开发者理解。最近,我有机会在 PHP 会议上发言,需要将一些现有的 Python 代码转换为 PHP。我断断续续地使用 PHP 接近 20 年了,但它从未成为我的热情所在。特定的演示不太适合 mod_php 风格的执行,所以我使用了 swoole 及其协程执行(顺便说一句,对于熟悉 JavaScript 世界的人来说,swoole 让 PHP 对我来说非常非常熟悉)。这个库有点过时,需要发送原始的 Redis 命令,没有真正的客户端库辅助解码返回的高级方式。一般来说,发送原始 Redis 命令并解码结果可以提供更多的控制权,而且并不繁琐。 

所以,在发送 XADD 命令时,我正在构建字段-值部分,并且出现了一个 off-by-one 错误(这要归咎于我在多年未使用 PHP 后重新投入其中)。这导致我无意中发送了类似以下内容

XADD myKey * field0 value1 field0 value2 field2 value3

而不是发送相关的字段和值(field0 对应 value0,等等)。

在代码后面,我在一个循环中将 XREAD 的结果放入一个现有的 PHP 数组(它是关联数组)中,将每个字段作为键,对应每个值,并跳过任何已经设置的内容。所以,我开始时的数组是这样的

array(1) {
  ["foo"]=>
  string(3) "bar"
}

结果数组变成这样

array(3) {
  ["foo"]=>
  string(3) "bar"
  ["field0"]=>
  string(6) "value1"
  ["field2"]=>
  string(6) "value3"
}

我无法理解这是怎么可能的。我很快就找到了 value1 被赋给 field0 的 bug(上面提到的 XADD 中的 off-by-one 错误),但为什么 field0 没有被设置为 value2 呢?在 HSET 中,添加字段的行为基本上是 upsert——如果字段存在则更新,否则添加字段并设置值。 

检查 MONITOR 日志并重放这些值,我运行 XREAD 如下

> XREAD STREAMS myKey 0
1) 1) "myKey"
   2) 1) 1) "1592409586907-0"
         2) 1) "field0"
            2) "value1"
            3) "field0"
            4) "value2"
            5) "field2"
            6) "value3"

重复的字段是存在的并被记录了下来,而不是 upserted;此外,顺序也被保留了这和 hash 完全不同! 

用 JSON 近似来思考这个问题,我原以为 Stream 条目看起来像这样

{
  "field1"  : "value",
  "field2"  : "value",
  "field3"  : "value"
}

但它们实际上看起来像这样

[
  ["field1", "value"],
  ["field2", "value"],
  ["field3", "value"]
]

这意味着什么?

好消息

如果你目前使用 Streams 的代码假定条目是像 hash map 一样的,你可能没问题。你只需要注意潜在的关于重复字段的 bug,因为它们在特定应用中的行为可能与你预期的不同。这可能不适用于所有情况或客户端库,但一个好的做法是提供一个不允许重复的数据结构(见上文),并在提供给 XADD 时将其序列化为参数。输入唯一的字段,输出唯一的字段。 

坏消息

并非所有客户端库和工具都能正确处理。相对低级的客户端库(非穷举:node_redis、hiredis)在将 Redis 的输出转换为语言构造方面做得不多。其他更高级的库确实将 Redis 的实际返回抽象为语言构造——你应该检查你选择的库是否这样做,如果这样做就提交一个 issue。一些更高级的库一开始就做对了(stackexchange.redis),对此应予赞扬。

另一个有点糟糕的部分是:如果你是 RESP3 的早期采用者,你可能经历过 XREAD / XREADGROUP 返回 RESP3 map 类型。直到四月初,开发中的 Redis 6 版本在读取 Streams 时令人困惑地返回带有重复字段的 map。幸运的是,这个问题得到了解决,并且 Redis 6 的 GA 版本——你应该真正开始在生产环境中使用 RESP3 的第一个版本——在 XREAD / XREADGROUP 的返回中包含了正确的形式。

有趣的部分

既然我已经讲了你可能对 Streams 有误解,那么让我们花点时间思考一下如何利用 Streams 的这些迄今为止被误解的属性。 

为 Stream 条目中的顺序赋予语义含义

所以,在这个模式中,你实际上有三个向量可以玩。想象一下存储矢量图形的路径。每个 Stream 条目将是一个独特的 polygon 或 path,字段和值将是坐标。例如,看这段 SVG 片段

<polyline points="50,150 50,200 200,200 200,100">

这可以表示为

> XADD mySVG * 50 150 50 200 200 200 200 100

每个额外的形状将是同一键上的另一个条目。如果你试图用 Redis Hash 做类似的事情,你只有两个坐标,而且没有顺序保证。诚然,你可以用 bitfields 等方法实现,但你会失去大量的关于长度和坐标大小的灵活性。使用 Streams,你甚至可以利用时间戳来表示一系列随时间出现的形状,这可能会很有趣。

创建按时间排序的序列化项目集

这个需要一个小小的技巧,但可以提供很多功能。想象一下你正在维护一个类似数组的数据序列。实际上,一个数组的数组——在 JSON 中你可以这样想

[
  ["A New Hope", "The Empire Strikes Back", "Return of the Jedi"],
  ["The Phantom Menace", "Attack of the Clones", "Revenge of the Sith"],
  ["The Force Awakens", "The Last Jedi", "The Rise of Skywalker"]
]

你可以将其表示为一系列 Stream 条目,只有一个小小的细微之处:你需要确保内部列表的(伪)元素数量不是奇数。如果它们是奇数,如上所示,你需要以某种方式记录这一点——我在这里用一个空字符串来记录

> XADD starwars * "A New Hope" "The Empire Strikes Back" "Return of the Jedi" ""
"1592427370458-0"
> XADD starwars * "The Phantom Menace" "Attack of the Clones" "Revenge of the Sith" ""
"1592427393492-0"
> XADD starwars * "The Force Awakens" "The Last Jedi" "The Rise of Skywalker" ""
"1592427414475-0"
> XREAD streams starwars 0
1# "starwars" => 
   1) 1) "1592427370458-0"
      2) 1) "A New Hope"
         2) "The Empire Strikes Back"
         3) "Return of the Jedi"
         4) ""
   2) 1) "1592427393492-0"
      2) 1) "The Phantom Menace"
         2) "Attack of the Clones"
         3) "Revenge of the Sith"
         4) ""
   3) 1) "1592427414475-0"
      2) 1) "The Force Awakens"
         2) "The Last Jedi"
         3) "The Rise of Skywalker"
         4) ""

这种模式下,你在过滤掉任何长度为 0 的字符串的(微小)代价下获得了很多好处。

将 Streams 用作分页缓存

一个你经常看到的棘手问题是网站上的项目列表(电子商务、留言板等)。这通常会被缓存,但我看到人们为了找到缓存此类数据的最佳方法而焦头烂额——你是将整个结果集缓存到像 sorted set 中然后用 ZRANGE 分页,还是将完整的页面存储到字符串键中?这两种方法各有优缺点。 

事实证明,Streams 确实适用于这种情况。例如,以电子商务列表为例。你有一系列项目,每个都有一个 ID。这些项目按有限的几种排序方式列出,通常有反向排序(A-Z,Z-A,低到高,高到低,最高评级到最低评级,最低评级到最高评级等)。

要将这种类型的数据建模为 Stream,你需要确定一个特定的“块”大小,并将其作为一个条目。为什么是块而不是整个结果页作为条目?这允许你在分页中有不同大小的页面(例如,每页 10 个项目可以由 2 个大小为 5 的块组成,而每页 25 个则由 5 个大小为 5 的块组成)。每个条目将包含映射到产品 ID 的字段,值则是产品数据。看这个简化示例,块大小人为设置得很小

当你想检索缓存的值时,你可以运行带有 COUNT 参数的 XRANGE 命令,将 COUNT 设置为你界面一个结果页所需的块数。所以,如果你想获取第一个包含四个项目的页面,你可以运行

> XRANGE listcache - + COUNT 2
1) 1) "0-1"
   2) 1) "123"
      2) "{ \"Red Belt\", ... }"
      3) "456"
      4) "{ \"Yellow Belt\", ... }"
2) 1) "0-2"
   2) 1) "789"
      2) "{ \"Blue Belt\", ... }"
      3) "012"
      4) "{ \"Purple Belt\", ... }"

要获取第二个包含 4 个项目的页面,你需要提供一个下界 Stream ID,并将其递增 1。在我们的例子中,下界将是 0-2

> XRANGE listcache 0-3 + COUNT 2
1) 1) "0-3"
   2) 1) "345"
      2) "{ \"Black Belt\", ... }"
      3) "678"
      4) "{ \"Red Boa\", ... }"
2) 1) "0-4"
   2) 1) "901"
      2) "{ \"Yellow Boa\", ... }"
      3) "234"
      4) "{ \"Green Belt\", ... }"

这种用法在计算复杂度上比 Sorted Sets 或 Lists 有优势,因为 XRANGE 在这种用法下实际上是 O(1),但有几点需要记住

  • XREVRANGE 可以用于反向排序,但这只会反转“块”的顺序。在每个块内部,你需要在应用逻辑中反转顺序,这相对来说应该很简单。
  • 如果你手动线性设置 Stream ID,跳转到列表的不同部分是“免费”的。例如,块 1 是 Stream ID 0-1,块 2 是 Stream ID 0-2 等等。注意,你必须从 1 开始而不是 0,因为你不能在 0-0 添加 Stream 条目。

像任何键一样,你可以使用过期时间来管理 Stream 存在多久。这方面的一个示例可以在 stream-row-cache 中找到。

希望这篇文章能为你提供一些额外的背景,了解 Streams 的实际工作原理以及如何在你的应用中利用 Streams 的这些鲜为人知的属性。