使用Redis构建分布式锁

Published on 2017 - 06 - 13

一般来说,在对数据进行“加锁”时,程序首先需要通过获取(acquire)锁来得到对数据进行排他性访问的能力,然后才能对数据执行一系列操作,最后还要将锁释放(release)给其他程序。对于能够被多个线程访问的共享内存数据结构(shared-memory data structure)来说,这种“先获取锁,然后执行操作,最后释放锁”的动作非常常见。Redis使用WATCH命令来代替对数据进行加锁,因为WATCH只会在数据被其他客户端抢先修改了的情况下通知执行了这个命令的客户端,而不会阻止其他客户端对数据进行修改,所以这个命令被称为乐观锁(optimistic locking)。

分布式锁也有类似的“首先获取锁,然后执行操作,最后释放锁”动作,但这种锁既不是给同一个进程中的多个线程使用,也不是给同一台机器上的多个进程使用,而是由不同机器上的不同Redis客户端进行获取和释放的。何时使用以及是否使用WATCH或者锁取决于给定的应用程序:有的应用不需要使用锁就可以正确地运行,而有的应用只需要使用少量的锁,还有的应用需要在每个步骤都使用锁,不一而足。

我们没有直接使用操作系统级别的锁、编程语言级别的锁,或者其他各式各样的锁,而是选择了花费大量时间去使用Redis构建锁,这其中一个原因和范围(score)有关:为了对Redis存储的数据进行排他性访问,客户端需要访问一个锁,这个锁必须定义在一个可以让所有客户端都看得见的范围之内,而这个范围就是Redis本身,因此我们需要把锁构建在Redis里面。另一方面,虽然Redis提供的SETNX命令确实具有基本的加锁功能,但它的功能并不完整,并且也不具备分布式锁常见的一些高级特性,所以我们还是需要自己动手来构建分布式锁。

这一节将会说明“为什么使用WATCH命令来监视被频繁访问的键可能会引起性能问题”,还会展示构建一个锁的详细步骤,并最终在某些情况下使用锁去代替WATCH命令。

锁的重要性

之前展示的第1版自动补全程序在向列表添加元素或者从列表中移除元素的时候,会使用MULTI和EXEC来包裹多个命令调用。之前为了实现游戏中的商品交易市场也引入了由WATCH、MULTI和EXEC组成的事务,如果读者对此还有印象的话,应该会记得那个市场就是一个有序集合,其中集合的成员由商品ID和卖家ID组成,而成员的分值则是商品的售价。另外,游戏中的每个玩家都有一个与之对应的散列,这个散列记录了玩家的名字、当前拥有的钱数以及其他相关信息。图2展示了这个市场、玩家包裹以及玩家信息的例子。

当玩家将商品放到市场上面进行销售的时候,为了确保被出售的商品的确存在于玩家的包裹里面,程序首先会使用WATCH命令来监视玩家的包裹,然后将被出售的商品添加到代表市场的有序集合里面,最后从玩家的包裹里面移除被出售的商品。代码清单6给出了list_item()函数的核心代码。

def list_item(conn, itemid, sellerid, price):
    #...
            pipe.watch(inv)                             #A
            if not pipe.sismember(inv, itemid):         #B
                pipe.unwatch()                          #B
                return None

            pipe.multi()                                #C
            pipe.zadd("market:", item, price)           #C
            pipe.srem(inv, itemid)                      #C
            pipe.execute()                              #C
            return True
#A Watch for changes to the users's inventory
#B Verify that the user still has the item to be listed
#C Actually list the item

现在来回顾一下商品的购物过程。当玩家在市场上购买商品的时候,程序首先需要使用WATCH去监视市场以及买家的个人信息散列,在得知买家现有的钱数以及商品的售价之后,程序会验证买家是否有足够的钱来购买指定的商品:如果买家有足够的钱,那么程序会将买家支付的钱转移给卖家,接着将商品添加到买家的包裹里面,并从市场里面移除已被售出的商品;相反地,如果买家没有足够的钱来购买商品,那么程序就会取消事务。在执行购买操作的过程中,如果有其他玩家对市场进行了改动,或者因为记录买家个人信息的散列出现了变化而引发了WATCH错误,那么程序将重新执行购买操作。代码清单7给出了purchase_item()函数的核心代码。

def purchase_item(conn, buyerid, itemid, sellerid, lprice):
    #...
            pipe.watch("market:", buyer)                #A

            price = pipe.zscore("market:", item)        #B
            funds = int(pipe.hget(buyer, 'funds'))      #B
            if price != lprice or price > funds:        #B
                pipe.unwatch()                          #B
                return None

            pipe.multi()                                #C
            pipe.hincrby(seller, 'funds', int(price))   #C
            pipe.hincrby(buyerid, 'funds', int(-price)) #C
            pipe.sadd(inventory, itemid)                #C
            pipe.zrem("market:", item)                  #C
            pipe.execute()                              #C
            return True
#A Watch for changes to the market and the buyer's account information
#B Check for a sold/repriced item or insufficient funds
#C Transfer funds from the buyer to the seller, and transfer the item to the buyer

为了展示锁对于性能扩展的必要性,我们会模拟市场在3种不同负载情况下的性能表现,这3种情况分别是1个玩家出售商品,另1个玩家购买商品;5个玩家出售商品,另1个玩家购买商品;以及5个玩家出售商品,另外5个玩家购买商品。表1展示了模拟的结果。

上架商品数量 买入商品数量 购买重试次数 每次购买的平均等待时间
1个卖家,1个买家 145 000 27 000 80 000 14 ms
5个卖家,1个买家 331 000 <200 50 000 150 ms
5个卖家,5个买家 206 000 <600 161 000 498 ms

根据表1的模拟结果显示,随着负载不断增加,系统完成一次交易所需的重试次数从最初的3次上升到了250次,与此同时,完成一次交易所需的等待时间也从最初的少于10 ms上升到了500 ms。这个模拟示例完美地展示了为什么WATCH、MULTI和EXEC组成的事务并不具有可扩展性,原因在于程序在尝试完成一个事务的时候,可能会因为事务执行失败而反复地进行重试。保证数据的正确性是一件非常重要的事情,但使用WATCH命令的做法并不完美。为了解决这个问题,并以可扩展的方式来处理市场交易,我们将使用锁来保证市场在任一时刻只能上架或者销售一件商品。

简易锁

本文接下来将向读者介绍第1版的锁实现,这个锁非常简单,并且在一些情况下可能会无法正常运作。我们在刚开始构建锁的时候,并不会立即处理那些可能会导致锁无法正常运作的问题,而是先构建出可以运行的锁获取操作和锁释放过程,等到证明了使用锁的确可以提升性能之后,才会回过头去一个接一个地解决那些引发锁故障的问题。

因为客户端即使在使用锁的过程中也可能会因为这样或那样的原因而下线,所以为了防止客户端在取得锁之后崩溃,并导致锁一直处于“已被获取”的状态,最终版的锁实现将带有超时限制特性:如果获得锁的进程未能在指定的时限内完成操作,那么锁将自动被释放。

虽然很多Redis用户都对锁(lock)、加锁(locking)及锁超时(lock timeouts)有所了解,但遗憾的是,大部分使用Redis实现的锁只是基本上正确,它们发生故障的时间和方式通常难以预料。下面列出了一些导致锁出现不正确行为的原因,以及锁在不正确运行时的症状。

  • 持有锁的进程因为操作时间过长而导致锁被自动释放,但进程本身并不知晓这一点,甚至还可能会错误地释放掉了其他进程持有的锁。
  • 一个持有锁并打算执行长时间操作的进程已经崩溃,但其他想要获取锁的进程不知道哪个进程持有着锁,也无法检测出持有锁的进程已经崩溃,只能白白地浪费时间等待锁被释放。
  • 在一个进程持有的锁过期之后,其他多个进程同时尝试去获取锁,并且都获得了锁。
  • 上面提到的第一种情况和第三种情况同时出现,导致有多个进程获得了锁,而每个进程都以为自己是唯一一个获得锁的进程。

因为Redis在最新的硬件上可以每秒执行100 000个操作,而在高端的硬件上甚至可以每秒执行将近225 000个操作,所以尽管上面提到的问题出现的几率只有万分之一,但这些问题在高负载的情况下还是有可能会出现,因此,让锁正确地运作起来仍然是一件相当重要的事情。

使用Redis构建锁

使用Redis构建一个基本上正确的锁非常简单,如果在实现锁时能够对用到的操作多加留心的话,那么使用Redis构建一个完全正确的锁也并不是一件非常困难的事情。本节接下来要介绍的是锁实现的第1个版本,这个版本的锁要做的事就是正确地实现基本的加锁功能,而之后的一节将会介绍如何处理过期的锁以及因为持有者崩溃而无法释放的锁。

为了对数据进行排他性访问,程序首先要做的就是获取锁。SETNX命令天生就适合用来实现锁的获取功能,这个命令只会在键不存在的情况下为键设置值,而锁要做的就是将一个随机生成的128位UUID设置为键的值,并使用这个值来防止锁被其他进程取得。

如果程序在尝试获取锁的时候失败,那么它将不断地进行重试,直到成功地取得锁或者超过给定的时限为止,正如代码清单8所示。

def acquire_lock(conn, lockname, acquire_timeout=10):
    identifier = str(uuid.uuid4())                      #A

    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx('lock:' + lockname, identifier):  #B
            return identifier

        time.sleep(.001)

    return False
#A A 128-bit random identifier
#B Get the lock

acquire_lock()函数的行为和前面描述的一样:它会使用SETNX命令,尝试在代表锁的键不存在的情况下,为键设置一个值,以此来获取锁;在获取锁失败的时候,函数会在给定的时限内进行重试,直到成功获取锁或者超过给定的时限为止(默认的重试时限为10秒)。

在实现了锁之后,我们就可以使用锁来代替针对市场的WATCH操作了。代码清单9展示了使用锁重新实现的商品购买操作:程序首先对市场进行加锁,接着检查商品的价格,并在确保买家有足够的钱来购买商品之后,对钱和商品进行相应的转移。当操作执行完毕之后,程序就会释放锁。

def purchase_item_with_lock(conn, buyerid, itemid, sellerid):
    buyer = "users:%s" % buyerid
    seller = "users:%s" % sellerid
    item = "%s.%s" % (itemid, sellerid)
    inventory = "inventory:%s" % buyerid

    locked = acquire_lock(conn, 'market:')     #A
    if not locked:
        return False

    pipe = conn.pipeline(True)
    try:
        pipe.zscore("market:", item)           #B
        pipe.hget(buyer, 'funds')              #B
        price, funds = pipe.execute()          #B
        if price is None or price > funds:     #B
            return None                        #B

        pipe.hincrby(seller, 'funds', int(price))  #C
        pipe.hincrby(buyer, 'funds', int(-price))  #C
        pipe.sadd(inventory, itemid)               #C
        pipe.zrem("market:", item)                 #C
        pipe.execute()                             #C
        return True
    finally:
        release_lock(conn, 'market:', locked)      #D
#A Get the lock
#B Check for a sold item or insufficient funds
#C Transfer funds from the buyer to the seller, and transfer the item to the buyer
#D Release the lock

初看上去,代码清单9中的锁似乎是用来加锁整个购买操作的,但实际上并非如此——这把锁是用来锁住市场数据的,它之所以会包围着执行购买操作的代码,是因为程序在操作市场数据期间必须一直持有锁。

因为在程序持有锁期间,其他客户端可能会擅自对锁进行修改,所以锁的释放操作需要和加锁操作一样小心谨慎地进行。代码清单10中的release_lock()函数展示了锁释放操作的实现代码:函数首先使用WATCH命令监视代表锁的键,接着检查键目前的值是否和加锁时设置的值相同,并在确认值没有变化之后删除该键(这个检查还可以防止程序错误地释放同一个锁多次)。

def release_lock(conn, lockname, identifier):
    pipe = conn.pipeline(True)
    lockname = 'lock:' + lockname

    while True:
        try:
            pipe.watch(lockname)                  #A
            if pipe.get(lockname) == identifier:  #A
                pipe.multi()                      #B
                pipe.delete(lockname)             #B
                pipe.execute()                    #B
                return True                       #B

            pipe.unwatch()
            break

        except redis.exceptions.WatchError:       #C
            pass                                  #C

    return False                                  #D
#A Check and verify that we still have the lock
#B Release the lock
#C Someone else did something with the lock, retry
#D We lost the lock

和之前展示的商品购买操作一样,release_lock()函数也做了很多措施来确保锁没有被修改。需要注意的一点是,对于目前的锁实现来说,release_lock()函数包含的无限循环只会在极少数情况下用到——函数之所以包含这个无限循环,主要是因为之后介绍的锁实现会支持超时限制特性,而如果用户不小心地混合使用了两个版本的锁,就可能会引起解锁事务失败,并导致上锁时间被不必要地延长。尽管这种情况并不常见,但为了保证解锁操作在各种情况下都能够正确地执行,我们还是选择在一开始就把这个无限循环添加到release_lock()函数里面。

在使用锁代替WATCH重新实现商品购买操作之后,我们可以再次进行之前的商品买卖模拟操作:表2中的单数行展示了WATCH实现的模拟结果,而表中的复数行则展示了在与前一行条件相同的情况下,锁实现的模拟结果。

上架商品数量 买入商品数量 购买重试次数 每次购买的平均等待时间
1个卖家,1个买家,使用WATCH 145 000 27 000 80 000 14ms
1个卖家,1个买家,使用锁 51 000 50 000 0 1ms
5个卖家,1个买家,使用WATCH 331 000 <200 50 000 150ms
5个卖家,1个买家,使用锁 68 000 13 000 <10 5ms
5个卖家,5个买家,使用WATCH 206 000 <600 161 000 498ms
5个卖家,5个买家,使用锁 21 000 20 500 0 14ms

与之前的WATCH实现相比,锁实现的上架商品数量虽然有所减少,但是在买入商品时却不需要进行重试,并且上架商品数量和买入商品数量之间的比率,也跟卖家数量和买家数量之间的比率接近。目前来说,不同上架和买入进程之间的竞争限制了商品买卖操作性能的进一步提升,而接下来介绍的细粒度锁将解决这个问题。

细粒度锁

在前面介绍锁实现以及加锁操作的时候,我们考虑的是如何实现与WATCH命令粒度相同的锁——这种锁可以把整个市场都锁住。因为我们是自己动手来构建锁实现,并且我们关心的不是整个市场,而是市场里面的某件商品是否存在,所以我们实际上可以将加锁的粒度变得更细一些。通过只锁住被买卖的商品而不是整个市场,可以减少锁竞争出现的几率并提升程序的性能。

表3展示了使用只对单个商品进行加锁的锁实现之后,进行与表6-2所示相同的模拟时的结果。

上架商品数量 买入商品数量 购买重试次数 每次购买的平均等待时间
1个卖家,1个买家,使用WATCH 145 000 27 000 80 000 14ms
1个卖家,1个买家,使用锁 51 000 50 000 0 1ms
1个卖家,1个买家,使用细粒度锁 113 000 110 000 0 <1ms
5个卖家,1个买家,使用WATCH 331 000 <200 50 000 150ms
5个卖家,1个买家,使用锁 68 000 13 000 <10 5ms
5个卖家,1个买家,使用细粒度锁 192 000 36 000 0 <2ms
5个卖家,5个买家,使用WATCH 206 000 <600 161 000 498ms
5个卖家,5个买家,使用锁 21 000 20 500 0 14ms
5个卖家,5个买家,使用细粒度锁 116 000 111 000 0 <3ms

表3中的模拟结果显示,在使用细粒度锁的情况下,无论有多少个上架进程和买入进程在运行,程序总能在60秒内完成220 000~230 000次的上架和买入操作,并且不会引发任何重试操作。除此之外,买入操作的延迟时间即使在高负载情况下也不会超过3毫秒。在使用细粒度锁的时候买卖操作的执行次数比率跟买家数量和卖家数量之间的比率基本一致,这和使用粗粒度锁时的情况非常相似。更关键的是,锁可以有效地避免WATCH实现因为买入操作竞争过多而导致延迟剧增甚至无法执行的问题。

在接下来的内容中,我们将通过图片的方式直观地了解几个不同的锁实现在性能方面的差异。从图3可以看出,在负载条件相同的情况下,使用锁实现成功买入的商品数量,比使用WATCH实现成功买入的商品数量要多得多。


[图3 在60秒内买入的商品数量。因为系统已经超负载运行,所以图片总体上呈V字形,当有5个上架进程和1个买入进程在进行时,商品的在售数量和买入数量之间的比率约为5比1]

接下来的图4展示了WATCH实现仅仅为了完成少量的交易也需要进行数千次昂贵的重试操作。


[图4 在60秒内因为购买商品而引发的重试次数。因为两种锁实现都不会引发重试,而记录细粒度锁数据的线条正好覆盖住了记录粗粒度锁数据的线条,因此我们在图中只会看见记录细粒度锁数据的线条,而看不见记录粗粒度锁数据的线条]

图5展示了因为WATCH竞争而引发的重试次数剧增以及购买量锐减问题,而使用锁则可以有效地减少购买商品时的等待时间。


[图5 在购买商品时,以毫秒为单位的平均等待时间。因为两种锁实现的最大延迟时间都低于14 ms,所以它们的数据线条都位于图片的底部,几乎难以察觉。而使用WATCH时,系统的平均等待时间却接近500 ms]

通过以上展示的模拟结果以及数据图表,我们可以看出,在高负载情况下,使用锁可以减少重试次数、降低延迟时间、提升性能并将加锁的粒度调整至合适的大小。

需要注意的是,前面进行的模拟并不完美:我们既没有模拟多个买家因为等待其他买家而导致无法购买商品的情况,也没有模拟dogpile效应——dogpile效应指的是,执行事务所需的时间越长,就会有越多待处理的事务互相重叠,这种重叠增加了执行单个事务所需的时间,并使得那些带有时间限制的事务失败的几率大幅上升,最终导致所有事务执行失败的几率和进行重试的几率都大幅地上升,这对于WATCH实现的商品买卖操作来说,影响尤为严重。

在一些情况下,判断应该锁住整个结构还是应该锁住结构中的一小部分是一件非常简单的事情。比如在前面的商品买卖例子中,我们要监视的关键数据为市场中的一件商品,而一件商品只是整个市场中的一小部分数据,所以只锁住一件商品的做法无疑是正确的。但是,在需要锁住的一小部分数据有不止一份的时候,又或者需要锁住结构的多个部分的时候,判断应该对小部分数据进行加锁还是应该直接锁住整个结构就会变得困难起来。除此之外,使用多个细粒度锁也有引发死锁的危险,一不小心就会导致程序无法正常运行。

带有超时限制特性的锁

前面提到过,目前的锁实现在持有者崩溃的时候不会自动被释放,这将导致锁一直处于已被获取的状态。为了解决这个问题,在这一节中,我们将为锁加上超时功能。

为了给锁加上超时限制特性,程序将在取得锁之后,调用EXPIRE命令来为锁设置过期时间,使得Redis可以自动删除超时的锁。为了确保锁在客户端已经崩溃(客户端在执行介于SETNX和EXPIRE之间的时候崩溃是最糟糕的)的情况下仍然能够自动被释放,客户端会在尝试获取锁失败之后,检查锁的超时时间,并为未设置超时时间的锁设置超时时间。因此锁总会带有超时时间,并最终因为超时而自动被释放,使得其他客户端可以继续尝试获取已被释放的锁。

需要注意的一点是,因为多个客户端在同一时间内设置的超时时间基本上都是相同的,所以即使有多个客户端同时为同一个锁设置超时时间,锁的超时时间也不会产生太大变化。

代码清单11展示了给acquire_lock()函数添加超时时间设置代码之后得出的acquire_lock_with_timeout()函数。

def acquire_lock_with_timeout(
    conn, lockname, acquire_timeout=10, lock_timeout=10):
    identifier = str(uuid.uuid4())                      #A
    lockname = 'lock:' + lockname
    lock_timeout = int(math.ceil(lock_timeout))         #D

    end = time.time() + acquire_timeout
    while time.time() < end:
        if conn.setnx(lockname, identifier):            #B
            conn.expire(lockname, lock_timeout)         #B
            return identifier
        elif conn.ttl(lockname) < 0:                    #C
            conn.expire(lockname, lock_timeout)         #C

        time.sleep(.001)

    return False
#A A 128-bit random identifier
#B Get the lock and set the expiration
#C Check and update the expiration time as necessary
#D Only pass integers to our EXPIRE calls

新的acquire_lock_with_timeout()函数给锁增加了超时限制特性,这一特性确保了锁总会在有需要的时候被释放,而不会被某个客户端一直把持着。更棒的是,这个新的加锁函数可以和之前写好的锁释放函数一起使用,我们不需要另外再写新的锁释放函数。

在其他数据库里面,加锁通常是一个自动执行的基本操作。而Redis的WATCH、MULTI和EXEC,就像之前所说的那样,只是一个乐观锁——这种锁只会在数据被其他客户端抢先修改了的情况下,通知加锁的客户端,让它撤销对数据的修改,而不会真正地把数据锁住。通过在客户端上面实现一个真正的锁,程序可以为用户带来更好的性能、更熟悉的编程概念、更简单易用的API,等等。但是与此同时,也请记住Redis并不会主动使用这个自制的锁,我们必须自己使用这个锁来代替WATCH,或者同时使用锁和WATCH协同进行工作,从而保证数据的正确与一致。

参考文档

  • Redis实战 第6章 使用Redis构建应用程序组件