点 快速的未来正来到你所在城市的一场活动中。

加入我们在 Redis Released

你可能会错误地理解 Redis Streams

下载教程:使用 Redis Streams 构建应用现在下载

Redis Streams 简介

Redis Streams是一种通过单例追加日志提供的超高速内存中抽象的数据类型。Redis Stream的主要优点是高效的消费者组,使消费者组可以从相同消息流的个别部分以唯一的方式获取消息,还能执行阻塞操作,使用户可以一直等待,直到有新数据添加到流中。在 5.0版本中发布后,Redis推出了一种创新方式,即在收集大量数据时管理流——Redis Streams。Redis Streams是一种数据结构,它除了其他功能外,还可以有效地管理数据获取并使用防故障数据将数据保存到消费者脱机,还能在多个生产者和消费者之间创建数据通道。它允许用户使用应用扩展消费者数量,在生产者和消费者之间启用异步通信,还可有效使用主存储器。最终,Redis Streams旨在满足消费者各种需求,从实时数据处理到历史数据访问,同时易于管理。

https://www.youtube.com/embed/JpeHIbzmGP4

我个人总是错误地描述流——我将其定义为“一系列散列表形式的元素,按时间排序,位于一个键下”。这是错误的。关于时间和键的最后一点没错,但第一点则完全错误。

我们来看看为何不理解流以及它们实际的行为。我们将评估这种误解的优缺点及其如何影响您的软件。最后,我们将研究一些不明显的模式,它们利用了Redis Streams鲜为人知的一些属性。

Redis Streams教程

近年来,数据处理发生了革命性的变化,这些变化带来了巨大的可能性。例如,如果我们考虑各种使用案例,从物联网和人工智能到用户活动监控、欺诈检测和金融科技——所有这些案例有什么共同点?它们都收集并处理大量数据,而且这些数据出现的速度很快。在处理完这些数据后,这些技术会将其交付给所有合适的数据消费者。

Redis Streams为用户提供了多种可能性,包括将这种新的数据结构集成到各种应用中的可能性。为了让用户更容易开始使用Redis Streams,我们撰写了几篇教程,以便帮助您开始使用

  1. 如何使用Redis Streams:在这篇文章中,我们将向您讲解使用Redis Stream的基础知识。我们将介绍如何将数据添加到流以及如何读取数据(全部、异步、在收到数据时等),以满足不同的消费者使用案例。我们希望本教程能帮助您理解Redis Streams中的数据流,以及如何获取或对流中的数据进行分区。
  2. 如何在 Redis Streams 中使用使用者群组: 本文将阐明如何在 Redis Streams 中使用使用者群组。使用者群组是一种将消息流分配给多个客户端的方法,以加快处理速度,或减轻较慢使用者负担;其目标是扩展数据使用流程。本教程不仅可以帮助你了解使用者群组的使用,还能帮助你了解如何读取、管理和使用 Redis Streames 数据、从应用程序故障中恢复,以及从待处理条目列表中删除已处理的消息。
  3. 如何构建 Redis Streams 应用程序: 本文将演示如何使用 Redis Streames 开发数据流处理应用程序的用例。教程将引导你了解 Redis Streams 应用程序建议的技术和设计组件,概述如何运行和测试此类应用程序,并解释如何验证流中的数据。

XADD 命令

首先,让我们了解 XADD 命令,因为这是产生误解的地方。该命令签名按 官方 redis.io 文档 中所述如下所示

XADD key ID field value [field value ...]

key不言自明。 id是新条目的时间戳/序列组合,但实际上,它几乎总是 * 表明自动生成。这种混乱的根源始于 field 和 value。如果你了解 HSET 的命令签名,你将会看到一个非常相似的模式

HSET key field value [field value ...]

这两个命令的签名只有一个参数不同,并且 XADD 中的参数几乎总是单个 *。看起来非常相似,使用相同的术语,应该是相同的,对吗?

好的。为继续排查问题,让我们搁置 Redis,了解编程语言如何处理键值对。在大多数情况下,无论语言如何,表达键值的最具表现力的方式是字段的集合(无重复),这些字段与值相关联 - 一些语言将保留字段的顺序,而另一些语言则不会。让我们通过一个小型的跨语言比较来深入了解

Language	Structure	Ordered fields	Allows repeats
JavaScript & JSON	Object	✗	✗
JavaScript	Map	✓	✗
Python*	Dict	✓	✗
PHP	Array	✓	✗
Java	HashMap	✗	✗
C#	Dictionary	✗	✗
* The CPython implementation has maintained order for dict key-value pairs since 3.6 and prior to this OrderedDict has been available.

这很好地映射到了 Redis hashes - 所有这些都可以表述哈希的属性,该属性是无序的且没有重复。PHP 数组、Python 字典和 JavaScript 映射可以定义字段顺序,但如果你正在使用 Redis 中的哈希,那么这并不重要,你只需要知道你不能依赖于应用程序级别的顺序。

对于大多数人而言,自然而然的结论是既然 HSET 和 XADD 的命令签名之间有相关性,那么它们可能也有类似的返回值相关性。事实上,在 RESP2 的协议级别,这两个都作为交错的 RESP 数组 返回。在 RESP3 的早期版本中也是如此,其中 HGETALL 和 XREAD 的响应都是映射(稍后会详细介绍)。

一个错误改变了我的想法

通常,我用 JavaScript 编码,偶尔用 Python。作为一名谈论 Redis 的人,我需要尽可能地接触到更多的人,而这两种语言都能得到很好的理解,所以大多数开发者都能理解这两种语言中展示或示例的代码。最近,我有机会在 PHP 会议上发表演讲,需要将一些现有的 Python 代码转换为 PHP。我在过去 20 年中时不时地用过 PHP,但它从未是我的兴趣所在。这个特定的展示不太适合 mod_php 风格的执行,所以我使用了 swoole 及其协程执行(顺便说一下,对于 JavaScript 世界来说,swoole 让 PHP 对我来说非常非常熟悉)。该程序库有点过时,需要发送原始 Redis 命令,而没有任何真正的客户端程序库协助以高级方式解码返回值。通常,发送原始 Redis 命令和解码结果会提供更多的控制,而且并不会带来负担。 

所以,在给 XADD 发送命令时,我会构建字段值部分,并且出现了个负一的错误(将此归结为在缺席多年后重新使用 PHP)。这导致了我无意中发送了一行类似于

XADD myKey * field0 value1 field0 value2 field2 value3

而不是发送关联的字段和值(field0value0,以此类推)。

在代码的后面,我将 XREAD 的结果放入现有 PHP 数组(关联数组)中,为每个字段分配一个键,并且跳过已经设置的任何内容。因此,我从如下所示的数组开始

array(1) {
  ["foo"]=>
  string(3) "bar"
}

并最终得到了如下所示的数组

array(3) {
  ["foo"]=>
  string(3) "bar"
  ["field0"]=>
  string(6) "value1"
  ["field2"]=>
  string(6) "value3"
}

我无法理解这是怎么发生的。我很快就能追查到 value1 被分配给 field0 的原因(我之前提到的 XADD 中的负一错误),但为什么没有将 field0 设置为 value2?在 HSET 中,添加字段的此行为基本上就是 upsert——如果字段存在则更新,否则添加字段并设置值。 

检查了 MONITOR 日志并使用这些值重复测试后,我运行了 XREAD,如下所示

> XREAD STREAMS myKey 0
1) 1) "myKey"
   2) 1) 1) "1592409586907-0"
         2) 1) "field0"
            2) "value1"
            3) "field0"
            4) "value2"
            5) "field2"
            6) "value3"

存在重复并且被记录下来,而不是进行 upsert 操作;另外,顺序是保留的。这和哈希表一点也不像! 

使用 JSON 作为近似来考虑这一点时,我认为流条目如下所示

{
  "field1"  : "value",
  "field2"  : "value",
  "field3"  : "value"
}

但它们实际上如下所示

[
  ["field1", "value"],
  ["field2", "value"],
  ["field3", "value"]
]

这意味着什么?

好消息

如果您有目前与流协作的代码并假设条目像哈希映射,那么您可能会做的很好。您只需要注意关于放入重复字段的潜在 bug,因为它们可能不会在给定应用程序中以您预期的方式表现。这可能不适用于每种情况或客户端库,但一个好的做法是提供不允许重复(见上文)的数据结构,并在将其提供给 XADD 时将其序列化为参数。输入和输出唯一字段。 

坏消息

并非所有客户端库和工具都正确无误。相对较低级别的客户端库(不详尽:node_redis、hiredis)远未对 Redis 输出更改为语言结构。其他更高级别的库确实将 Redis 的实际返回抽象成语言结构——您应该检查所选库是否这么做,如果确实如此,请提出问题。一些更高级别的库从一开始就做对了(stackExchange.redis),因此这里应给予表扬。

另一部分有点糟糕:如果您是 RESP3 的非常早期的采用者,您可能已经体验到 XREAD / XREADGROUP 返回 RESP3 映射类型。直到 4 月初,正在开发的 Redis 6 版本在读取流时令人困惑地返回带有重复的映射。值得庆幸的是,此问题已解决,通用版本 Redis 6——这是您第一次真正应当在生产中使用 RESP3——随 XREAD / XREADGROUP 的正确返回一同交付。

有趣的部分

由于我已详细讲解您对流的理解很可能出错,我们不妨想想您如何利用这种迄今为止被误解的结构。 

将语义含义应用于流条目中的顺序

因此,您实际上有这三个向量的在该模式中使用。假设存储用于矢量图形的路径。每个流条目将是唯一多边形或路径,字段和值将是坐标。例如,取 SVG 的这一片段

<polyline points="50,150 50,200 200,200 200,100">

这可以表述为

> XADD mySVG * 50 150 50 200 200 200 200 100

每个附加形状都在同一键上再加一个条目。如果尝试通过 Redis Hash 执行类似操作,您将只有两个坐标并且没有顺序保障。不可否认,您可以通过位字段执行此操作,但您将在长度和坐标大小方面损失大量灵活性。借助流,您甚至可能使用时间戳做一些巧妙的事来表示随着时间推移而出现的形状系列。

创建一个由已排序项目组成的按时间排序的集合

这需要一个小技巧,但可以提供很多功能。假设您保留类似数组的数据序列。从本质上讲,数组的数组——在 JSON 中您可以认为它是类似于

[
  ["A New Hope", "The Empire Strikes Back", "Return of the Jedi"],
  ["The Phantom Menace", "Attack of the Clones", "Revenge of the Sith"],
  ["The Force Awakens", "The Last Jedi", "The Rise of Skywalker"]
]

你可以将其表述为一系列 Stream 项,但需注意一个细微差别:你必须确保内嵌列表的(伪)元素个数不是奇数。如果它们是奇数,如上,你将需要用某种方法记录这一事实——以下是我的做法,即使用空字符串

> XADD starwars * "A New Hope" "The Empire Strikes Back" "Return of the Jedi" ""
"1592427370458-0"
> XADD starwars * "The Phantom Menace" "Attack of the Clones" "Revenge of the Sith" ""
"1592427393492-0"
> XADD starwars * "The Force Awakens" "The Last Jedi" "The Rise of Skywalker" ""
"1592427414475-0"
> XREAD streams starwars 0
1# "starwars" => 
   1) 1) "1592427370458-0"
      2) 1) "A New Hope"
         2) "The Empire Strikes Back"
         3) "Return of the Jedi"
         4) ""
   2) 1) "1592427393492-0"
      2) 1) "The Phantom Menace"
         2) "Attack of the Clones"
         3) "Revenge of the Sith"
         4) ""
   3) 1) "1592427414475-0"
      2) 1) "The Force Awakens"
         2) "The Last Jedi"
         3) "The Rise of Skywalker"
         4) ""

此模式的优势在于过滤出任何长度为 0 的字符串时只需损失少量(次要)开销。

用作分页缓存的 Stream

你经常会遇到一个棘手的问题,即网站(电子商务、留言板等)上的项目列表。这通常是缓存的,但我看到有人拼命想找到缓存此类型数据的最佳方法——是将整个结果集缓存到类似有序集这样的东西中,并通过 ZRANGE 向外分页,还是将完整的页面存储为字符串键?这两种方法各有优缺点。

事实证明,Stream 确实适用于此。例如,以电子商务列表来说。你有一系列项目,每个项目都有一个 ID。这些项目以有限的某种序列进行排列,通常有一个倒序 (A-Z、Z-A、低到高、高到低、最高到最低、最低到最高等)。

要在 Stream 中建模此类型的数据,你需要确定一个特定的“块”大小并将其作为一个项。为什么使用块而不是将其中的整个结果页面作为项?这能让你在分页中具有不同大小的页面(例如,每页 10 个项目可由每页 5 个的 2 个块组成,而每页 25 个项目可由每页 5 个的 5 个块组成)。每个项都将包含映射到产品 ID 的字段,并且值将为产品数据。请查看这个块大小人为较小的简化示例

想要检索缓存值时,你可以运行 XRANGE,并将其 COUNT 参数设置为构成你的界面结果页面的块的数量。因此,如果要获取四个项目的第一个页面,你可以运行

> XRANGE listcache - + COUNT 2
1) 1) "0-1"
   2) 1) "123"
      2) "{ \"Red Belt\", ... }"
      3) "456"
      4) "{ \"Yellow Belt\", ... }"
2) 1) "0-2"
   2) 1) "789"
      2) "{ \"Blue Belt\", ... }"
      3) "012"
      4) "{ \"Purple Belt\", ... }"

要获取 4 个项目的第二个页面,你需要提供一个递增了 1 的较小界限 Stream ID,在本例中,较小界限将是 0-2

> XRANGE listcache 0-3 + COUNT 2
1) 1) "0-3"
   2) 1) "345"
      2) "{ \"Black Belt\", ... }"
      3) "678"
      4) "{ \"Red Boa\", ... }"
2) 1) "0-4"
   2) 1) "901"
      2) "{ \"Yellow Boa\", ... }"
      3) "234"
      4) "{ \"Green Belt\", ... }"

这确实比有序集或列表提供了计算复杂度优势,因为 XRANGE 在此用例中实际上为 O(1),但也有一些事项需要记住

  • XREVRANGE 可用于倒转,但此操作只会倒转“块”的顺序。在每个块的内部,你需要在应用程序逻辑中反转顺序,这应该相对简单。
  • 如果你手动线性地设置 Stream ID,则在列表的不同部分间进行查找是“免费”的,因此块 1 的 Stream ID 为 0-1,块 2 的 Stream ID 为 0-2,依此类推。请注意你必须从 1 开始进行此操作,而不能从 0 开始,因为你无法在 0-0 处添加 Stream 项

您可以像使用任何键一样,通过到期时间来管理 Stream 停留的时间。有关如何执行此操作的示例,请参见 stream-row-cache

我希望本文为您提供了有关 Stream 真正工作原理的一些其他背景,以及如何在应用程序中利用这些在 Stream 中鲜为人知属性。