XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- 可用版本
- Redis 开源版 5.0.0
- 时间复杂度
- ACL 类别
-
@read
,@stream
,@slow
,@blocking
,
从一个或多个流中读取数据,仅返回 ID 大于调用者报告的最后一个收到的 ID 的条目。该命令有一个选项可以在项目不可用时阻塞,类似于 BRPOP
或 BZPOPMIN
等命令。
请注意,在阅读本页之前,如果您是 Redis Streams 的新手,我们建议您先阅读 Redis Streams 简介。
非阻塞用法
如果不使用 BLOCK 选项,该命令是同步的,并且可以认为与 XRANGE
有些相关:它将返回流中的一系列项目,但是与 XRANGE
相比,即使仅考虑同步用法,它也有两个根本区别
- 如果要同时从多个键读取数据,可以使用多个流调用此命令。这是
XREAD
的一个关键特性,因为特别是在使用 BLOCK 阻塞时,能够使用单个连接监听多个键是一项重要功能。 - 虽然
XRANGE
返回指定 ID 范围内的项目,而XREAD
更适合于从大于迄今为止所见任何其他条目的第一个条目开始消费流。因此,我们传递给XREAD
的是每个流中我们收到的最后一个元素的 ID。
例如,如果我有两个流 mystream
和 writers
,并且想从这两个流中读取它们包含的第一个元素开始的数据,可以使用如下示例调用 XREAD
。
注意:示例中使用了 COUNT 选项,因此对于每个流,调用最多将返回两个元素。
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
STREAMS 选项是强制性的,并且必须是最后一个选项,因为该选项接受可变长度的参数,格式如下
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
因此,我们首先列出键,然后紧跟着所有关联的 ID,这些 ID 代表我们为该流收到的最后一个 ID,以便调用仅从同一流中返回更大的 ID。
例如,在上面的示例中,我们为流 mystream
收到的最后一个项目的 ID 是 1526999352406-0
,而为流 writers
收到的 ID 是 1526985685298-0
。
要继续遍历这两个流,我会调用
> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
2) 1) 1) 1526999626221-0
2) 1) "duration"
2) "911"
3) "event-id"
4) "7"
5) "user-id"
6) "9488232"
2) 1) "writers"
2) 1) 1) 1526985691746-0
2) 1) "name"
2) "Toni"
3) "surname"
4) "Morrison"
2) 1) 1526985712947-0
2) 1) "name"
2) "Agatha"
3) "surname"
4) "Christie"
以此类推。最终,调用将不会返回任何项目,而只是一个空数组,此时我们知道无法从流中获取更多内容(并且必须重试操作,因此此命令也支持阻塞模式)。
不完整的 ID
使用不完整的 ID 是有效的,就像 XRANGE
一样。但这里 ID 的序列部分如果缺失,总是被解释为零,所以命令
> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等同于
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
阻塞读取数据
在其同步形式中,只要有更多可用项目,该命令就可以获取新数据。然而,在某些时候,我们将不得不等待数据生产者使用 XADD
将新条目推入我们正在消费的流中。为了避免以固定或自适应间隔进行轮询,该命令能够在无法根据指定的流和 ID 返回任何数据时阻塞,并在请求的键之一接受数据后自动解除阻塞。
重要的是要理解,此命令会将数据扇出给所有正在等待相同 ID 范围的客户端,因此每个消费者都将获得一份数据副本,这与使用阻塞列表弹出操作时发生的情况不同。
为了进行阻塞,使用 BLOCK 选项,并指定在超时前我们希望阻塞的毫秒数。通常 Redis 阻塞命令以秒为单位设置超时,但此命令接受毫秒级的超时,尽管通常服务器的超时分辨率接近 0.1 秒。在这种情况下,可以在某些用例中阻塞更短的时间,并且如果服务器内部随着时间的推移而改进,超时分辨率可能会提高。
当传入 BLOCK 命令,但至少在一个传入的流中有数据可以返回时,该命令会同步执行,就像没有 BLOCK 选项一样。
这是一个阻塞调用的示例,其中命令稍后返回一个 null 回复,因为超时已过而没有新数据到达
> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)
特殊的 $ ID。
阻塞时,有时我们希望只接收从阻塞时刻开始通过 XADD
添加到流中的条目。在这种情况下,我们对已添加条目的历史记录不感兴趣。对于这种情况,我们必须检查流的顶部元素 ID,并在 XREAD
命令行中使用该 ID。这不够简洁且需要调用其他命令,因此可以使用特殊的 $
ID 来指示流我们只想要新数据。
非常重要的一点是,您应该只在第一次调用 XREAD
时使用 $
ID。之后,ID 应该是流中报告的最后一个项目的 ID,否则您可能会错过在此期间添加的所有条目。
这是一个典型的 XREAD
调用在消费者愿意只消费新条目的第一次迭代中的样子
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们收到一些回复,下一次调用将类似于
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
以此类推。
特殊的 + ID
您可以使用 XREVRANGE
命令轻松读取单个流中的最后一个条目,如下所示
> XREVRANGE stream + - COUNT 1
但是,随着您添加更多流,这种方法会变慢,因为您必须为每个流单独发出一个命令。相反,从 Redis 7.4 开始,您可以使用 +
号作为特殊 ID。这将请求流中最后一个可用的条目。例如
> XREAD STREAMS streamA streamB streamC streamD + + + +
请注意,当对流使用此特殊 ID 时,COUNT 选项将被忽略(对于特定流),因为只能返回最后一个条目。
如何为阻塞在单个流上的多个客户端提供服务
列表或有序集合上的阻塞列表操作具有弹出行为。基本上,元素会从列表或有序集合中移除,以便返回给客户端。在这种情况下,您希望项目以公平的方式消费,具体取决于客户端在给定键上阻塞的时刻。通常在这种用例中,Redis 使用 FIFO 语义。
然而请注意,对于流,这不是问题:客户端被服务时,流条目不会从流中移除,因此只要 XADD
命令向流提供数据,所有等待的客户端都将得到服务。
强烈建议阅读 Redis Streams 简介,以更多地了解流的整体行为和语义。
RESP2 回复
以下之一
- 数组回复:一个数组,其中每个元素都是一个由两个元素组成的数组,包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证以与
XADD
添加时相同的顺序报告。 - Nil 回复:如果提供了 BLOCK 选项并且发生超时,或者没有可以服务的流。
RESP3 回复
以下之一
- Map 回复:一个由键值元素组成的 map,其中每个元素都由键名和该键报告的条目组成。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证以与
XADD
添加时相同的顺序报告。 - Null 回复:如果提供了 BLOCK 选项并且发生超时,或者没有可以服务的流。