单接收者消息传递很有用,但当我们有多个接收者时,它在替换 PUBLISH 和 SUBSCRIBE 命令方面并没有太大帮助。为了做到这一点,我们需要转变我们的问题。在许多方面,Redis PUBLISH/SUBSCRIBE 就像群聊,某人是否连接决定了他们是否在群聊中。我们想要消除“需要始终连接”的要求,我们将在聊天的上下文中实现它。
让我们看看 Fake Garage Startup 的下一个问题。在快速实施用户到用户消息传递系统后,Fake Garage Startup 意识到替换 SMS 很好,但他们收到了许多添加群聊功能的请求。与之前一样,他们的客户端可能随时连接或断开连接,因此我们不能使用内置的 PUBLISH/SUBSCRIBE 方法。
每个新的群聊都将有一组原始的群消息接收者,用户可以根据需要加入或离开群组。有关哪些用户在聊天中的信息将存储为 ZSET,成员是接收者的用户名,值是用户在聊天中收到的最高消息 ID。单个用户属于哪些聊天也将存储为 ZSET,成员是用户所属的群组,分数是用户在该聊天中收到的最高消息 ID。有关某些用户和聊天的信息可以在图 6.12 中看到。
正如您所看到的,用户 jason22 已经看到了在 chat:827 中发送的六条聊天消息中的五条,jason22 和 jeff24 正在参与其中。
聊天会话本身的内容将存储在 ZSET 中,消息作为成员,消息 ID 作为分数。要创建和开始聊天,我们将递增一个全局计数器以获得一个新的聊天 ID。然后,我们将创建一个 ZSET,其中包含我们想要包含的所有用户,看到的 ID 为 0,并将该群组添加到每个用户的群组列表 ZSET 中。最后,我们将消息放入聊天 ZSET 中,从而将初始消息发送给用户。创建聊天的代码如下所示。
def create_chat(conn, sender, recipients, message, chat_id=None):
chat_id = chat_id or str(conn.incr('ids:chat:'))
获取新的聊天 ID。
recipients.append(sender) recipientsd = dict((r, 0) for r in recipients)
设置一个 users-toscores 字典以添加到聊天 ZSET。
pipeline = conn.pipeline(True)
pipeline.zadd('chat:' + chat_id, **recipientsd)
创建包含参与者列表的集合。
for rec in recipients: pipeline.zadd('seen:' + rec, chat_id, 0)
初始化 seen ZSETs。
pipeline.execute()
return send_message(conn, chat_id, sender, message)
发送消息。
唯一可能令人惊讶的是我们使用了来自 dict() 对象构造函数调用的生成器表达式。此快捷方式使我们可以快速构造一个字典,该字典将用户映射到最初为 0 值的分数,ZADD 可以在单个调用中接受该分数。
生成器表达式和字典构造可以通过传递一系列值对来轻松构造 Python 字典。对中的第一项成为键;第二项成为值。清单 6.24 显示了一些看起来很奇怪的代码,我们在其中实际生成要在线传递给字典的序列。这种类型的序列生成称为生成器表达式,您可以在 http://mng.bz/TTKb 上阅读更多相关信息。
要发送消息,我们必须获得一个新的消息 ID,然后将消息添加到聊天的消息 ZSET。不幸的是,发送消息存在竞争条件,但可以通过使用 6.2 节中的锁来轻松处理。我们用于发送消息的带有锁的函数如下所示。
def send_message(conn, chat_id, sender, message): identifier = acquire_lock(conn, 'chat:' + chat_id) if not identifier: raise Exception("Couldn't get the lock") try:
mid = conn.incr('ids:' + chat_id) ts = time.time() packed = json.dumps({ 'id': mid, 'ts': ts, 'sender': sender, 'message': message, })
准备消息。
conn.zadd('msgs:' + chat_id, packed, mid)
将消息发送到聊天。
finally: release_lock(conn, 'chat:' + chat_id, identifier) return chat_id
发送聊天消息所涉及的大部分工作是准备要发送的信息本身;实际发送消息涉及将其添加到 ZSET。我们将打包消息构造和添加到 ZSET 周围使用锁,原因与之前我们需要计数信号量的锁的原因相同。通常,当我们在构造要添加到 Redis 的另一个值时使用来自 Redis 的值时,我们需要使用 WATCH/MULTI/EXEC 事务或锁来消除竞争条件。我们在这里使用锁的原因与我们首先开发它的性能原因相同。
现在我们已经创建了聊天并发送了初始消息,用户需要找到有关他们参与的聊天的信息以及有多少消息正在等待,并且他们需要实际接收消息。
要获取用户的所有待处理消息,我们需要使用 ZRANGE 从用户的 ZSET 中获取群组 ID 和已看到的消息 ID。当我们拥有群组 ID 和用户已看到的消息时,我们可以在所有消息 ZSETs 上执行 ZRANGEBYSCORE 操作。在我们获取了聊天的消息后,我们使用适当的 ID 更新 seen ZSET 和群组 ZSET 中的用户条目,并且我们继续清理群聊中已被聊天中的每个人收到的任何消息,如以下清单所示。
def fetch_pending_messages(conn, recipient):
seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True)
获取上次收到的消息 ID。
pipeline = conn.pipeline(True)
for chat_id, seen_id in seen: pipeline.zrangebyscore( 'msgs:' + chat_id, seen_id+1, 'inf')
获取所有新消息。
chat_info = zip(seen, pipeline.execute())
准备有关要返回的数据的信息。
for i, ((chat_id, seen_id), messages) in enumerate(chat_info): if not messages: continue messages[:] = map(json.loads, messages)
seen_id = messages[-1]['id'] conn.zadd('chat:' + chat_id, recipient, seen_id)
使用最近收到的消息更新“聊天” ZSET。
min_id = conn.zrange( 'chat:' + chat_id, 0, 0, withscores=True)
发现已被所有用户看到的消息。
pipeline.zadd('seen:' + recipient, chat_id, seen_id)
更新“已查看” ZSET。
if min_id:
pipeline.zremrangebyscore( 'msgs:' + chat_id, 0, min_id[0][1])
清除已被所有用户看到的消息。
chat_info[i] = (chat_id, messages) pipeline.execute() return chat_info
获取待处理消息主要是迭代用户的所有聊天,拉取消息,以及清理已被聊天中所有用户看到的消息。
我们已经从群聊中发送和获取了消息;剩下的就是加入和离开群聊。要加入群聊,我们获取聊天的最新消息 ID,然后我们将聊天信息添加到用户的 seen ZSET,分数是最新消息 ID。我们还将用户添加到群组的成员列表中,同样,分数是最新消息 ID。有关加入群组的代码,请参见下一个清单。
def join_chat(conn, chat_id, user):
message_id = int(conn.get('ids:' + chat_id))
获取聊天的最新消息 ID。
pipeline = conn.pipeline(True)
pipeline.zadd('chat:' + chat_id, user, message_id)
将用户添加到聊天成员列表。
pipeline.zadd('seen:' + user, chat_id, message_id)
将聊天添加到用户的 seen 列表中。
pipeline.execute()
加入聊天只需要将正确的引用添加到聊天中的用户,以及将聊天添加到用户的 seen ZSET。
要从群聊中删除用户,我们从聊天 ZSET 中删除用户 ID,并从用户的 seen ZSET 中删除聊天。如果聊天 ZSET 中没有更多用户,我们将删除消息 ZSET 和消息 ID 计数器。如果还有剩余用户,我们将再次传递并清理已被所有用户看到的所有旧消息。以下清单显示了离开聊天的函数。
def leave_chat(conn, chat_id, user): pipeline = conn.pipeline(True)
pipeline.zrem('chat:' + chat_id, user) pipeline.zrem('seen:' + user, chat_id)
从聊天中删除用户。
pipeline.zcard('chat:' + chat_id)
查找剩余的群组成员人数。
if not pipeline.execute()[-1]:
pipeline.delete('msgs:' + chat_id) pipeline.delete('ids:' + chat_id)
删除聊天。
pipeline.execute() else:
oldest = conn.zrange( 'chat:' + chat_id, 0, 0, withscores=True)
查找所有用户看到的最旧消息。
conn.zremrangebyscore('chat:' + chat_id, 0, oldest)
删除聊天中的旧消息。
当用户离开聊天时进行清理并不难,但需要注意很多小细节,以确保我们不会在某个地方泄漏 ZSET 或 ID。
我们现在已经完成了在 Redis 中创建一个完整的多接收者拉取消息系统的过程。 虽然我们是从聊天的角度来看待它,但当您希望接收者能够接收在他们断开连接时发送的消息时,可以使用相同的方法来替换 PUBLISH/SUBSCRIBE 函数。 通过一些工作,我们可以用 LIST 替换 ZSET,并且可以将我们的锁使用从发送消息转移到旧消息清理。 我们使用 ZSET 是因为它可以节省我们为每次聊天获取当前消息 ID 的麻烦。 此外,通过让发送者做更多的工作(围绕发送消息进行锁定),可以避免多个接收者请求更多数据和在清理期间锁定,这将提高整体性能。
我们现在拥有一个多接收者消息系统,可以替换群聊中的 PUBLISH 和 SUBSCRIBE。 在下一节中,我们将使用它来发送有关 Redis 中可用键名称的信息。