Redis事务介绍

Published on 2017 - 06 - 03

为了保证数据的正确性,我们必须认识到这一点:在多个客户端同时处理相同的数据时,不谨慎的操作很容易会导致数据出错。本文将介绍使用Redis事务来防止数据出错的方法,以及在某些情况下,使用事务来提升性能的方法。

Redis的事务和传统关系数据库的事务并不相同。在关系数据库中,用户首先向数据库服务器发送BEGIN,然后执行各个相互一致(consistent)的写操作和读操作,最后,用户可以选择发送COMMIT来确认之前所做的修改,或者发送ROLLBACK来放弃那些修改。

在Redis里面也有简单的方法可以处理一连串相互一致的读操作和写操作。Redis的事务以特殊命令MULTI为开始,之后跟着用户传入的多个命令,最后以EXEC为结束。但是由于这种简单的事务在EXEC命令被调用之前不会执行任何实际操作,所以用户将没办法根据读取到的数据来做决定。这个问题看上去似乎无足轻重,但实际上无法以一致的形式读取数据将导致某一类型的问题变得难以解决,除此之外,因为在多个事务同时处理同一个对象时通常需要用到二阶提交(two-phase commit),所以如果事务不能以一致的形式读取数据,那么二阶提交将无法实现,从而导致一些原本可以成功执行的事务沦落至执行失败的地步。比如说:“在市场里面购买一件商品”就是其中一个会因为无法以一致的形式读取数据而变得难以解决的问题,本文接下来将在实际环境中对这个问题进行介绍。

延迟执行事务有助于提升性能因为Redis在执行事务的过程中,会延迟执行已入队的命令直到客户端发送EXEC命令为止。因此,包括本文使用的Python客户端在内的很多Redis客户端都会等到事务包含的所有命令都出现了之后,才一次性地将MULTI命令、要在事务中执行的一系列命令,以及EXEC命令全部发送给Redis,然后等待直到接收到所有命令的回复为止。这种“一次性发送多个命令,然后等待所有回复出现”的做法通常被称为流水线(pipelining),它可以通过减少客户端与Redis服务器之间的网络通信次数来提升Redis在执行多个命令时的性能。

最近几个月,Fake Game公司发现他们在YouTwitFace(一个虚构的社交网站)上面推出的角色扮演网页游戏正在变得越来越受欢迎。因此,关心玩家需求的Fake Game公司决定在游戏里面增加一个商品买卖市场,让玩家们可以在市场里面销售和购买商品。本节接下来的内容将介绍设计和实现这个商品买卖市场的方法,并说明如何按需对这个商品买卖市场进行扩展。

定义用户信息和用户包裹

图2展示了游戏中用于表示用户信息和用户包裹(inventory)的结构:用户信息存储在一个散列里面,散列的各个键值对分别记录了用户的姓名、用户拥有的钱数等属性。用户包裹使用一个集合来表示,它记录了包裹里面每件商品的唯一编号。


[图2 用户信息示例和用户包裹示例。Frank有43块钱,并且他打算卖掉自己包裹里面的其中一件商品]

商品买卖市场的需求非常简单:一个用户(卖家)可以将自己的商品按照给定的价格放到市场上进行销售,当另一个用户(买家)购买这个商品时,卖家就会收到钱。另外,本节实现的市场只能根据商品的价格来进行排序。

为了将被销售商品的全部信息都存储到市场里面,我们会将商品的ID和卖家的ID拼接起来,并将拼接的结果用作成员存储到市场有序集合(market ZSET)里面,而商品的售价则用作成员的分值。通过将所有数据都包含在一起,我们极大地简化了实现商品买卖市场所需的数据结构,并且因为市场里面的所有商品都按照价格排序,所以针对商品的分页功能和查找功能都可以很容易地实现。图3展示了一个只包含数个商品的市场例子。


[图3 一个基本的商品买卖市场,其中用户4正在销售商品ItemA,售价为35块钱]

既然我们已经知道了实现商品买卖市场所需的数据结构,那么接下来该考虑如何实现市场的商品上架功能了。

将商品放到市场上销售

为了将商品放到市场上进行销售,程序除了要使用MULTI命令和EXEC命令之外,还需要配合使用WATCH命令,有时候甚至还会用到UNWATCH或DISCARD命令。在用户使用WATCH命令对键进行监视之后,直到用户执行EXEC命令的这段时间里面,如果有其他客户端抢先对任何被监视的键进行了替换、更新或删除等操作,那么当用户尝试执行EXEC命令的时候,事务将失败并返回一个错误(之后用户可以选择重试事务或者放弃事务)。通过使用WATCH、MULTI/EXEC、 UNWATCH/DISCARD等命令,程序可以在执行某些重要操作的时候,通过确保自己正在使用的数据没有发生变化来避免数据出错。

什么是DISCARD?UNWATCH命令可以在WATCH命令执行之后、MULTI命令执行之前对连接进行重置(reset);同样地,DISCARD命令也可以在MULTI命令执行之后、EXEC命令执行之前对连接进行重置。这也就是说,用户在使用WATCH监视一个或多个键,接着使用MULTI开始一个新的事务,并将多个命令入队到事务队列之后,仍然可以通过发送DISCARD命令来取消WATCH命令并清空所有已入队命令。本文展示的例子都没有用到DISCARD,主要原因在于我们已经清楚地知道自己是否想要执行MULTI/EXEC或者UNWATCH,所以没有必要在这些例子里面使用DISCARD。

在将一件商品放到市场上进行销售的时候,程序需要将被销售的商品添加到记录市场正在销售商品的有序集合里面,并且在添加操作执行的过程中,监视卖家的包裹以确保被销售的商品的确存在于卖家的包裹当中,代码清单5展示了这一操作的具体实现。

def list_item(conn, itemid, sellerid, price):
    inventory = "inventory:%s"%sellerid
    item = "%s.%s"%(itemid, sellerid)
    end = time.time() + 5
    pipe = conn.pipeline()

    while time.time() < end:
        try:
            pipe.watch(inventory)                    #A
            if not pipe.sismember(inventory, itemid):#B
                pipe.unwatch()                       #E
                return None

            pipe.multi()                             #C
            pipe.zadd("market:", item, price)        #C
            pipe.srem(inventory, itemid)             #C
            pipe.execute()                           #F
            return True
        except redis.exceptions.WatchError:          #D
            pass                                     #D
#A Watch for changes to the users's inventory
#B Verify that the user still has the item to be listed
#E If the item is not in the user's inventory, stop watching the inventory key and return
#C Actually list the item
#F If execute returns without a WatchError being raised, then the transaction is complete and the inventory key is no longer watched
#D The user's inventory was changed, retry

list_item()函数的行为就和我们之前描述的一样:它首先执行一些初始化步骤,然后对卖家的包裹进行监视,验证卖家想要销售的商品是否仍然存在于卖家的包裹当中,如果是的话,函数就会将被销售的商品添加到买卖市场里面,并从卖家的包裹中移除该商品。正如函数中的while循环所示,在使用WATCH命令对包裹进行监视的过程中,如果包裹被更新或者修改,那么程序将接收到错误并进行重试。

图4展示了当Frank(用户ID为17)尝试以97块钱的价格销售ItemM时,list_item()函数的执行过程。


[图4 list_item(conn, "ItemM", 17, 97)的执行过程]

因为程序会确保用户只能销售他们自己所拥有的商品,所以在一般情况下,用户都可以顺利地将自己想要销售的商品添加到商品买卖市场上面,但是正如之前所说,如果用户的包裹在WATCH执行之后直到EXEC执行之前的这段时间内发生了变化,那么添加操作将执行失败并重试。

在弄懂了怎样将商品放到市场上销售之后,接下来让我们来了解一下怎样从市场上购买商品。

购买商品

代码清单6中的purchase_item()函数展示了从市场里面购买一件商品的具体方法:程序首先使用WATCH对市场以及买家的个人信息进行监视,然后获取买家拥有的钱数以及商品的售价,并检查买家是否有足够的钱来购买该商品。如果买家没有足够的钱,那么程序会取消事务;相反地,如果买家的钱足够,那么程序首先会将买家支付的钱转移给卖家,然后将售出的商品移动至买家的包裹,并将该商品从市场中移除。当买家的个人信息或者商品买卖市场出现变化而导致WatchError异常出现时,程序将进行重试,其中最大重试时间为10秒。

def purchase_item(conn, buyerid, itemid, sellerid, lprice):
    buyer = "users:%s"%buyerid
    seller = "users:%s"%sellerid
    item = "%s.%s"%(itemid, sellerid)
    inventory = "inventory:%s"%buyerid
    end = time.time() + 10
    pipe = conn.pipeline()

    while time.time() < end:
        try:
            pipe.watch("market:", buyer)                #A

            price = pipe.zscore("market:", item)        #B
            funds = int(pipe.hget(buyer, "funds"))      #B
            if price != lprice or price > funds:        #B
                pipe.unwatch()                          #B
                return None

            pipe.multi()                                #C
            pipe.hincrby(seller, "funds", int(price))   #C
            pipe.hincrby(buyer, "funds", int(-price))   #C
            pipe.sadd(inventory, itemid)                #C
            pipe.zrem("market:", item)                  #C
            pipe.execute()                              #C
            return True
        except redis.exceptions.WatchError:             #D
            pass                                        #D

    return False
#A Watch for changes to the market and to the buyer's account information
#B Check for a sold/repriced item or insufficient funds
#C Transfer funds from the buyer to the seller, and transfer the item to the buyer
#D Retry if the buyer's account or the market changed

在执行商品购买操作的时候,程序除了需要花费大量时间来准备相关数据之外,还需要对商品买卖市场以及买家的个人信息进行监视:监视商品买卖市场是为了确保买家想要购买的商品仍然有售(或者在商品已经被其他人买走时进行提示),而监视买家的个人信息则是为了验证买家是否有足够的钱来购买自己想要的商品。

当程序确认商品仍然存在并且买家有足够钱的时候,程序会将被购买的商品移动到买家的包裹里面,并将买家支付的钱转移给卖家。
在观察了市场上展示的商品之后,Bill(用户ID为27)决定购买Frank在市场上销售的ItemM,图5和图6展示了购买操作执行期间,数据结构是如何变化的。


[图5 在购买指定商品之前,程序必须对商品买卖市场以及买家的个人信息进行监视,检查指定商品是否仍然存在,以及买家是否有足够的钱来购买该商品]


[图6 商品购买操作的执行流程如下:程序首先需要将买家支付的钱转移给卖家,然后从商品买卖市场里面移除被售出的商品,最后将该商品添加到买家的包裹里面]

正如之前的代码清单6所示,如果商品买卖市场有序集合(market ZSET)或者Bill的个人信息在WATCH和EXEC执行之间发生了变化,那么purchase_item()将进行重试,或者在重试操作超时之后放弃此次购买操作。

为什么Redis没有实现典型的加锁功能?在访问以写入为目的数据的时候(SQL中的SELECT FOR UPDATE),关系数据库会对被访问的数据行进行加锁,直到事务被提交(COMMIT)或者被回滚(ROLLBACK)为止。如果有其他客户端试图对被加锁的数据行进行写入,那么该客户端将被阻塞,直到第一个事务执行完毕为止。加锁在实际使用中非常有效,基本上所有关系数据库都实现了这种加锁功能,它的缺点在于,持有锁的客户端运行越慢,等待解锁的客户端被阻塞的时间就越长。

因为加锁有可能会造成长时间的等待,所以Redis为了尽可能地减少客户端的等待时间,并不会在执行WATCH命令时对数据进行加锁。相反地,Redis只会在数据已经被其他客户端抢先修改了的情况下,通知执行了WATCH命令的客户端,这种做法被称为乐观锁(optimistic locking),而关系数据库实际执行的加锁操作则被称为悲观锁(pessimistic locking)。乐观锁在实际使用中同样非常有效,因为客户端永远不必花时间去等待第一个取得锁的客户端——它们只需要在自己的事务执行失败时进行重试就可以了。

参考文档