XREADGROUP
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
- 可用版本
- Redis 开源版 5.0.0
- 时间复杂度
- 对于提及的每个流:O(M),其中 M 是返回元素的数量。如果 M 是常数(例如,总是使用 COUNT 请求前 10 个元素),则可以认为是 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将花费 O(N) 的时间来服务阻塞在获取新数据流上的 N 个客户端。
- ACL 类别
-
@write
,@stream
,@slow
,@blocking
,
XREADGROUP
命令是 XREAD
命令的一个特殊版本,支持消费者组。在阅读本页面之前,您可能需要了解 XREAD
命令。
此外,如果您是 Redis Streams 的新手,我们建议您阅读我们的Redis Streams 简介。请务必理解简介中消费者组的概念,以便更轻松地理解此命令的工作原理。
30 秒了解消费者组
此命令与标准 XREAD
的区别在于它支持消费者组。
不使用消费者组,只使用 XREAD
,所有客户端都会收到流中到达的所有条目。而使用带有 XREADGROUP
的消费者组,可以创建客户端组,这些客户端组消费给定流中到达的不同部分的消息。例如,如果流接收到新条目 A、B 和 C,并且有两个消费者通过消费者组读取,则一个客户端将接收到例如消息 A 和 C,而另一个客户端将接收到消息 B,依此类推。
在消费者组内,给定消费者(即只是从流中消费消息的客户端)必须通过唯一的 消费者名称 进行标识。这只是一个字符串。
消费者组的保证之一是,给定消费者只能看到已交付给它的消息历史记录,因此一条消息只有一个所有者。但是,有一种称为 消息认领 的特殊功能,允许其他消费者在某些消费者发生不可恢复故障时认领消息。为了实现这种语义,消费者组要求通过 XACK
命令显式确认消费者成功处理的消息。这是必需的,因为流会为每个消费者组跟踪谁正在处理哪条消息。
如何判断是否需要使用消费者组
- 如果您有一个流和多个客户端,并且希望所有客户端都接收到所有消息,则不需要消费者组。
- 如果您有一个流和多个客户端,并且希望流在您的客户端之间进行 分区 或 分片,以便每个客户端接收到流中到达的部分消息,则需要消费者组。
XREAD 和 XREADGROUP 的区别
从语法上看,这两个命令几乎相同,但 XREADGROUP
需要 一个特殊且强制的选项
GROUP <group-name> <consumer-name>
组名称是与流关联的消费者组的名称。使用 XGROUP
命令创建组。消费者名称是客户端用于在组内标识自身的字符串。消费者首次出现时会在消费者组内自动创建。不同的客户端应选择不同的消费者名称。
当您使用 XREADGROUP
读取时,服务器会 记住 某个消息已交付给您:该消息将存储在消费者组中,称为 Pending Entries List (PEL),即已交付但尚未确认的消息 ID 列表。
客户端需要使用 XACK
确认消息处理,以便从 PEL 中移除待处理条目。可以使用 XPENDING
命令检查 PEL。
在不需要可靠性且可接受偶尔消息丢失的情况下,可以使用 NOACK
子命令避免将消息添加到 PEL。这相当于在读取消息时立即确认。
使用 XREADGROUP
时,在 STREAMS 选项中指定的 ID 可以是以下两种之一
- 特殊的
>
ID,表示消费者只希望接收那些 从未交付给任何其他消费者 的消息。这只是意味着,给我新消息。 - 任何其他 ID,即 0 或任何其他有效的 ID 或不完整的 ID(仅包含毫秒时间部分),其作用是返回发送命令的消费者待处理的、ID 大于提供的 ID 的条目。因此,基本上如果 ID 不是
>
,则命令将只允许客户端访问其待处理条目:已交付给它但尚未确认的消息。请注意,在这种情况下,BLOCK
和NOACK
都将被忽略。
与 XREAD
一样,XREADGROUP
命令可以以阻塞方式使用。在这方面没有区别。
消息交付给消费者时会发生什么?
两件事:
- 如果消息从未交付给任何人,即,如果我们正在讨论一条新消息,则会创建一个 PEL (Pending Entries List)。
- 如果消息已经交付给此消费者,并且只是再次重新获取同一条消息,则会更新 上次交付计数器 为当前时间,并使 交付次数 增加一。您可以使用
XPENDING
命令访问这些消息属性。
用法示例
通常,您会像这样使用该命令来获取新消息并进行处理。伪代码如下:
WHILE true
entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
if entries == nil
puts "Timeout... try again"
CONTINUE
end
FOREACH entries AS stream_entries
FOREACH stream_entries as message
process_message(message.id,message.fields)
# ACK the message as processed
XACK mystream $GroupName message.id
END
END
END
通过这种方式,示例消费者代码将只获取新消息,处理它们,并通过 XACK
确认它们。但是,上面的示例代码并不完整,因为它没有处理崩溃后的恢复。如果在处理消息过程中发生崩溃,我们的消息将保留在待处理条目列表中,因此我们可以通过最初给 XREADGROUP
一个 ID 为 0 来访问我们的历史记录,并执行相同的循环。一旦提供 ID 为 0 且回复为空消息集,我们就知道已经处理并确认了所有待处理消息:我们可以开始使用 >
作为 ID,以获取新消息并重新加入正在处理新内容的消费者。
要查看该命令的实际回复,请查阅 XREAD
命令页面。
待处理消息被删除时会发生什么?
由于修剪或显式调用 XDEL
,条目可能随时从流中删除。设计上,Redis 不会阻止删除流的 PEL 中存在的条目。发生这种情况时,PEL 会保留已删除条目的 ID,但实际的条目载荷不再可用。因此,当读取此类 PEL 条目时,Redis 将返回 null 值代替其相应的数据。
示例
> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1-0"
2) 1) "myfield"
2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1-0"
2) (nil)
强烈建议阅读Redis Streams 简介,以便更全面地了解流的整体行为和语义。
RESP2 回复
以下之一
- 数组回复:一个数组,其中每个元素都是一个由两部分组成的数组,包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照
XADD
添加时的顺序报告。 - Nil 回复:如果指定了 BLOCK 选项且发生超时,或者没有可服务的流。
RESP3 回复
以下之一
- 映射回复:一个包含键值对元素的映射表,其中每个元素由键名和该键报告的条目组成。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照
XADD
添加时的顺序报告。 - Null 回复:如果指定了 BLOCK 选项且发生超时,或者没有可服务的流。