使用Redis构建计数信号量控制并发

Published on 2017 - 06 - 18

计数信号量是一种锁,它可以让用户限制一项资源最多能够同时被多少个进程访问,通常用于限定能够同时使用的资源数量。你可以把我们在前一节创建的锁看作是只能被一个进程访问的信号量。

计数信号量和其他种类的锁一样,都需要被获取和释放。客户端首先需要获取信号量,然后执行操作,最后释放信号量。计数信号量和其他锁的区别在于,当客户端获取锁失败的时候,客户端通常会选择进行等待;而当客户端获取计数信号量失败的时候,客户端通常会选择立即返回失败结果。举个例子,假设我们最多只允许5个进程同时获取信号量,那么当有第6个进程尝试去获取信号量的时候,我们希望这个获取操作可以尽早地失败,并向客户端返回“本资源目前正处于繁忙状态”之类的信息。

我们将以渐进的方式来构建计数信号量实现,直到它具有完整的功能并且运作正常为止。

让我们来看一个关于Fake Game公司的例子。随着商品交易市场变得越来越红火,玩家希望Fake Game公司能够允许他们在游戏以外的地方访问商品交易市场的相关信息,以便在不登入游戏的情况下进行商品买卖。目前,执行相关操作的API已经完成,而我们的任务就是要构建出一种机制,限制每个账号最多只能有5个进程同时访问市场。

等到计数信号量构建完毕的时候,我们就可以使用acquire_semaphore()和release_semaphore()来包裹起商品买卖操作的相关API了。

构建基本的计数信号量

构建计数信号量时要考虑的事情和构建其他类型的锁时要考虑的事情大部分都是相同的,比如判断是哪个客户端取得了锁,如何处理客户端在获得锁之后崩溃的情况,以及如何处理锁超时的问题。实际上,如果我们不考虑信号量超时的问题,也不考虑信号量的持有者在未释放信号量的情况下崩溃的问题,那么有好几种不同的方法可以非常方便地构建出一个信号量实现。遗憾的是,从长远来看,这些简单方便的方法构建出来的信号量都不太实用,因此我们将通过持续改进的方式来提供一个功能完整的计数信号量。

使用Redis来实现超时限制特性通常有两种方法可选。一种是像之前构建分布式锁那样,使用EXPIRE命令,而另一种则是使用有序集合。为了将多个信号量持有者的信息都存储到同一个结构里面,这次我们将使用有序集合来构建计数信号量。

说得更具体一点,程序将为每个尝试获取信号量的进程生成一个唯一标识符,并将这个标识符用作有序集合的成员,而成员对应的分值则是进程尝试获取信号量时的Unix时间戳。图6展示了一个存储信号量信息的有序集合示例。


[图6 存储信号量信息的有序集合]

进程在尝试获取信号量时会生成一个标识符,并使用当前时间戳作为分值,将标识符添加到有序集合里面。接着进程会检查自己的标识符在有序集合中的排名。如果排名低于可获取的信号量总数(成员的排名从0开始计算),那么表示进程成功地取得了信号量。反之,则表示进程未能取得信号量,它必须从有序集合里面移除自己的标识符。为了处理过期的信号量,程序在将标识符添加到有序集合之前,会先清理有序集合中所有时间戳大于超时数值(timeout number value)的标识符。代码清单12展示了信号量获取操作的具体实现代码。

def acquire_semaphore(conn, semname, limit, timeout=10):
    identifier = str(uuid.uuid4())                             #A
    now = time.time()

    pipeline = conn.pipeline(True)
    pipeline.zremrangebyscore(semname, '-inf', now - timeout)  #B
    pipeline.zadd(semname, identifier, now)                    #C
    pipeline.zrank(semname, identifier)                        #D
    if pipeline.execute()[-1] < limit:                         #D
        return identifier

    conn.zrem(semname, identifier)                             #E
    return None

#A A 128-bit random identifier
#B Time out old semaphore holders
#C Try to acquire the semaphore
#D Check to see if we have it
#E We failed to get the semaphore, discard our identifier

acquire_semaphore()函数所做的就和前面介绍的一样:生成标识符,清除所有过期的信号量,将新的标识符添加到有序集合里面,检查新添加的标识符在有序集合中的排名。没有什么让人出乎意料的地方。

代码清单13展示的信号量释放操作非常简单:程序只需要从有序集合里面移除指定的标识符就可以了。

def release_semaphore(conn, semname, identifier):
    return conn.zrem(semname, identifier)                      #A

#A Returns True if the semaphore was properly released, False if it had timed out

这个基本的信号量实现非常好用,它不仅简单,而且运行速度也飞快。但这个信号量实现也存在一些问题:它在获取信号量的时候,会假设每个进程访问到的系统时间都是相同的,而这一假设在多主机环境下可能并不成立。举个例子,对于系统A和B来说,如果A的系统时间要比B的系统时间快10毫秒,那么当A取得了最后一个信号量的时候,B只需要在10毫秒内尝试获取信号量,就可以在A不知情的情况下,“偷走”A已经取得的信号量。对于一部分应用程序来说这并不是一个大问题,但对于另外一部分应用程序来说却并不是如此。

每当锁或者信号量因为系统时钟的细微不同而导致锁的获取结果出现剧烈变化时,这个锁或者信号量就是不公平的(unfair)。不公平的锁和信号量可能会导致客户端永远也无法取得它原本应该得到的锁或信号量。接下来的一节将介绍解决这个问题的方法。

公平信号量

当各个系统的系统时间并不完全相同的时候,前面介绍的基本信号量就会出现问题:系统时钟较慢的系统上运行的客户端,将能够偷走系统时钟较快的系统上运行的客户端已经取得的信号量,导致信号量变得不公平。我们需要减少不正确的系统时间对信号量获取操作带来的影响,使得只要各个系统的系统时间相差不超过1秒,就不会引起信号量被偷或者信号量提早过期。

为了尽可能地减少系统时间不一致带来的问题,我们需要给信号量实现添加一个计数器以及一个有序集合。其中,计数器通过持续地执行自增操作,创建出一种类似于计时器(timer)的机制,确保最先对计数器执行自增操作的客户端能够获得信号量。另外,为了满足“最先对计数器执行自增操作的客户端能够获得信号量”这一要求,程序会将计数器生成的值用作分值,存储到一个“信号量拥有者”有序集合里面,然后通过检查客户端生成的标识符在有序集合中的排名来判断客户端是否取得了信号量。图7展示了一个信号量拥有者有序集合示例以及一个计数器示例。

公平信号量和之前介绍的基本信号量一样,都是通过从系统时间有序集合里面移除过期元素的方式来清理过期信号量的。另外,公平信号量实现还会通过ZINTERSTORE命令以及该命令的WEIGHTS参数,将信号量的超时时间传递给新的信号量拥有者有序集合。

代码清单14展示了公平信号量获取操作的实现代码。程序首先通过从超时有序集合里面移除过期元素的方式来移除超时的信号量,接着对超时有序集合和信号量拥有者有序集合执行交集计算,并将计算结果保存到信号量拥有者有序集合里面,覆盖有序集合中原有的数据。之后,程序会对计数器执行自增操作,并将计数器生成的值添加到信号量拥有者有序集合里面;与此同时,程序还会将当前的系统时间添加到超时有序集合里面。在完成以上操作之后,程序会检查当前客户端添加的标识符在信号量拥有者有序集合中的排名是否足够低,如果是的话就表示客户端成功取得了信号量。相反地,如果客户端未能取得信号量,那么程序将从信号量拥有者有序集合以及超时有序集合里面移除与该客户端相关的元素。

def acquire_fair_semaphore(conn, semname, limit, timeout=10):
    identifier = str(uuid.uuid4())                             #A
    czset = semname + ':owner'
    ctr = semname + ':counter'

    now = time.time()
    pipeline = conn.pipeline(True)
    pipeline.zremrangebyscore(semname, '-inf', now - timeout)  #B
    pipeline.zinterstore(czset, {czset: 1, semname: 0})        #B

    pipeline.incr(ctr)                                         #C
    counter = pipeline.execute()[-1]                           #C

    pipeline.zadd(semname, identifier, now)                    #D
    pipeline.zadd(czset, identifier, counter)                  #D

    pipeline.zrank(czset, identifier)                          #E
    if pipeline.execute()[-1] < limit:                         #E
        return identifier                                      #F

    pipeline.zrem(semname, identifier)                         #G
    pipeline.zrem(czset, identifier)                           #G
    pipeline.execute()
    return None

#A A 128-bit random identifier
#B Time out old entries
#C Get the counter
#D Try to acquire the semaphore
#E Check the rank to determine if we got the semaphore
#F We got the semaphore
#G We didn't get the semaphore, clean out the bad data

acquire_fair_semaphore()函数和之前的acquire_semaphore()函数有一些不同的地方。它首先清除已经超时的信号量,接着更新信号量拥有者有序集合并获取计数器生成的新ID值,之后,函数会将客户端的当前时间戳添加到过期时间有序集合里面,并将计数器生成的ID值添加到信号量拥有者有序集合里面,这样就可以检查标识符在有序集合里面的排名是否足够低了。

在32位平台上实现公平信号量 对于运行在32位平台上的Redis来说,整数计数器的最大值将被限制为231−1,也就是标准有符号整数的最大值。在大量使用信号量的情况下,32位计数器的值大约每过2小时就会溢出一次。尽管有几种变通的方法可以避开这个问题,但对于需要生成计数器ID的应用程序来说,最简单的办法还是直接切换到64位平台。

图8展示了ID为8372的进程在1326437039.100这个时间尝试获取信号量时执行的一系列操作,其中信号量的最大数量为5。


[图8 acquire_fair_semaphore()函数的调用序列]

公平信号量的释放操作几乎和基础信号量的释放操作一样简单,它们之间的唯一区别在于:公平信号量的释放操作需要同时从信号量拥有者有序集合以及超时有序集合里面删除当前客户端的标识符。代码清单15展示了公平信号量释放操作的实现代码。

def release_fair_semaphore(conn, semname, identifier):
    pipeline = conn.pipeline(True)
    pipeline.zrem(semname, identifier)
    pipeline.zrem(semname + ':owner', identifier)
    return pipeline.execute()[0]                               #A

#A Returns True if the semaphore was properly released, False if it had timed out

因为信号量获取操作的其中一个步骤,就是对信号量拥有者有序集合进行更新,移除那些不再存在于超时有序集合中的标识符,所以,如果我们想要稍微偷懒一下的话,也可以在释放信号量的时候,只移除超时有序集合里面的客户端标识符,而不对信号量拥有者有序集合执行相同的操作。但是只从超时有序集合里面移除标识符可能会引发这样一个问题:当一个客户端执行acquire_fair_semaphore()函数,对信号量拥有者有序集合进行了更新,并正准备将自己的标识符添加到超时有序集合和信号量拥有者有序集合之际,如果有另一个客户端执行信号量释放函数,并将该客户端自己的标识符从超时有序集合中移除的话,这将导致原本能够成功执行的信号量获取操作变为执行失败。虽然这个问题出现的几率很低,但它还是有可能会出现,因此,为了确保程序在不同情况下都能产生正确的行为,信号量释放函数仍然会同时从两个有序集合里面移除客户端标识符。

尽管这个信号量实现并不要求所有主机都拥有相同的系统时间,但各个主机在系统时间上的差距仍然需要控制在一两秒之内,从而避免信号量过早释放或者太晚释放。

刷新信号量

在商品交易市场的API完成之后,Fake Game公司决定通过流(stream)来让用户第一时间获知最新上架销售的商品以及最新完成的交易。前面介绍的信号量实现默认只能设置10秒的超时时间,它主要用于实现超时限制特性并掩盖自身包含的潜在缺陷,但是短短的10秒连接时间对于流API的使用者来说是远远不够的,因此我们需要想办法对信号量进行刷新,防止其过期。

因为公平信号量区分开了超时有序集合和信号量拥有者有序集合,所以程序只需要对超时有序集合进行更新,就可以立即刷新信号量的超时时间了。代码清单16展示了刷新操作的实现代码。

def refresh_fair_semaphore(conn, semname, identifier):
    if conn.zadd(semname, identifier, time.time()):            #A
        release_fair_semaphore(conn, semname, identifier)      #B
        return False                                           #B
    return True                                                #C

#A Update our semaphore
#B We lost our semaphore, report back
#C We still have our semaphore

只要客户端持有的信号量没有因为过期而被删除,refresh_fair_semaphore()函数就可以对信号量的超时时间进行刷新。另一方面,如果客户端持有的信号量已经因为超时而被删除,那么函数将释放信号量,并将信号量已经丢失的信息告知调用者。在长时间使用信号量的时候,我们必须以足够频繁的频率对信号量进行刷新,防止它因为过期而丢失。

既然我们已经可以获取、释放和刷新公平信号量了,那么是时候来解决竞争条件的问题了。

消除竞争条件

竞争条件可能会导致操作重试或者数据出错,而解决竞争条件并不容易。不巧的是,前面介绍的信号量实现也带有可能会导致操作不正确的竞争条件。

比如说,当两个进程A和B都在尝试获取剩余的一个信号量时,即使A首先对计数器执行了自增操作,但只要B能够抢先将自己的标识符添加到有序集合里,并检查标识符在有序集合中的排名,那么B就可以成功地取得信号量。之后当A也将自己的标识符添加到有序集合里,并检查标识符在有序集合中的排名时,A将“偷走”B已经取得的信号量,而B只有在尝试释放信号量或者尝试刷新信号量的时候才会察觉这一点。

将系统时钟用作获取锁的手段提高了这类竞争条件出现的可能性,导致信号量持有者的数量比预期的还要多,多出的信号量数量与各个系统时钟之间的差异有关——差异越大,出现额外信号量持有者的可能性也就越大。虽然引入计数器和信号量拥有者有序集合可以移除系统时钟这一不确定因素,并降低竞争条件出现的几率,但由于执行信号量获取操作需要客户端和服务器进行多次通信,所以竞争条件还是有可能会发生。

为了消除信号量实现中所有可能出现的竞争条件,构建一个正确的计数器信号量实现,我们需要用到前面构建的带有超时功能的分布式锁。总的来说,当程序想要获取信号量的时候,它会先尝试获取一个带有短暂超时时间的锁。如果程序成功取得了锁,,那么它就会接着执行正常的信号量获取操作。如果程序未能取得锁,那么信号量获取操作也宣告失败。代码清单17展示了执行这一操作的代码。

def acquire_semaphore_with_lock(conn, semname, limit, timeout=10):
    identifier = acquire_lock(conn, semname, acquire_timeout=.01)
    if identifier:
        try:
            return acquire_fair_semaphore(conn, semname, limit, timeout)
        finally:
            release_lock(conn, semname, identifier)

读者可能会感到有些意外,因为令我们困扰至今的竞争条件竟然只需要使用一个锁就可以轻而易举地解决掉,但这种事在使用Redis的时候并不少见:相同或者相似的问题通常会有几种不同的解决方法,而每种解决方法都有各自的优点和缺点。以下是之前介绍过的各个信号量实现的优缺点。

  • 如果你对于使用系统时钟没有意见,也不需要对信号量进行刷新,并且能够接受信号量的数量偶尔超过限制,那么可以使用我们给出的第一个信号量实现。
  • 如果你只信任差距在一两秒之间的系统时钟,但仍然能够接受信号量的数量偶尔超过限制,那么可以使用第二个信号量实现。
  • 如果你希望信号量一直都具有正确的行为,那么可以使用带锁的信号量实现来保证正确性。

在使用锁来解决竞争条件之后,我们拥有了几个不同的信号量实现,而它们遵守信号量限制的程度也各不相同。一般来说,使用最新也最严格遵守限制的实现是最好的,这不仅因为最新的实现是唯一真正正确的实现,更关键的是,如果我们因为图一时之快而使用了带有错误的简陋实现,最终可能会因为使用了太多资源而导致得不偿失。

参考文档

  • Redis实战 第6章 使用Redis构建应用程序组件