使用Redis构建任务队列

Published on 2017 - 06 - 18

在处理Web客户端发送的命令请求时,某些操作的执行时间可能会比我们预期的更长一些。通过将待执行任务的相关信息放入队列里面,并在之后对队列进行处理,用户可以推迟执行那些需要一段时间才能完成的操作,这种将工作交给任务处理器来执行的做法被称为任务队列(task queue)。现在有很多专门的任务队列软件(如ActiveMQ、RabbitMQ、Gearman、Amazon SQS,等等),另外在缺少专门的任务队列可用的情况下,也有一些临时性的方法可以创建任务队列。比方说使用定期作业来扫描一个数据表,查找那些在给定时间/日期之前或者之后被修改过/被检查过的用户账号,并根据扫描的结果执行某些操作,这也是在创建任务队列。

这一节接下来将介绍两种不同类型的任务队列,第一种队列会根据任务被插入队列的顺序来尽快地执行任务,而第二种队列则具有安排任务在未来某个特定时间执行的能力。

先进先出队列

在队列领域中,除了任务队列之外,其他几种不同的队列也常常会被人谈起——比如先进先出(FIFO)队列、后进先出(LIFO)队列和优先级(priority)队列。因为先进先出队列具有语义清晰、易于实现和速度快等优点,所以本节首先介绍先进先出队列,然后再说明如何实现一个简陋的优先级队列,以及如何实现一个基于时间来执行任务的延迟队列。

下面再来回顾一下Fake Game公司的例子。为了鼓励不常上线的玩家进入游戏,Fake Game公司决定增加一个选项,让玩家可以通过电子邮件来订阅商品交易市场中已售出商品和已过期商品的相关信息。因为对外发送电子邮件可能会有非常高的延迟,甚至可能会出现发送失败的情况,所以我们不能用平时常见的代码流(code flow)方式来执行这个邮件发送操作。为此,我们将使用任务队列来记录邮件的收信人以及发送邮件的原因,并构建一个可以在邮件发送服务器运行变得缓慢的时候,以并行方式一次发送多封邮件的工作进程(worker process)。

我们要编写的队列将以“先到先服务”(first-come,first-served)的方式发送邮件,并且无论发送是否成功,程序都会把发送结果记录到日志里面。Redis的列表结构允许用户通过RPUSH和LPUSH以及RPOP和LPOP,从列表的两端推入和弹出元素。这次的邮件队列将使用RPUSH命令来将待发送的邮件推入列表的右端,并且因为工作进程除了发送邮件之外不需要执行其他工作,所以它将使用阻塞版本的弹出命令BLPOP从队列中弹出待发送的邮件,而命令的最大阻塞时限为30秒(从右边推入元素并从左边弹出元素的做法,符合我们从左向右进行阅读的习惯)。为了简便起见,本节展示的任务队列只会处理已售出商品邮件,但是添加针对已过期商品邮件的支持也并非难事。

我们的邮件队列由一个Redis列表构成,它包含多个JSON编码对象,图9展示了一个这样的队列示例。


[图9 一个使用列表结构实现的先进先出队列]

为了将待发送邮件推入队列里面,程序会获取发送邮件所需的全部信息,并将这些信息序列化为JSON对象,最后使用RPUSH命令将JSON对象推入邮件队列里面。我们使用JSON来进行序列化的原因在于这种格式能够被人类读懂,并且大多数编程语言都提供了能够快速编码和解码JSON格式的函数库。代码清单18展示了程序将一封已售出商品邮件推入邮件队列里面的具体步骤。

def send_sold_email_via_queue(conn, seller, item, price, buyer):
    data = {
        'seller_id': seller,                    #A
        'item_id': item,                        #A
        'price': price,                         #A
        'buyer_id': buyer,                      #A
        'time': time.time()                     #A
    }
    conn.rpush('queue:email', json.dumps(data)) #B

#A Prepare the item
#B Push the item onto the queue

send_sold_email_via_queue()函数要做的就是将一封待发送邮件推入一个由列表结构表示的队列里面,弄懂这一点应该不难。

从队列里面获取待发送邮件也非常容易实现。代码清单19展示了这一操作的实现代码:程序首先使用BLPOP命令从邮件队列里面弹出一个JSON对象,接着通过解码JSON对象来取得待发送邮件的相关信息,最后根据这些信息来发送邮件。

def process_sold_email_queue(conn):
    while not QUIT:
        packed = conn.blpop(['queue:email'], 30)                  #A
        if not packed:                                            #B
            continue                                              #B

        to_send = json.loads(packed[1])                           #C
        try:
            fetch_data_and_send_sold_email(to_send)               #D
        except EmailSendError as err:
            log_error("Failed to send sold email", err, to_send)
        else:
            log_success("Sent sold email", to_send)

#A Try to get a message to send
#B No message to send, try again
#C Load the packed email information
#D Send the email using our pre-written emailing function

process_sold_email_queue()函数的运作原理也非常简单直接,它要做的就是从队列里面取出待发送的邮件,并把邮件真正地发送出去。到目前为止,我们已经完成了一个用于执行邮件发送工作的任务队列,现在要考虑的问题是,如果要执行的任务不止一种,我们该怎么办?

多个可执行任务

因为BLPOP命令每次只会从队列里面弹出一封待发送邮件,所以待发送邮件不会出现重复,也不会被重复发送。并且因为队列只会存放待发送邮件,所以工作进程要处理的任务是非常单一的。在一些情况下,为每种任务单独使用一个队列的做法并不少见,但是在另外一些情况下,如果一个队列能够处理多种不同类型的任务,那么事情就会方便很多。代码清单20展示的工作进程会监视用户提供的多个队列,并从多个已知的已注册回调函数里面,选出一个函数来处理JSON编码的函数调用。队列中每个待执行任务的格式都为['FUNCTION_NAME', [ARG1, ARG2, ...]]。

def worker_watch_queue(conn, queue, callbacks):
    while not QUIT:
        packed = conn.blpop([queue], 30)                    #A
        if not packed:                                      #B
            continue                                        #B

        name, args = json.loads(packed[1])                  #C
        if name not in callbacks:                           #D
            log_error("Unknown callback %s"%name)           #D
            continue                                        #D
        callbacks[name](*args)                              #E

#A Try to get an item from the queue
#B There is nothing to work on, try again
#C Unpack the work item
#D The function is unknown, log the error and try again
#E Execute the task

有了这个通用的工作进程,我们就可以把邮件发送程序写成回调函数,并将它和其他回调函数一同传给工作进程使用。

任务优先级

在使用队列的时候,程序可能会需要让特定的操作优先于其他操作执行。比如对于Fake Game公司来说,他们可能会希望优先发送已售出商品邮件,其次才是已过期商品邮件。或者他们可能会希望优先发送密码重置邮件,其次才是即将推出的线上活动的相关邮件。本书之前介绍的BLPOP命令和BRPOP命令都允许用户给定多个列表作为弹出操作的执行对象:其中BLPOP命令将弹出第一个非空列表的第一个元素,而BRPOP命令则会弹出第一个非空列表的最后一个元素。

假设现在我们需要为任务设置高、中、低3种优先级别,其中:高优先级任务在出现之后会第一时间被执行,而中等优先级任务则会在没有任何高优先级任务存在的情况下被执行,而低优先级任务则会在既没有任何高优先级任务,又没有任何中等优先级任务的情况下被执行。实际上我们只需要修改代码清单20展示的worker_watch_queue()函数的其中两行代码,就可以给任务队列加上优先级特性,修改之后的代码如代码清单21所示。

def worker_watch_queues(conn, queues, callbacks):   #A
    while not QUIT:
        packed = conn.blpop(queues, 30)             #B
        if not packed:
            continue

        name, args = json.loads(packed[1])
        if name not in callbacks:
            log_error("Unknown callback %s"%name)
            continue
        callbacks[name](*args)

#A The first changed line to add priority support
#B The second changed line to add priority support

同时使用多个队列可以降低实现优先级特性的难度。除此之外,多队列有时候也会被用于分隔不同的任务(如一个队列存放公告邮件,而另一个队列则存放提醒邮件,诸如此类),在这种情况下,处理不同队列时可能会出现不公平的现象,为此,我们可以偶尔重新排列各个队列的顺序,使得针对队列的处理操作变得更公平一些——当某个队列的增长速度比其他队列的增长速度快的时候,这种重排操作尤为必要。

延迟任务

使用列表结构可以实现只能执行一种任务的队列,也可以实现通过调用不同回调函数来执行不同任务的队列,甚至还可以实现简单的优先级队列,但是有些时候,这些特性还不足以满足我们的需求。举个例子,假设Fake Game公司决定给游戏添加“延迟销售”特性,让玩家可以在未来的某个时候才开始销售自己的商品,而不是立即就开始进行销售。为了实现这个延迟销售特性,我们需要替换并修改现有的队列实现。

有几种不同的方法可以为队列中的任务添加延迟性质,以下是其中3种最直截了当的方法。

  • 在任务信息中包含任务的执行时间,如果工作进程发现任务的执行时间尚未来临,那么它将在短暂等待之后,把任务重新推入队列里面。
  • 工作进程使用一个本地的等待列表来记录所有需要在未来执行的任务,并在每次进行while循环的时候,检查等待列表并执行那些已经到期的任务。
  • 把所有需要在未来执行的任务都添加到有序集合里面,并将任务的执行时间设置为分值,另外再使用一个进程来查找有序集合里面是否存在可以立即被执行的任务,如果有的话,就从有序集合里面移除那个任务,并将它添加到适当的任务队列里面。

因为无论是进行短暂的等待,还是将任务重新推入队列里面,都会浪费工作进程的时间,所以我们不会采用第一种方法。此外,因为工作进程可能会因为崩溃而丢失本地记录的所有待执行任务,所以我们也不会采用第二种方法。最后,因为使用有序集合的第三种方法最简单和直接,所以我们将采取这一方法,并使用锁来保证任务从有序集合移动到任务队列时的安全性。

有序集合队列(ZSET queue)存储的每个被延迟的任务都是一个包含4个值的JSON列表,这4个值分别是:唯一标识符、处理任务的队列的名字、处理任务的回调函数的名字、传给回调函数的参数。和前面的章节需要生成随机ID时的做法一样,延迟任务包含的每个唯一标识符都是一个随机生成的128位UUID,这个唯一标识符可以用于区分每个被执行的任务,并在将来有需要的时候用来构建任务执行状态报告特性。在有序集合里面,任务的分值会被设置为任务的执行时间,而立即可执行的任务将被直接插入任务队列里面。代码清单22展示了创建延迟任务的代码(任务是否延迟是可选的,只要把任务的延迟时间设置为0就可以创建一个立即执行的任务)。

def execute_later(conn, queue, name, args, delay=0):
    identifier = str(uuid.uuid4())                          #A
    item = json.dumps([identifier, queue, name, args])      #B
    if delay > 0:
        conn.zadd('delayed:', item, time.time() + delay)    #C
    else:
        conn.rpush('queue:' + queue, item)                  #D
    return identifier                                       #E

#A Generate a unique identifier
#B Prepare the item for the queue
#C Delay the item
#D Execute the item immediately
#E Return the identifier

当任务无需被延迟而是可以立即执行的时候,execute_later()函数会直接将任务推入任务队列里面,而需要延迟执行的任务则会被添加到延迟有序集合里面。图10展示了一个使用延迟队列来记录待发送邮件的例子。

[图10 使用有序集合实现延迟任务队列]

因为Redis没有提供直接的方法可以阻塞有序集合直到元素的分值低于当前UNIX时间戳为止,所以我们需要自己来查找有序集合里面分值低于当前UNIX时间戳的任务。因为所有被延迟的任务都存储在同一个有序集合队列里面,所以程序只需要获取有序集合里面排名第一的元素以及该元素的分值就可以了:如果队列里面没有任何任务,或者任务的执行时间尚未来临,那么程序将在短暂等待之后重试;如果任务的执行时间已到,那么程序将根据任务包含的标识符来获取一个细粒度锁,接着从有序集合里面移除要被执行的任务,并将它添加到适当的任务队列里面。通过将可执行的任务添加到任务队列里面而不是直接执行它们,我们可以把获取可执行任务的进程数量限制在一两个之内,而不必根据工作进程的数量来决定运行多少个获取进程,这减少了获取可执行任务所需的花销。代码清单23展示了从延迟队列里面获取可执行任务的实现代码。

def poll_queue(conn):
    while not QUIT:
        item = conn.zrange('delayed:', 0, 0, withscores=True)   #A
        if not item or item[0][1] > time.time():                #B
            time.sleep(.01)                                     #B
            continue                                            #B

        item = item[0][0]                                       #C
        identifier, queue, function, args = json.loads(item)    #C

        locked = acquire_lock(conn, identifier)                 #D
        if not locked:                                          #E
            continue                                            #E

        if conn.zrem('delayed:', item):                         #F
            conn.rpush('queue:' + queue, item)                  #F

        release_lock(conn, identifier, locked)                  #G

#A Get the first item in the queue
#B No item or the item is still to be execued in the future
#C Unpack the item so that we know where it should go
#D Get the lock for the item
#E We couldn't get the lock, so skip it and try again
#F Move the item to the proper list queue
#G Release the lock

正如代码清单23所示,因为有序集合并不具备像列表那样的阻塞弹出机制,所以程序需要不断地进行循环,并尝试从队列里面获取要被执行的任务,虽然这一操作会增大网络和处理器的负载,但因为我们只会运行一两个这样的程序,所以这并不会消耗太多资源。如果想要进一步减少poll_queue()函数的运行开销,那么可以在函数里面添加一个自适应方法(adaptive method),让函数在一段时间内都没有发现可执行的任务时,自动延长休眠的时间,或者根据下一个任务的执行时间来决定休眠的时长,并将休眠时长的最大值限制为100毫秒,从而确保执行时间距离当前时间不远的任务可以及时被执行。

关于优先级

因为延迟任务最终都会被推入对应的任务队列里面,并以相同的优先级执行,所以延迟任务的优先级和任务队列里面存储的普通任务的优先级是基本相同的。但是,如果我们打算在延迟任务的执行时间到达时,优先执行这些任务的话,应该怎么办呢?

要做到这一点的最简单办法就是添加多个额外的队列,使得可以立即执行的延迟任务出现在队列的最前面。举个例子,对于“高优先级”、“中等优先级”、“低优先级”这3个队列,我们可以分别创建“被延迟的高优先级”、“被延迟的中等优先级”、“被延迟的低优先级”这3个队列,并将这些队列以["high-delayed", "high", "medium-delayed", "medium", "low-delayed", "low"]的顺序传入worker_watch_queues()函数里面,这样的话,具有相同优先级的延迟队列就会先于非延迟队列被处理。

一些读者可能会觉得好奇,“既然要将延迟任务放置到队列的最前面,那么为什么不使用LPUSH命令而是使用RPUSH命令呢?”假设所有工作进程都在处理中等优先级队列包含的任务,并且这些任务需要花费数秒钟才能执行完毕,如果这时有3个延迟任务可以执行,那么程序将使用LPUSH命令把它们依次推入中等优先级队列里面:首先推入第一个可执行的延迟任务,然后是第二个,最后是第三个。但是这样一来,最后被推入中等优先级队列里面的延迟任务就会最先被执行,这违背了我们对于“最先可执行的延迟任务总是最先被执行”的预期。

参考文档

  • Redis实战 第6章 使用Redis构建应用程序组件