XREADGROUP

语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
  [NOACK] STREAMS key [key ...] id [id ...]
可用版本
Redis 开源版 5.0.0
时间复杂度
对于提及的每个流:O(M),其中 M 是返回元素的数量。如果 M 是常数(例如,总是使用 COUNT 请求前 10 个元素),则可以认为是 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将花费 O(N) 的时间来服务阻塞在获取新数据流上的 N 个客户端。
ACL 类别
@write, @stream, @slow, @blocking,

XREADGROUP 命令是 XREAD 命令的一个特殊版本,支持消费者组。在阅读本页面之前,您可能需要了解 XREAD 命令。

此外,如果您是 Redis Streams 的新手,我们建议您阅读我们的Redis Streams 简介。请务必理解简介中消费者组的概念,以便更轻松地理解此命令的工作原理。

30 秒了解消费者组

此命令与标准 XREAD 的区别在于它支持消费者组。

不使用消费者组,只使用 XREAD,所有客户端都会收到流中到达的所有条目。而使用带有 XREADGROUP 的消费者组,可以创建客户端组,这些客户端组消费给定流中到达的不同部分的消息。例如,如果流接收到新条目 A、B 和 C,并且有两个消费者通过消费者组读取,则一个客户端将接收到例如消息 A 和 C,而另一个客户端将接收到消息 B,依此类推。

在消费者组内,给定消费者(即只是从流中消费消息的客户端)必须通过唯一的 消费者名称 进行标识。这只是一个字符串。

消费者组的保证之一是,给定消费者只能看到已交付给它的消息历史记录,因此一条消息只有一个所有者。但是,有一种称为 消息认领 的特殊功能,允许其他消费者在某些消费者发生不可恢复故障时认领消息。为了实现这种语义,消费者组要求通过 XACK 命令显式确认消费者成功处理的消息。这是必需的,因为流会为每个消费者组跟踪谁正在处理哪条消息。

如何判断是否需要使用消费者组

  1. 如果您有一个流和多个客户端,并且希望所有客户端都接收到所有消息,则不需要消费者组。
  2. 如果您有一个流和多个客户端,并且希望流在您的客户端之间进行 分区分片,以便每个客户端接收到流中到达的部分消息,则需要消费者组。

XREAD 和 XREADGROUP 的区别

从语法上看,这两个命令几乎相同,但 XREADGROUP 需要 一个特殊且强制的选项

GROUP <group-name> <consumer-name>

组名称是与流关联的消费者组的名称。使用 XGROUP 命令创建组。消费者名称是客户端用于在组内标识自身的字符串。消费者首次出现时会在消费者组内自动创建。不同的客户端应选择不同的消费者名称。

当您使用 XREADGROUP 读取时,服务器会 记住 某个消息已交付给您:该消息将存储在消费者组中,称为 Pending Entries List (PEL),即已交付但尚未确认的消息 ID 列表。

客户端需要使用 XACK 确认消息处理,以便从 PEL 中移除待处理条目。可以使用 XPENDING 命令检查 PEL。

在不需要可靠性且可接受偶尔消息丢失的情况下,可以使用 NOACK 子命令避免将消息添加到 PEL。这相当于在读取消息时立即确认。

使用 XREADGROUP 时,在 STREAMS 选项中指定的 ID 可以是以下两种之一

  • 特殊的 > ID,表示消费者只希望接收那些 从未交付给任何其他消费者 的消息。这只是意味着,给我新消息。
  • 任何其他 ID,即 0 或任何其他有效的 ID 或不完整的 ID(仅包含毫秒时间部分),其作用是返回发送命令的消费者待处理的、ID 大于提供的 ID 的条目。因此,基本上如果 ID 不是 >,则命令将只允许客户端访问其待处理条目:已交付给它但尚未确认的消息。请注意,在这种情况下,BLOCKNOACK 都将被忽略。

XREAD 一样,XREADGROUP 命令可以以阻塞方式使用。在这方面没有区别。

消息交付给消费者时会发生什么?

两件事:

  1. 如果消息从未交付给任何人,即,如果我们正在讨论一条新消息,则会创建一个 PEL (Pending Entries List)。
  2. 如果消息已经交付给此消费者,并且只是再次重新获取同一条消息,则会更新 上次交付计数器 为当前时间,并使 交付次数 增加一。您可以使用 XPENDING 命令访问这些消息属性。

用法示例

通常,您会像这样使用该命令来获取新消息并进行处理。伪代码如下:

WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

通过这种方式,示例消费者代码将只获取新消息,处理它们,并通过 XACK 确认它们。但是,上面的示例代码并不完整,因为它没有处理崩溃后的恢复。如果在处理消息过程中发生崩溃,我们的消息将保留在待处理条目列表中,因此我们可以通过最初给 XREADGROUP 一个 ID 为 0 来访问我们的历史记录,并执行相同的循环。一旦提供 ID 为 0 且回复为空消息集,我们就知道已经处理并确认了所有待处理消息:我们可以开始使用 > 作为 ID,以获取新消息并重新加入正在处理新内容的消费者。

要查看该命令的实际回复,请查阅 XREAD 命令页面。

待处理消息被删除时会发生什么?

由于修剪或显式调用 XDEL,条目可能随时从流中删除。设计上,Redis 不会阻止删除流的 PEL 中存在的条目。发生这种情况时,PEL 会保留已删除条目的 ID,但实际的条目载荷不再可用。因此,当读取此类 PEL 条目时,Redis 将返回 null 值代替其相应的数据。

示例

> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "myfield"
            2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) (nil)

强烈建议阅读Redis Streams 简介,以便更全面地了解流的整体行为和语义。

RESP2 回复

以下之一

  • 数组回复:一个数组,其中每个元素都是一个由两部分组成的数组,包含键名和该键报告的条目。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照 XADD 添加时的顺序报告。
  • Nil 回复:如果指定了 BLOCK 选项且发生超时,或者没有可服务的流。

RESP3 回复

以下之一

  • 映射回复:一个包含键值对元素的映射表,其中每个元素由键名和该键报告的条目组成。报告的条目是完整的流条目,包含 ID 以及所有字段和值的列表。字段和值保证按照 XADD 添加时的顺序报告。
  • Null 回复:如果指定了 BLOCK 选项且发生超时,或者没有可服务的流。

评价此页
回到顶部 ↑