dot Redis 8 来了 - 而且是开源的

了解更多

6.5.2 多接收者发布/订阅替换

返回主页

6.5.2 多接收者发布/订阅替换

单接收者消息传递很有用,但当我们有多个接收者时,它在替换 PUBLISHSUBSCRIBE 命令方面并没有太大帮助。为了做到这一点,我们需要转变我们的问题。在许多方面,Redis PUBLISH/SUBSCRIBE 就像群聊,某人是否连接决定了他们是否在群聊中。我们想要消除“需要始终连接”的要求,我们将在聊天的上下文中实现它。

让我们看看 Fake Garage Startup 的下一个问题。在快速实施用户到用户消息传递系统后,Fake Garage Startup 意识到替换 SMS 很好,但他们收到了许多添加群聊功能的请求。与之前一样,他们的客户端可能随时连接或断开连接,因此我们不能使用内置的 PUBLISH/SUBSCRIBE 方法。

图 6.12 一些示例聊天和用户数据。聊天 ZSET 显示用户以及他们在该聊天中看到的消息的最大 ID。 seen ZSET 列出了每个用户的聊天 ID,同样,分数是他们在给定聊天中看到的最大消息 ID。

每个新的群聊都将有一组原始的群消息接收者,用户可以根据需要加入或离开群组。有关哪些用户在聊天中的信息将存储为 ZSET,成员是接收者的用户名,值是用户在聊天中收到的最高消息 ID。单个用户属于哪些聊天也将存储为 ZSET,成员是用户所属的群组,分数是用户在该聊天中收到的最高消息 ID。有关某些用户和聊天的信息可以在图 6.12 中看到。

正如您所看到的,用户 jason22 已经看到了在 chat:827 中发送的六条聊天消息中的五条,jason22 和 jeff24 正在参与其中。

创建聊天会话

聊天会话本身的内容将存储在 ZSET 中,消息作为成员,消息 ID 作为分数。要创建和开始聊天,我们将递增一个全局计数器以获得一个新的聊天 ID。然后,我们将创建一个 ZSET,其中包含我们想要包含的所有用户,看到的 ID 为 0,并将该群组添加到每个用户的群组列表 ZSET 中。最后,我们将消息放入聊天 ZSET 中,从而将初始消息发送给用户。创建聊天的代码如下所示。

清单 6.24 create_chat() 函数
def create_chat(conn, sender, recipients, message, chat_id=None):
 
   chat_id = chat_id or str(conn.incr('ids:chat:'))

获取新的聊天 ID。

   recipients.append(sender)
   recipientsd = dict((r, 0) for r in recipients)

设置一个 users-toscores 字典以添加到聊天 ZSET

      pipeline = conn.pipeline(True)
 
      pipeline.zadd('chat:' + chat_id, **recipientsd)

创建包含参与者列表的集合。

      for rec in recipients:
         pipeline.zadd('seen:' + rec, chat_id, 0)

初始化 seen ZSETs。

      pipeline.execute()

 
      return send_message(conn, chat_id, sender, message)

发送消息。

唯一可能令人惊讶的是我们使用了来自 dict() 对象构造函数调用的生成器表达式。此快捷方式使我们可以快速构造一个字典,该字典将用户映射到最初为 0 值的分数,ZADD 可以在单个调用中接受该分数。

生成器表达式和字典构造可以通过传递一系列值对来轻松构造 Python 字典。对中的第一项成为键;第二项成为值。清单 6.24 显示了一些看起来很奇怪的代码,我们在其中实际生成要在线传递给字典的序列。这种类型的序列生成称为生成器表达式,您可以在 http://mng.bz/TTKb 上阅读更多相关信息。

发送消息

要发送消息,我们必须获得一个新的消息 ID,然后将消息添加到聊天的消息 ZSET。不幸的是,发送消息存在竞争条件,但可以通过使用 6.2 节中的锁来轻松处理。我们用于发送消息的带有锁的函数如下所示。

清单 6.25 send_message() 函数
def send_message(conn, chat_id, sender, message):
   identifier = acquire_lock(conn, 'chat:' + chat_id)
   if not identifier:
      raise Exception("Couldn't get the lock")
   try:
 
      mid = conn.incr('ids:' + chat_id)
      ts = time.time()
      packed = json.dumps({
         'id': mid,
         'ts': ts,
         'sender': sender,
         'message': message,
      })

准备消息。

      conn.zadd('msgs:' + chat_id, packed, mid)

将消息发送到聊天。

   finally:
      release_lock(conn, 'chat:' + chat_id, identifier)
   return chat_id
 

 

发送聊天消息所涉及的大部分工作是准备要发送的信息本身;实际发送消息涉及将其添加到 ZSET。我们将打包消息构造和添加到 ZSET 周围使用锁,原因与之前我们需要计数信号量的锁的原因相同。通常,当我们在构造要添加到 Redis 的另一个值时使用来自 Redis 的值时,我们需要使用 WATCH/MULTI/EXEC 事务或锁来消除竞争条件。我们在这里使用锁的原因与我们首先开发它的性能原因相同。

现在我们已经创建了聊天并发送了初始消息,用户需要找到有关他们参与的聊天的信息以及有多少消息正在等待,并且他们需要实际接收消息。

获取消息

要获取用户的所有待处理消息,我们需要使用 ZRANGE 从用户的 ZSET 中获取群组 ID 和已看到的消息 ID。当我们拥有群组 ID 和用户已看到的消息时,我们可以在所有消息 ZSETs 上执行 ZRANGEBYSCORE 操作。在我们获取了聊天的消息后,我们使用适当的 ID 更新 seen ZSET 和群组 ZSET 中的用户条目,并且我们继续清理群聊中已被聊天中的每个人收到的任何消息,如以下清单所示。

清单 6.26 fetch_pending_messages() 函数
def fetch_pending_messages(conn, recipient):
 
   seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)

获取上次收到的消息 ID。

   pipeline = conn.pipeline(True)
   
 
   for chat_id, seen_id in seen:
      pipeline.zrangebyscore(
         'msgs:' + chat_id, seen_id+1, 'inf')

获取所有新消息。

   chat_info = zip(seen, pipeline.execute())

准备有关要返回的数据的信息。

   for i, ((chat_id, seen_id), messages) in enumerate(chat_info):
      if not messages:
         continue
      messages[:] = map(json.loads, messages)
 
      seen_id = messages[-1]['id']
      conn.zadd('chat:' + chat_id, recipient, seen_id)

使用最近收到的消息更新“聊天” ZSET。

      min_id = conn.zrange(
         'chat:' + chat_id, 0, 0, withscores=True)

发现已被所有用户看到的消息。

      pipeline.zadd('seen:' + recipient, chat_id, seen_id)

更新“已查看” ZSET。

      if min_id:
 
         pipeline.zremrangebyscore(
            'msgs:' + chat_id, 0, min_id[0][1])

清除已被所有用户看到的消息。

      chat_info[i] = (chat_id, messages)
   pipeline.execute()

   return chat_info
 

 

获取待处理消息主要是迭代用户的所有聊天,拉取消息,以及清理已被聊天中所有用户看到的消息。

加入和离开聊天

我们已经从群聊中发送和获取了消息;剩下的就是加入和离开群聊。要加入群聊,我们获取聊天的最新消息 ID,然后我们将聊天信息添加到用户的 seen ZSET,分数是最新消息 ID。我们还将用户添加到群组的成员列表中,同样,分数是最新消息 ID。有关加入群组的代码,请参见下一个清单。

清单 6.27 join_chat() 函数
def join_chat(conn, chat_id, user):
 
   message_id = int(conn.get('ids:' + chat_id))

获取聊天的最新消息 ID。

   pipeline = conn.pipeline(True)
 
   pipeline.zadd('chat:' + chat_id, user, message_id)

将用户添加到聊天成员列表。

   pipeline.zadd('seen:' + user, chat_id, message_id)

将聊天添加到用户的 seen 列表中。

   pipeline.execute()
 

 

加入聊天只需要将正确的引用添加到聊天中的用户,以及将聊天添加到用户的 seen ZSET

要从群聊中删除用户,我们从聊天 ZSET 中删除用户 ID,并从用户的 seen ZSET 中删除聊天。如果聊天 ZSET 中没有更多用户,我们将删除消息 ZSET 和消息 ID 计数器。如果还有剩余用户,我们将再次传递并清理已被所有用户看到的所有旧消息。以下清单显示了离开聊天的函数。

清单 6.28 leave_chat() 函数
def leave_chat(conn, chat_id, user):
   pipeline = conn.pipeline(True)
 
   pipeline.zrem('chat:' + chat_id, user)
   pipeline.zrem('seen:' + user, chat_id)

从聊天中删除用户。

   pipeline.zcard('chat:' + chat_id)

查找剩余的群组成员人数。

   if not pipeline.execute()[-1]:
 
      pipeline.delete('msgs:' + chat_id)
      pipeline.delete('ids:' + chat_id)

删除聊天。

      pipeline.execute()
   else:
 
      oldest = conn.zrange(
         'chat:' + chat_id, 0, 0, withscores=True)

查找所有用户看到的最旧消息。

      conn.zremrangebyscore('chat:' + chat_id, 0, oldest)

删除聊天中的旧消息。

 

当用户离开聊天时进行清理并不难,但需要注意很多小细节,以确保我们不会在某个地方泄漏 ZSET 或 ID。

我们现在已经完成了在 Redis 中创建一个完整的多接收者拉取消息系统的过程。 虽然我们是从聊天的角度来看待它,但当您希望接收者能够接收在他们断开连接时发送的消息时,可以使用相同的方法来替换 PUBLISH/SUBSCRIBE 函数。 通过一些工作,我们可以用 LIST 替换 ZSET,并且可以将我们的锁使用从发送消息转移到旧消息清理。 我们使用 ZSET 是因为它可以节省我们为每次聊天获取当前消息 ID 的麻烦。 此外,通过让发送者做更多的工作(围绕发送消息进行锁定),可以避免多个接收者请求更多数据和在清理期间锁定,这将提高整体性能。

我们现在拥有一个多接收者消息系统,可以替换群聊中的 PUBLISHSUBSCRIBE。 在下一节中,我们将使用它来发送有关 Redis 中可用键名称的信息。