XREADGROUP

语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds]
  [NOACK] STREAMS key [key ...] id [id ...]
可用性
5.0.0
时间复杂度
对于每个提到的流:O(M),其中 M 是返回的元素数量。如果 M 是常数(例如,总是使用 COUNT 请求前 10 个元素),则可以将其视为 O(1)。另一方面,当 XREADGROUP 阻塞时,XADD 将支付 O(N) 时间以服务于 N 个在流中获取新数据的阻塞客户端。
ACL 类别
@write, @stream, @slow, @blocking,

XREADGROUP 命令是 XREAD 命令的特殊版本,支持消费者组。可能您需要先了解 XREAD 命令,然后再阅读本页内容才能理解。

此外,如果您不熟悉流,我们建议您阅读我们的 Redis 流介绍。确保您了解介绍中的消费者组概念,这样更容易理解此命令的工作方式。

消费者组简介

此命令与普通 XREAD 命令之间的区别在于,此命令支持消费者组。

如果没有消费者组,仅使用 XREAD,所有客户端都将收到流中到达的所有条目。相反,使用消费者组和 XREADGROUP,可以创建消费流中到达的消息的不同部分的客户端组。例如,如果流获得新的条目 A、B 和 C,并且有两个消费者通过消费者组读取,则一个客户端将获得例如消息 A 和 C,另一个客户端将获得消息 B,等等。

在一个消费者组内,一个给定的消费者(即,只从流中消费消息的客户端)必须使用唯一的消费者名称进行标识。这只是一个字符串。

消费者组的保证之一是,给定的消费者只能看到已传递给它的消息的历史记录,因此消息只有一个所有者。但是,有一个名为消息认领的特殊功能,允许其他消费者认领消息,以防某个消费者出现不可恢复的故障。为了实现这种语义,消费者组需要通过 XACK 命令显式确认消费者成功处理的消息。这是必需的,因为流将为每个消费者组跟踪谁正在处理什么消息。

如何判断是否需要使用消费者组

  1. 如果您有一个流和多个客户端,并且您希望所有客户端都获得所有消息,则不需要消费者组。
  2. 如果您有一个流和多个客户端,并且您希望流在客户端之间分区分片,以便每个客户端都获得到达流中的消息的子集,则需要消费者组。

XREAD 和 XREADGROUP 之间的区别

从语法的角度来看,这两个命令几乎相同,但是 XREADGROUP需要一个特殊且必须的选项

GROUP <group-name> <consumer-name>

组名称只是与流关联的消费者组的名称。该组是使用 XGROUP 命令创建的。消费者名称是客户端用于在组内标识自己的字符串。消费者在第一次被看到时会在消费者组内自动创建。不同的客户端应该选择不同的消费者名称。

当您使用 XREADGROUP 读取时,服务器将记住某个消息已传递给您:该消息将存储在消费者组内的所谓挂起条目列表 (PEL) 中,该列表是已传递但尚未确认的消息 ID 列表。

客户端必须使用 XACK 确认消息处理,以便从 PEL 中删除挂起条目。可以使用 XPENDING 命令检查 PEL。

NOACK 子命令可用于避免将消息添加到 PEL 中,在这种情况下,可靠性不是要求,偶尔的消息丢失是可以接受的。这等同于在读取消息时确认消息。

使用 XREADGROUP 时,在 STREAMS 选项中指定的 ID 可以是以下两种之一

  • 特殊 > ID 表示消费者希望仅接收 _从未发送给任何其他消费者_ 的消息。 这仅仅意味着,给我新的消息。
  • 任何其他 ID,也就是 0 或任何其他有效 ID 或不完整 ID(只有毫秒时间部分),将具有返回待处理条目的效果,这些条目是发送具有大于所提供 ID 的 ID 的命令的消费者所待处理的。 因此,基本上,如果 ID 不是 >,那么该命令将只允许客户端访问其待处理条目:发送给它的但尚未确认的消息。 请注意,在这种情况下,BLOCKNOACK 都将被忽略。

XREAD 一样,XREADGROUP 命令也可以以阻塞方式使用。 在这方面没有区别。

当消息传递给消费者时会发生什么?

两件事

  1. 如果该消息从未发送给任何人,也就是说,如果我们谈论的是一条新消息,那么将创建一个 PEL(待处理条目列表)。
  2. 相反,如果该消息已经发送给此消费者,并且它只是再次重新获取相同的消息,那么 _最后一次传递计数器_ 将更新为当前时间,并且 _传递次数_ 将增加 1。 您可以使用 XPENDING 命令访问这些消息属性。

用法示例

通常,您使用该命令来获取新消息并进行处理。 在伪代码中

WHILE true
    entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
    if entries == nil
        puts "Timeout... try again"
        CONTINUE
    end

    FOREACH entries AS stream_entries
        FOREACH stream_entries as message
            process_message(message.id,message.fields)

            # ACK the message as processed
            XACK mystream $GroupName message.id
        END
    END
END

通过这种方式,示例消费者代码将只获取新消息,处理它们,并通过 XACK 确认它们。 但是上面的示例代码并不完整,因为它没有处理崩溃后的恢复。 如果我们在处理消息的过程中崩溃会发生什么,我们的消息将保留在待处理条目列表中,因此我们可以通过最初将 XREADGROUP 的 ID 设置为 0 来访问我们的历史记录,并执行相同的循环。 一旦提供 ID 为 0,回复将是一个空的消息集,我们知道我们已经处理并确认了所有待处理的消息:我们可以开始将 > 用作 ID,以便获取新消息并重新加入正在处理新事物的消费者。

要查看该命令的实际回复方式,请查看 XREAD 命令页面。

待处理消息被删除时会发生什么?

由于修剪或显式调用 XDEL,条目可能会随时从流中删除。 根据设计,Redis 不会阻止删除流的 PEL 中存在的条目。 当这种情况发生时,PEL 会保留已删除条目的 ID,但实际的条目有效负载将不再可用。 因此,当读取此类 PEL 条目时,Redis 将返回一个空值来代替它们各自的数据。

示例

> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) 1) "myfield"
            2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
   2) 1) 1) "1-0"
         2) (nil)

强烈建议阅读 Redis 流简介,以便更多地了解流的整体行为和语义。

RESP2 回复

以下之一

  • 数组回复:一个数组,其中每个元素都是一个数组,由包含键名称和为该键报告的条目的两个元素组成。 报告的条目是完整的流条目,具有 ID 和所有字段和值的列表。 字段和值保证以 XADD 添加它们的相同顺序报告。
  • 空回复:如果给出了 BLOCK 选项并且发生了超时,或者没有可以服务的流。

RESP3 回复

以下之一

  • 映射回复:一个键值元素的映射,其中每个元素都由键名称和为该键报告的条目组成。 报告的条目是完整的流条目,具有 ID 和所有字段和值的列表。 字段和值保证以 XADD 添加它们的相同顺序报告。
  • 空回复:如果给出了 BLOCK 选项并且发生了超时,或者没有可以服务的流。

RATE THIS PAGE
Back to top ↑