使用Redis构建消息拉取系统

Published on 2017 - 06 - 18

两个或多个客户端在互相发送和接收消息的时候,通常会使用以下两种方法来传递消息。第一种方法被称为消息推送(push messaging),也就是由发送者来确保所有接收者已经成功接收到了消息。Redis内置了用于进行消息推送的PUBLISH命令和SUBSCRIBE命令。第二种方法被称为消息拉取(pull messaging),这种方法要求接收者自己去获取存储在某种邮箱(mailbox)里面的消息。

尽管消息推送非常有用,但是当客户端因为某些原因而没办法一直保持在线的时候,采用这一消息传递方法的程序就会出现各种各样的问题。为了解决这个问题,我们将编写两个不同的消息拉取方法,并使用它们来代替PUBLISH命令和SUBSCRIBE命令。

因为只有单个接收者的消息传递操作和前面介绍过的先进先出队列有很多共通之处,所以本节首先会介绍如何实现只有单个接收者的消息传递操作,然后再介绍如何开发具有多个接收者的消息传递操作。通过使用自制的多接收者消息传递操作来代替Redis的PUBLISH命令和SUBSCRIBE命令,即使接收者曾经断开过连接,它也可以一封不漏地收到所有发给它的消息。

单接收者消息的发送与订阅替代品

Redis的其中一种常见用法,就是让不同种类的客户端(如服务器进程、聊天室用户等)去监听或者等待它们各自所独有的频道,并作为频道消息的唯一接收者,从频道那里接收传来的消息。很多程序员都选择了使用Redis的PUBLISH命令和SUBSCRIBE命令来发送和等待消息,但是当我们需要在遇到连接故障的情况下仍然不丢失任何消息的时候,PUBLISH命令和SUBSCRIBE命令就派不上用场了。

让我们把目光从Fake Game公司转向Fake Garage创业公司,后者正打算开发一个移动通信应用程序,这个应用通过连接服务器来发送和接收类似短信或彩信的消息,它基本上就是一个文字短信和图片彩信的替代品。应用程序的身份验证部分以及通信部分将由使用Redis作为后端的Web服务器负责,除此之外,消息的存储和路由也是由Redis负责。

每条消息都只会被发送至一个客户端,这一点极大地简化了我们要解决的问题。为了以这种方式来处理消息,我们将为每个移动客户端使用一个列表结构。发送者会把消息放到接收者的列表里面,而接收者客户端则通过发送请求来获取最新的消息。通过使用HTTP 1.1协议的流水线请求特性或者新型的Web套接字功能,移动客户端既可以在一次请求里面获取所有未读消息,又可以每次请求只获取一条未读消息,还可以通过使用LTRIM命令移除列表的前10个元素来获取最新的10条未读消息。

图11展示了用户jack451的未读消息队列是什么样子的。

[图11 jack451的未读消息队列中包含了来自Jill的消息以及来自jack451妈妈的消息]

因为未读消息队列是使用列表结构来实现的,所以发送者只需要检查接收者的未读消息队列,就可以知道接收者最近是否有上线、接收者是否已经收到了之前发送的消息,以及接收者是否有太多未读消息等待处理。对于像PUBLISH命令和SUBSCRIBE命令这种要求接收者必须一直在线的系统来说,被传递的消息可能会丢失,而客户端根本不会察觉这一点。此外,在旧版Redis里面,速度缓慢的客户端可能会导致输出缓冲区不受控制地增长,而在新版Redis里面,速度缓慢的客户端可能会被断开连接。

我们已经实现了只有单个接收者的消息传递操作,接下来是时候讲讲如何在给定频道有多个监听者的情况下,替换PUBLISH命令和SUBSCRIBE命令了。

多接收者消息的发送与订阅替代品

只有单个接收者的消息传递操作虽然有用,但它还是没办法取代PUBLISH命令和SUBSCRIBE命令在多接收者消息传递方面的作用。为此,我们可以改变一下自己看待这个问题的方式。Redis的PUBLISH命令和SUBSCRIBE命令在很多方面就像一个群组聊天(group chat)功能,一个用户是否在线决定了他能否进行群组聊天,而我们想要做的就是去掉“用户需要一直在线才能接收到消息”这一要求,并以群组聊天为背景,实现具有多个接收者的消息传递操作。

我们接下来要解决的仍然是Fake Garage创业公司的问题。在快速地实现了单个用户之间的消息传递系统之后,Fake Garage创业公司意识到使用应用程序来取代短信这个想法的确很棒,并且很多用户都要求他们为应用程序添加群组聊天功能。和之前一样,因为应用程序的客户端可能会在任何时候进行连接或者断开连接,所以我们不能使用内置的PUBLISH命令和SUBSCRIBE命令来实现群组聊天功能。

每个新创建的群组都会有一些初始用户,各个用户都可以按照自己的意愿来参加或者离开群组。群组使用有序集合来记录参加群组的用户,其中有序集合的成员为用户的名字,而成员的分值则是用户在群组内接收到的最大消息ID。用户也会使用有序集合来记录自己参加的所有群组,其中有序集合的成员为群组ID,而成员的分值则是用户在群组内接收到的最大消息ID。图12展示了一些用户信息和群组信息的例子。

[图12 一些群组聊天数据和用户数据示例。群组有序集合(chat ZSET)展示了群组内的用户以及用户已读的最大群组消息ID。至于已读有序集合(seen ZSET)则列出了用户参加的各个群组的ID,以及用户在这些群里面已读的最大群组消息ID]

如图12所示,用户jason22和jeff24都参加了chat:827群组,其中用户jason22看了6条群组消息中的5条。

创建群组聊天会话

群组聊天产生的内容会以消息为成员、消息ID为分值的形式存储在有序集合里面。在创建新群组的时候,程序首先会对一个全局计数器执行自增操作,以此来获得一个新的群组ID。之后,程序会把群组的初始用户全部添加到一个有序集合里面,并将这些用户在群组里面的最大已读消息ID初始化为0,另外还会把这个新群组的ID添加到记录用户已参加群组的有序集合里面。最后,程序会将一条初始化消息放置到群组有序集合里面,以此来向参加聊天的用户发送初始化消息。代码清单24展示了用于创建新群组的代码。

def create_chat(conn, sender, recipients, message, chat_id=None):
    chat_id = chat_id or str(conn.incr('ids:chat:'))      #A

    recipients.append(sender)                             #E
    recipientsd = dict((r, 0) for r in recipients)        #E

    pipeline = conn.pipeline(True)
    pipeline.zadd('chat:' + chat_id, **recipientsd)       #B
    for rec in recipients:                                #C
        pipeline.zadd('seen:' + rec, chat_id, 0)          #C
    pipeline.execute()

    return send_message(conn, chat_id, sender, message)   #D
#A Get a new chat id
#E Set up a dictionary of users to scores to add to the chat ZSET
#B Create the set with the list of people participating
#C Initialize the seen zsets
#D Send the message

create_chat()函数在调用dict()对象构造器的时候使用了生成器表达式(generator expression),使得我们可以快速地构建起一个将多个用户与分值0进行关联的字典,而ZADD命令则通过这个字典来在一次调用中记录多个群组用户。

生成器表达式与字典构造通过传入一个由成双成对的值组成的序列,我们可以快速地构建起一个Python字典,其中每对值的第一个项会成为字典的键,而第二个项则会成为键的值。代码清单24中展示的某些代码看上去有些奇怪,这是因为程序以内联(in-line)的方式生成了将要传给字典的序列。这种生成序列的技术被称为生成器表达式。

发送消息

为了向群组发送消息,程序需要创建一个新的消息ID,并将想要发送的消息添加到群组消息有序集合(chat’s messages ZSET)里面。虽然这个消息发送操作包含了一个竞争条件,但只要使用锁就可以很容易地解决这个问题。代码清单25展示了使用锁来实现的消息发送操作的具体代码。

def send_message(conn, chat_id, sender, message):
    identifier = acquire_lock(conn, 'chat:' + chat_id)
    if not identifier:
        raise Exception("Couldn't get the lock")
    try:
        mid = conn.incr('ids:' + chat_id)                #A
        ts = time.time()                                 #A
        packed = json.dumps({                            #A
            'id': mid,                                   #A
            'ts': ts,                                    #A
            'sender': sender,                            #A
            'message': message,                          #A
        })                                               #A

        conn.zadd('msgs:' + chat_id, packed, mid)        #B
    finally:
        release_lock(conn, 'chat:' + chat_id, identifier)
    return chat_id

#A Prepare the message
#B Send the message to the chat

发送群组消息的绝大部分工作都是在筹备待发送消息的各项信息,之后只要把准备好的消息添加到有序集合里面,发送操作就完成了。send_message()函数使用锁包围了构建消息的代码以及将消息添加到有序集合里面的代码,这样做的原因和我们之前使用锁来实现计数器信号量的原因是一样的。一般来说,当程序使用一个来自Redis的值去构建另一个将要被添加到Redis里面的值时,就需要使用锁或者由WATCH、MULTI和EXEC组成的事务来消除竞争条件。考虑到锁的性能比事务要好,所以send_message()函数选择了使用锁而不是事务。

在这一节中,我们学习了如何创建群组并发送群组消息,在接下来的内容里面,我们将继续学习如何让用户查看自己参加了哪些群组、自己有多少未读消息,以及用户是如何接收消息的。

获取消息

为了获取用户的所有未读消息,程序需要对记录用户数据的有序集合执行ZRANGE命令,以此来获取群组ID以及已读消息ID,然后根据这两个ID,对用户参与的所有群组的消息有序集合执行ZRANGEBYSCORE命令,以此来取得用户在各个群组内的未读消息。在取得聊天消息之后,程序将根据消息ID对已读有序集合以及群组有序集合里面的用户记录进行更新。最后,程序会查找并清除那些已经被所有人接收到了的群组消息。代码清单26展示了消息获取操作的具体实现代码。

def fetch_pending_messages(conn, recipient):
    seen = conn.zrange('seen:' + recipient, 0, -1, withscores=True) #A

    pipeline = conn.pipeline(True)

    for chat_id, seen_id in seen:                               #B
        pipeline.zrangebyscore(                                 #B
            'msgs:' + chat_id, seen_id+1, 'inf')                #B
    chat_info = zip(seen, pipeline.execute())                   #C

    for i, ((chat_id, seen_id), messages) in enumerate(chat_info):
        if not messages:
            continue
        messages[:] = map(json.loads, messages)
        seen_id = messages[-1]['id']                            #D
        conn.zadd('chat:' + chat_id, recipient, seen_id)        #D

        min_id = conn.zrange(                                   #E
            'chat:' + chat_id, 0, 0, withscores=True)           #E

        pipeline.zadd('seen:' + recipient, chat_id, seen_id)    #F
        if min_id:
            pipeline.zremrangebyscore(                          #G
                'msgs:' + chat_id, 0, min_id[0][1])             #G
        chat_info[i] = (chat_id, messages)
    pipeline.execute()

    return chat_info

#A Get the last message ids received
#B Fetch all new messages
#C Prepare information about the data to be returned
#D Update the 'chat' ZSET with the most recently received message
#E Discover messages that have been seen by all users
#F Update the 'seen' ZSET
#G Clean out messages that have been seen by all users

获取未读消息的工作就是遍历用户参与的所有群组,取出每个群组的未读消息,并顺便清理那些已经被所有群组用户看过的消息。

加入群组和离开群组

我们已经知道了如何从群组里面获取未读消息,接下来要做的就是实现加入群组和离开群组这两个操作了。为了把用户加入给定的群组里面,程序需要将群组的ID作为成员添加到用户的已读消息有序集合里面,并将这个群组最新一条消息的ID设置为成员的分值。此外,程序还会将用户添加到群组的成员列表里面,而用户在成员列表里面的分值同样为最新群组消息的ID。代码清单27展示了加入群组这一操作的具体实现代码。

def join_chat(conn, chat_id, user):
    message_id = int(conn.get('ids:' + chat_id))                #A

    pipeline = conn.pipeline(True)
    pipeline.zadd('chat:' + chat_id, user, message_id)          #B
    pipeline.zadd('seen:' + user, chat_id, message_id)          #C
    pipeline.execute()

#A Get the most recent message id for the chat
#B Add the user to the chat member list
#C Add the chat to the users's seen list

join_chat()函数要做的就是在用户和群组之间以及群组和用户的已读消息有序集合之间,建立起正确的引用信息。

为了将用户从给定的群组中移除,程序需要从群组有序集合里面移除用户的ID,并从用户的已读消息有序集合里面移除给定群组的相关信息。在移除操作完成之后,如果群组已经没有任何成员存在,那么群组的消息有序集合以及消息ID计数器将被删除。如果群组还有成员存在,那么程序将再次查找并清除那些已经被所有成员阅读过的群组消息。代码清单28展示了离开群组这一操作的具体实现代码。

def leave_chat(conn, chat_id, user):
    pipeline = conn.pipeline(True)
    pipeline.zrem('chat:' + chat_id, user)                      #A
    pipeline.zrem('seen:' + user, chat_id)                      #A
    pipeline.zcard('chat:' + chat_id)                           #B

    if not pipeline.execute()[-1]:
        pipeline.delete('msgs:' + chat_id)                      #C
        pipeline.delete('ids:' + chat_id)                       #C
        pipeline.execute()
    else:
        oldest = conn.zrange(                                   #D
            'chat:' + chat_id, 0, 0, withscores=True)           #D
        conn.zremrangebyscore('msgs:' + chat_id, 0, oldest[0][1])     #E

#A Remove the user from the chat
#B Find the number of remaining group members
#C Delete the chat
#D Find the oldest message seen by all users
#E Delete old messages from the chat

在用户离开群组之后执行清理操作并不困难,只是要小心注意各种细节,以免忘了对有序集合和ID进行处理。

本节以群组聊天为背景,介绍了构建一个完整的多接收者消息拉取系统的具体方法,每当我们希望接收者不会因为断线而错过任何消息的时候,就可以使用本节介绍的方法来代替PUBLISH命令和SUBSCRIBE命令。如果有需要的话,我们也可以多花一点儿工夫,把群组聊天实现里面的有序集合结构换成列表结构,或者把发送消息时的加锁操作转移到清理旧消息时执行。我们之所以坚持使用有序集合而不是列表,是因为使用有序集合可以更方便地从每个群组里面取出当前的消息ID。同样地,通过将加锁操作交给消息发送者来执行,消息接收者可以免于请求额外的数据,并且也无需在执行清理操作时进行加锁,这从总体上提高了性能。

参考文档

  • Redis实战 第6章 使用Redis构建应用程序组件