XREADGROUP
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
- 可用性
- 5.0.0
- 时间复杂度
- 对于每个提到的流:O(M),其中 M 是返回的元素数量。如果 M 是常数(例如,总是使用 COUNT 请求前 10 个元素),则可以将其视为 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将支付 O(N) 时间以服务于 N 个在流中获取新数据的阻塞客户端。
- ACL 类别
-
@write
,@stream
,@slow
,@blocking
,
XREADGROUP
命令是 XREAD
命令的特殊版本,支持消费者组。可能您需要先了解 XREAD
命令,然后再阅读本页内容才能理解。
此外,如果您不熟悉流,我们建议您阅读我们的 Redis 流介绍。确保您了解介绍中的消费者组概念,这样更容易理解此命令的工作方式。
消费者组简介
此命令与普通 XREAD
命令之间的区别在于,此命令支持消费者组。
如果没有消费者组,仅使用 XREAD
,所有客户端都将收到流中到达的所有条目。相反,使用消费者组和 XREADGROUP
,可以创建消费流中到达的消息的不同部分的客户端组。例如,如果流获得新的条目 A、B 和 C,并且有两个消费者通过消费者组读取,则一个客户端将获得例如消息 A 和 C,另一个客户端将获得消息 B,等等。
在一个消费者组内,一个给定的消费者(即,只从流中消费消息的客户端)必须使用唯一的消费者名称进行标识。这只是一个字符串。
消费者组的保证之一是,给定的消费者只能看到已传递给它的消息的历史记录,因此消息只有一个所有者。但是,有一个名为消息认领的特殊功能,允许其他消费者认领消息,以防某个消费者出现不可恢复的故障。为了实现这种语义,消费者组需要通过 XACK
命令显式确认消费者成功处理的消息。这是必需的,因为流将为每个消费者组跟踪谁正在处理什么消息。
如何判断是否需要使用消费者组
- 如果您有一个流和多个客户端,并且您希望所有客户端都获得所有消息,则不需要消费者组。
- 如果您有一个流和多个客户端,并且您希望流在客户端之间分区或分片,以便每个客户端都获得到达流中的消息的子集,则需要消费者组。
XREAD 和 XREADGROUP 之间的区别
从语法的角度来看,这两个命令几乎相同,但是 XREADGROUP
需要一个特殊且必须的选项
GROUP <group-name> <consumer-name>
组名称只是与流关联的消费者组的名称。该组是使用 XGROUP
命令创建的。消费者名称是客户端用于在组内标识自己的字符串。消费者在第一次被看到时会在消费者组内自动创建。不同的客户端应该选择不同的消费者名称。
当您使用 XREADGROUP
读取时,服务器将记住某个消息已传递给您:该消息将存储在消费者组内的所谓挂起条目列表 (PEL) 中,该列表是已传递但尚未确认的消息 ID 列表。
客户端必须使用 XACK
确认消息处理,以便从 PEL 中删除挂起条目。可以使用 XPENDING
命令检查 PEL。
NOACK
子命令可用于避免将消息添加到 PEL 中,在这种情况下,可靠性不是要求,偶尔的消息丢失是可以接受的。这等同于在读取消息时确认消息。
使用 XREADGROUP
时,在 STREAMS 选项中指定的 ID 可以是以下两种之一
- 特殊
>
ID 表示消费者希望仅接收 _从未发送给任何其他消费者_ 的消息。 这仅仅意味着,给我新的消息。 - 任何其他 ID,也就是 0 或任何其他有效 ID 或不完整 ID(只有毫秒时间部分),将具有返回待处理条目的效果,这些条目是发送具有大于所提供 ID 的 ID 的命令的消费者所待处理的。 因此,基本上,如果 ID 不是
>
,那么该命令将只允许客户端访问其待处理条目:发送给它的但尚未确认的消息。 请注意,在这种情况下,BLOCK
和NOACK
都将被忽略。
与 XREAD
一样,XREADGROUP
命令也可以以阻塞方式使用。 在这方面没有区别。
当消息传递给消费者时会发生什么?
两件事
- 如果该消息从未发送给任何人,也就是说,如果我们谈论的是一条新消息,那么将创建一个 PEL(待处理条目列表)。
- 相反,如果该消息已经发送给此消费者,并且它只是再次重新获取相同的消息,那么 _最后一次传递计数器_ 将更新为当前时间,并且 _传递次数_ 将增加 1。 您可以使用
XPENDING
命令访问这些消息属性。
用法示例
通常,您使用该命令来获取新消息并进行处理。 在伪代码中
WHILE true
entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
if entries == nil
puts "Timeout... try again"
CONTINUE
end
FOREACH entries AS stream_entries
FOREACH stream_entries as message
process_message(message.id,message.fields)
# ACK the message as processed
XACK mystream $GroupName message.id
END
END
END
通过这种方式,示例消费者代码将只获取新消息,处理它们,并通过 XACK
确认它们。 但是上面的示例代码并不完整,因为它没有处理崩溃后的恢复。 如果我们在处理消息的过程中崩溃会发生什么,我们的消息将保留在待处理条目列表中,因此我们可以通过最初将 XREADGROUP
的 ID 设置为 0 来访问我们的历史记录,并执行相同的循环。 一旦提供 ID 为 0,回复将是一个空的消息集,我们知道我们已经处理并确认了所有待处理的消息:我们可以开始将 >
用作 ID,以便获取新消息并重新加入正在处理新事物的消费者。
要查看该命令的实际回复方式,请查看 XREAD
命令页面。
待处理消息被删除时会发生什么?
由于修剪或显式调用 XDEL
,条目可能会随时从流中删除。 根据设计,Redis 不会阻止删除流的 PEL 中存在的条目。 当这种情况发生时,PEL 会保留已删除条目的 ID,但实际的条目有效负载将不再可用。 因此,当读取此类 PEL 条目时,Redis 将返回一个空值来代替它们各自的数据。
示例
> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1-0"
2) 1) "myfield"
2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1-0"
2) (nil)
强烈建议阅读 Redis 流简介,以便更多地了解流的整体行为和语义。
RESP2 回复
以下之一
- 数组回复:一个数组,其中每个元素都是一个数组,由包含键名称和为该键报告的条目的两个元素组成。 报告的条目是完整的流条目,具有 ID 和所有字段和值的列表。 字段和值保证以
XADD
添加它们的相同顺序报告。 - 空回复:如果给出了 BLOCK 选项并且发生了超时,或者没有可以服务的流。
RESP3 回复
以下之一
- 映射回复:一个键值元素的映射,其中每个元素都由键名称和为该键报告的条目组成。 报告的条目是完整的流条目,具有 ID 和所有字段和值的列表。 字段和值保证以
XADD
添加它们的相同顺序报告。 - 空回复:如果给出了 BLOCK 选项并且发生了超时,或者没有可以服务的流。