dot Redis 8 来了——并且是开源的

了解更多

6.3.2 公平信号量

返回主页

6.3.2 公平信号量

因为我们不能假设所有系统上的系统时钟都完全相同,所以我们早期的基本计数信号量会出现问题,即系统时钟较慢的客户端可以从系统时钟较快的客户端那里窃取信号量。 任何时候存在这种敏感性,锁定本身就会变得不公平。 我们希望减少不正确的系统时间对获取信号量的影响,达到只要系统在 1 秒内,系统时间就不会导致信号量被盗或信号量过早过期。

为了最大限度地减少系统时间不一致的问题,我们将添加一个计数器和第二个 ZSET。 计数器创建一个稳定增加的类计时器机制,确保首先递增计数器的人应该获得信号量。 然后,我们通过使用带有计数器生成值作为分数的“所有者”ZSET来强制执行我们的要求,即首先获得计数器并且想要信号量的客户端也获得信号量,检查我们在新的 ZSET 中的标识符的排名,以确定哪个客户端获得了信号量。 新的所有者 ZSET 出现在图 6.7 中。

图 6.7 公平信号量所有者 ZSET

我们继续以与基本信号量相同的方式处理超时,方法是从系统时间 ZSET 中删除条目。 我们通过使用 ZINTERSTOREWEIGHTS 参数将这些超时传播到新的所有者 ZSET

将所有内容整合到清单 6.14 中,我们首先通过从超时 ZSET 中删除旧条目来使条目超时,然后将超时 ZSET 与所有者相交

ZSET,保存并覆盖所有者 ZSET。 然后,我们递增计数器并将我们的计数器值添加到所有者 ZSET,同时将我们当前的系统时间添加到超时 ZSET。 最后,我们检查我们在所有者 ZSET 中的排名是否足够低,如果是,我们就可以获得信号量。 如果没有,我们从所有者和超时 ZSET 中删除我们的条目。

清单 6.14 acquire_fair_semaphore() 函数
def acquire_fair_semaphore(conn, semname, limit, timeout=10):
 
   identifier = str(uuid.uuid4())

一个 128 位随机标识符。

   czset = semname + ':owner'
   ctr = semname + ':counter'

   now = time.time()
   pipeline = conn.pipeline(True)
 
   pipeline.zremrangebyscore(semname, '-inf', now - timeout)
   pipeline.zinterstore(czset, {czset: 1, semname: 0})

使旧条目超时。

   pipeline.incr(ctr)
   counter = pipeline.execute()[-1]

获取计数器。

   pipeline.zadd(semname, identifier, now)
   pipeline.zadd(czset, identifier, counter)

尝试获取信号量。

   pipeline.zrank(czset, identifier)
   if pipeline.execute()[-1] < limit:

检查排名以确定我们是否获得了信号量。

      return identifier

我们获得了信号量。

   pipeline.zrem(semname, identifier)
   pipeline.zrem(czset, identifier)

我们没有获得信号量; 清除错误数据。

   pipeline.execute()
   return None
 

此函数有几个不同的部分。 我们首先清理超时的信号量,更新所有者 ZSET 并获取此项的下一个计数器 ID。 在我们将时间添加到超时 ZSET 和将计数器值添加到所有者 ZSET 之后,我们就可以检查我们的排名是否足够低了。

32 位平台上的公平信号量在 32 位 Redis 平台上,整数计数器限制为 231 – 1,即标准有符号整数限制。 在最坏的情况下,在高负荷使用的信号量上,大约每 2 小时就会发生一次溢出情况。 虽然有多种解决方法,但最简单的方法是为使用任何基于计数器的 ID 的任何机器切换到 64 位平台。

让我们看看图 6.8,它显示了当进程 ID 8372 想要在时间 1326437039.100 获取信号量时执行的操作序列,其中限制为 5。

释放信号量几乎与之前一样容易,只是现在我们从所有者和超时 ZSET 中删除我们的标识符,如下一个清单所示。

清单 6.15 release_fair_semaphore() 函数
def release_fair_semaphore(conn, semname, identifier):
   pipeline = conn.pipeline(True)
   pipeline.zrem(semname, identifier)
   pipeline.zrem(semname + ':owner', identifier)
 
   return pipeline.execute()[0]

如果信号量已正确释放,则返回 True;如果信号量已超时,则返回 False

 

图 6.8 acquire_fair_semaphore() 的调用序列

如果我们想偷懒,在大多数情况下,我们可以只从超时 ZSET 中删除我们的信号量标识符;我们在获取序列中的步骤之一是刷新所有者 ZSET 以删除不再在超时 ZSET 中的标识符。 但是通过仅从超时 ZSET 中删除我们的标识符,存在一种可能性(很少见,但可能),即我们删除了该条目,但是 acquire_fair_semaphore() 介于更新所有者 ZSET 的部分和将自己的标识符添加到超时和所有者 ZSET 的部分之间。 如果是这样,这可能会阻止它在应该能够获取信号量时获取信号量。 为了尽可能确保在尽可能多的情况下行为正确,我们将坚持从两个 ZSET 中删除标识符。

现在我们有了一个不需要所有主机具有相同系统时间的信号量,尽管系统时间确实需要在 1 或 2 秒内,以确保信号量不会过早、过晚或根本不超时。