XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- 可用版本
- 5.0.0
- 时间复杂度
- ACL 类别
-
@read
,@stream
,@slow
,@blocking
,
从一个或多个流读取数据,只返回 ID 大于调用者报告的最后一个接收 ID 的条目。此命令可以选择在没有数据可用时阻塞,类似于 BRPOP
或 BZPOPMIN
等命令。
请注意,如果您是 Redis Streams 的新手,在阅读此页面之前,建议阅读 Redis Streams 简介。
非阻塞使用
如果没有使用 **BLOCK** 选项,命令是同步的,可以认为它与 XRANGE
有些类似:它将返回流内的一系列项目,但即使我们只考虑同步使用,它与 XRANGE
也有两个根本区别。
- 如果我们想要同时从多个键读取数据,则可以使用此命令调用多个流。这是一个关键功能
XREAD
,尤其是当使用 **BLOCK** 阻塞时,能够使用单个连接监听多个键是至关重要的功能。 - 虽然
XRANGE
返回 ID 范围内的项目,XREAD
更适合用来从我们见过的所有条目中第一个大于所有条目的条目开始消费流。因此,我们传递给XREAD
的是每个流的最后一个接收的元素的 ID。
例如,如果我有两个流 mystream
和 writers
,并且我想从两个流的第一个元素开始读取数据,我可以像以下示例那样调用 XREAD
。
注意:我们在示例中使用了 **COUNT** 选项,因此对于每个流,调用最多将返回每个流的两个元素。
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
**STREAMS** 选项是强制性的,并且必须是最后一个选项,因为此选项以以下格式获取可变长度的参数。
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
因此,我们从一个键列表开始,然后继续使用所有相关的 ID,这些 ID 代表我们为该流接收的最后一个 ID,以便调用只提供来自相同流的更大 ID。
例如,在上面的示例中,我们为流 mystream
收到的最后一个项目的 ID 为 1526999352406-0
,而对于流 writers
则为 1526985685298-0
。
要继续迭代两个流,我将调用
> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
2) 1) 1) 1526999626221-0
2) 1) "duration"
2) "911"
3) "event-id"
4) "7"
5) "user-id"
6) "9488232"
2) 1) "writers"
2) 1) 1) 1526985691746-0
2) 1) "name"
2) "Toni"
3) "surname"
4) "Morrison"
2) 1) 1526985712947-0
2) 1) "name"
2) "Agatha"
3) "surname"
4) "Christie"
依此类推。最终,调用将不返回任何项目,而只返回一个空数组,然后我们就知道从我们的流中没有更多内容可以获取了(我们必须重试操作,因此此命令也支持阻塞模式)。
不完整的 ID
使用不完整的 ID 是有效的,就像对 XRANGE
有效一样。但是在这里,ID 的序列部分如果缺失,始终被解释为零,因此命令
> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等价于
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
阻塞以获取数据
在同步形式中,只要有更多项目可用,命令就可以获取新数据。但是,在某个时候,我们必须等待数据生产者使用 XADD
将新条目推送到我们正在消费的流中。为了避免以固定或自适应间隔轮询,命令能够在根据指定的流和 ID 无法返回任何数据的情况下阻塞,并且一旦请求的键之一接受数据,就会自动解除阻塞。
重要的是要了解,此命令会分散到所有等待相同 ID 范围的客户端,因此每个消费者都会收到数据副本,这与阻塞列表弹出操作时的行为不同。
为了阻塞,**BLOCK** 选项与我们要阻塞的毫秒数一起使用,直到超时。通常 Redis 阻塞命令以秒为单位进行超时,但是此命令以毫秒为单位进行超时,即使通常服务器的超时分辨率接近 0.1 秒。这次,在某些用例中,可以阻塞更短的时间,如果服务器内部机制随着时间的推移而改进,那么超时的分辨率可能会提高。
当传递了BLOCK命令,但至少在一个传递的流中存在要返回的数据时,命令将同步执行,就像缺少BLOCK选项一样。
这是一个阻塞调用的示例,其中命令稍后返回一个空回复,因为超时已过,而没有新的数据到达。
> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)
特殊的$
ID。
在阻塞时,我们有时只想接收从我们阻塞的那一刻起通过XADD
添加到流中的条目。在这种情况下,我们对已添加条目的历史记录不感兴趣。对于这种情况,我们将不得不检查流顶部的元素 ID,并在XREAD
命令行中使用此 ID。这并不干净,需要调用其他命令,因此可以使用特殊的$
ID 来向流发出我们只想要新内容的信号。
非常重要的是,您应该只在第一次调用XREAD
时使用$
ID。之后,ID 应该是流中最后一个报告项目的 ID,否则您可能会错过其间添加的所有条目。
这是第一个迭代中,一个只想消费新条目的消费者,典型的XREAD
调用看起来像这样
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们得到一些回复,下一个调用将类似于
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
等等。
特殊的+
ID
您可以使用XREVRANGE
命令轻松地读取单个流中的最后一个条目,如下所示
> XREVRANGE stream + - COUNT 1
但是,随着您添加更多流,这种方法会变得很慢,因为您必须为每个流发出单独的命令。相反,从 Redis 7.4 开始,您可以使用+
符号作为特殊的 ID。这将请求流中最后一个可用的条目。例如
> XREAD STREAM streamA streamB streamC streamD + + + +
请注意,当对流使用此特殊 ID 时,COUNT 选项将被忽略(对于特定流),因为只能返回最后一个条目。
如何为阻塞在单个流上的多个客户端提供服务
对列表或有序集合的阻塞列表操作具有弹出行为。基本上,元素从列表或有序集合中删除,以便返回给客户端。在这种情况下,您希望以公平的方式消费项目,具体取决于客户端在给定键上阻塞的时间。通常,Redis 在这些用例中使用 FIFO 语义。
但是请注意,对于流来说这不是问题:当为客户端提供服务时,流条目不会从流中删除,因此每个等待的客户端都会在XADD
命令向流提供数据后立即得到服务。
强烈建议阅读Redis Streams 简介,以了解有关流的整体行为和语义的更多信息。
RESP2 回复
以下之一
- 数组回复:一个数组,其中每个元素都是一个数组,包含两个元素:键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证以
XADD
添加时的相同顺序报告。 - 空回复:如果给定了BLOCK 选项并且发生超时,或者没有流可以提供服务。
RESP3 回复
以下之一
- 映射回复:一个键值元素映射,其中每个元素都包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证以
XADD
添加时的相同顺序报告。 - 空回复:如果给定了BLOCK 选项并且发生超时,或者没有流可以提供服务。