使用Redis构建简单的社交网站

Published on 2017 - 06 - 25

本文将构建一个和Twitter的后端功能几乎完全相同的社交网站,并对构建这个网站所需的数据结构以及概念进行介绍。虽然本章介绍的知识并不足以构建一个像Twitter那样规模宏大的网站,但这些知识应该能帮助读者更好地理解社交网站是如何由简单的结构和数据构建而成的。

用户和状态

在用户与Twitter进行交互时,用户和状态消息这两类对象是最为重要的。用户对象存储了用户的基本身份标识信息、用户的关注者人数、用户已发布的状态消息数量等信息。用户对象对于社交网站来说非常重要,因为它是构建其他可用并且有趣的数据的起点。除了用户对象以外,状态消息也同样重要,因为它记录了不同的用户都说了些什么,以及不同用户之间进行了什么交流,这些由用户创建的状态消息是社交网站真正的内容。

这一节将说明用户对象和状态消息对象都存储了哪些数据,以及程序是使用什么Redis结构来存储这些信息的,除此之外,本节还会对用于创建新用户的函数进行介绍。

首先,让我们来看看程序使用什么结构来表示用户对象,以及程序是如何创建新用户的。

用户信息

在各式各样的在线服务网站以及社交网站里面,用户对象常常是构建其他一切功能的基础,本章要介绍的仿Twitter网站也不例外。

我们使用散列来存储用户信息,这些信息包括用户的用户名、用户拥有的关注者人数、用户正在关注的人的数量、用户已经发布的状态消息的数量、用户的注册日期以及其他一些元信息(meta-information)。图1展示了一个使用散列存储用户信息的例子,其中被展示用户的名字为dr_josiah,这也是笔者在Twitter上使用的用户名。


[图1 使用散列存储用户信息的示例]

从图1可以看出笔者的关注者数量以及其他一些信息。当一个新用户进行注册的时候,程序需要做的就是根据用户指定的用户名以及当时的时间戳,创建一个正在关注数量、关注者数量、已发布状态消息数量都被设置为0的对象。代码清单1展示了程序是如何执行用户账号创建操作的。

def create_user(conn, login, name):
    llogin = login.lower()
    lock = acquire_lock_with_timeout(conn, 'user:' + llogin, 1) #A
    if not lock:                            #B
        return None                         #B

    if conn.hget('users:', llogin):         #C
        release_lock(conn, 'user:' + llogin, lock)  #C
        return None                         #C

    id = conn.incr('user:id:')              #D
    pipeline = conn.pipeline(True)
    pipeline.hset('users:', llogin, id)     #E
    pipeline.hmset('user:%s'%id, {          #F
        'login': login,                     #F
        'id': id,                           #F
        'name': name,                       #F
        'followers': 0,                     #F
        'following': 0,                     #F
        'posts': 0,                         #F
        'signup': time.time(),              #F
    })
    pipeline.execute()
    release_lock(conn, 'user:' + llogin, lock)  #G
    return id                               #H

#A Try to acquire the lock for the lowercased version of the login name. This function is defined in chapter 6
#B If we couldn't get the lock, then someone else already has the same login name
#C We also store a HASH of lowercased login names to user ids, so if there is already a login name that maps to an ID, we know and won't give it to a second person
#D Each user is given a unique id, generated by incrementing a counter
#E Add the lowercased login name to the HASH that maps from login names to user ids
#F Add the user information to the user's HASH
#G Release the lock over the login name
#H Return the id of the user

创建新用户的函数除了会对存储用户信息的散列进行初始化之外,还会对用户的用户名进行加锁,这个加锁操作是必须的,它可以防止多个请求(request)在同一时间内使用相同的用户名来创建新用户。在对用户名进行加锁之后,程序会检查这个用户名是否已经被其他用户抢先占用了,如果这个用户名尚未被占用的话,那么程序会为这个用户生成一个独一无二的ID,并将用户名与用户ID进行关联,最后将这个用户信息存储到新创建的散列里面。

敏感的用户信息因为程序会频繁地取出存储用户信息的散列用于渲染模板,或者直接用作API请求的回复,所以程序不能将散列后的密码、邮件地址等敏感信息存储在这个用户信息散列里面。从现在开始,我们会假设这些敏感信息都存储在其他键甚至其他数据库里面。

在了解了创建新用户的方法之后,接下来我们将学习如何为我们的仿Twitter网站创建状态消息。

状态消息

正如之前所说,程序既会将用户的个人信息存储到用户简介(profile)里面,又会将用户所说的话记录到状态消息里面,并且和存储用户个人信息时的方法一样,程序也使用散列结构来存储状态消息。

除了消息本身之外,程序还会在散列里面存储消息发布的时间、消息发布者的ID和用户名(这样在处理一个状态消息对象的时候,程序就不必为了获取发布者的用户名而查找发布者的用户对象了),以及其他一些关于状态消息的附加信息。图2展示了一个状态消息的例子。


[图2 使用散列存储状态消息的例子]

图2展示的就是表示一个基本的状态消息所需的全部东西。代码清单2展示了创建这种状态消息的代码。

def create_status(conn, uid, message, **data):
    pipeline = conn.pipeline(True)
    pipeline.hget('user:%s'%uid, 'login')   #A
    pipeline.incr('status:id:')             #B
    login, id = pipeline.execute()

    if not login:                           #C
        return None                         #C

    data.update({
        'message': message,                 #D
        'posted': time.time(),              #D
        'id': id,                           #D
        'uid': uid,                         #D
        'login': login,                     #D
    })
    pipeline.hmset('status:%s'%id, data)    #D
    pipeline.hincrby('user:%s'%uid, 'posts')#E
    pipeline.execute()
    return id                               #F

#A Get the user's login name from their user id
#B Create a new id for the status message
#C Verify that we have a proper user account before posting
#D Prepare and set the data for the status message
#E Record the fact that a status message has been posted
#F Return the id of the newly created status message

创建状态消息的函数并没有什么让人感到意外的地方,它首先获取用户的用户名,接着获取一个新的状态消息ID,最后将所有信息组合起来并将它们存储到散列里面。

主页时间线

用户在已登录的情况下访问Twitter时,首先看到的是他们自己的主页时间线,这个时间线是一个列表,它由用户以及用户正在关注的人所发布的状态消息组成。因为主页时间线是用户访问网站时的主要入口,所以这些数据必须尽可能地易于获取。

本节将对主页时间线存储的数据进行介绍,并说明如何快速地获取并展示主页时间线。除此之外,本节还会介绍其他重要的状态消息时间线。

我们希望能够尽快地获取展示一个页面所需的全部数据,因此我们决定使用有序集合来实现主页时间线,并使用有序集合的成员来记录状态消息的ID,而有序集合的分值则用于记录状态消息发布时的时间戳。图3展示了一个使用有序集合实现主页时间线的例子。

[图3 主页时间线将成双成对的状态消息ID和时间戳记录到了有序集合里面,其中时间戳用于对状态消息进行排序,而状态消息ID则用于获取状态消息本身]

因为主页时间线只存储了状态消息的ID而不是状态消息本身,所以负责获取最新发布的状态消息的函数除了要获取状态消息的ID之外,还需要根据所得的ID获取相应的状态消息数据。代码清单3展示了从主页时间线里面获取给定页数的状态消息的代码。

def get_status_messages(conn, uid, timeline='home:', page=1, count=30):#A
    statuses = conn.zrevrange(                                  #B
        '%s%s'%(timeline, uid), (page-1)*count, page*count-1)   #B

    pipeline = conn.pipeline(True)
    for id in statuses:                                         #C
        pipeline.hgetall('status:%s'%id)                        #C

    return filter(None, pipeline.execute())                     #D

#A We will take an optional 'timeline' argument, as well as page size and status message counts
#B Fetch the most recent status ids in the timeline
#C Actually fetch the status messages themselves
#D Filter will remove any 'missing' status messages that had been previously deleted

代码清单所示的函数会根据状态消息的发布时间,按照从新到旧的顺序从用户指定的时间线里面获取状态消息,如果用户没有指定要获取的时间线,那么函数默认会获取用户的主页时间线。

除了主页时间线之外,另一个重要的时间线是用户的个人时间线(profile timeline),这两种时间线之间的区别在于主页时间线可以包含其他人发布的状态消息,而个人时间线只会包含用户自己发布的状态消息。用户的个人时间线会在用户的个人页面上进行展示,个人页面是判断一个用户是否有趣的主要入口。只要在调用get_status_messages()函数的时候,将timeline参数的值设置为profile:,就可以获取给定用户发布的状态消息。

关注者列表和正在关注列表

Twitter这类平台的一个主要作用,就是让用户与其他人分享自己的构思、想法和梦想,在这些网站上关注一个人意味着你对这个人所说的话感兴趣,并期待着对方也会对你进行关注。

本节将介绍程序是如何管理用户的正在关注列表以及关注者列表的,并说明当用户开始关注某人或者停止关注某人的时候,用户的主页时间线将出现怎样的变化。

正如上一节所说,用户的主页时间线和个人时间线都是由有序集合存储的,这些有序集合储着状态消息的ID以及状态消息发布时的时间戳。用户的正在关注列表以及关注者列表同样由有序集合存储,其中有序集合的成员为用户ID,而分值则记录了用户开始关注某人或者被某人关注时的时间戳。图4展示了用户的正在关注列表以及关注者列表的样子。

图4 为了记录哪些人正在关注给定的用户,程序会将用户ID和时间戳组成一对存储到有序集合里面,其中用户ID记录了是谁在关注给定的用户,而时间戳则记录了他们是在什么时候开始关注给定用户的。与此类似,给定用户正在关注的人也是由组成一对的用户ID和时间戳存储在有序集合里面,其中用户ID记录了被关注的人,而时间戳则记录了给定用户关注他们的时间

当用户开始关注或者停止关注另一个用户的时候,程序就需要对这两个用户的正在关注有序集合以及关注者有序集合进行更新,并修改他们在用户信息散列里面记录的关注数量和被关注数量。如果用户执行的是关注操作,那么程序在对以上提到的有序集合和散列进行更新之后,还需要从被关注用户的个人时间线里面,复制一些状态消息ID到执行关注操作的用户的主页时间线里面,从而使得用户在关注另一个用户之后,可以立即看见被关注用户所发布的状态消息。代码清单4展示了实现关注操作的具体代码。

HOME_TIMELINE_SIZE = 1000
def follow_user(conn, uid, other_uid):
    fkey1 = 'following:%s'%uid          #A
    fkey2 = 'followers:%s'%other_uid    #A

    if conn.zscore(fkey1, other_uid):   #B
        return None                     #B

    now = time.time()

    pipeline = conn.pipeline(True)
    pipeline.zadd(fkey1, other_uid, now)    #C
    pipeline.zadd(fkey2, uid, now)          #C
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1, withscores=True)   #E
    following, followers, status_and_score = pipeline.execute()[-3:]

    pipeline.hincrby('user:%s'%uid, 'following', int(following))        #F
    pipeline.hincrby('user:%s'%other_uid, 'followers', int(followers))  #F
    if status_and_score:
        pipeline.zadd('home:%s'%uid, **dict(status_and_score))  #G
    pipeline.zremrangebyrank('home:%s'%uid, 0, -HOME_TIMELINE_SIZE-1)#G

    pipeline.execute()
    return True                         #H

#A Cache the following and followers key names
#B If the other_uid is already being followed, return
#C Add the uids to the proper following and followers ZSETs
#E Fetch the most recent HOME_TIMELINE_SIZE status messages from the newly followed user's profile timeline
#F Update the known size of the following and followers ZSETs in each user's HASH
#G Update the home timeline of the following user, keeping only the most recent 1000 status messages
#H Return that the user was correctly followed

follow_user()函数的行为和之前描述的一样:它首先将关注者和被关注者双方的用户ID添加到相应的正在关注有序集合以及关注者有序集合里面,然后获取这两个有序集合的大小,并从被关注用户的个人时间线上面获取最新的状态消息ID。当函数取得了所需的数据之后,它就会对用户信息散列里面的正在关注数量以及关注者数量进行更新,并将之前取得的状态消息ID添加到执行关注操作的用户的主页时间线里面。

在关注某个人并阅读他的状态消息一段时间之后,用户可能会想要取消对那个人的关注。实现取消关注操作的方法和实现关注操作的方法正好相反:程序会从正在关注有序集合以及关注者有序集合里面移除关注者和被关注者双方的用户ID,并从执行取消关注操作的用户的主页时间线里面移除被取消关注的人所发布的状态消息,最后对两个用户的正在关注数量以及关注者数量进行更新。代码清单5展示了取消关注操作的实现方法。

def unfollow_user(conn, uid, other_uid):
    fkey1 = 'following:%s'%uid          #A
    fkey2 = 'followers:%s'%other_uid    #A

    if not conn.zscore(fkey1, other_uid):   #B
        return None                         #B

    pipeline = conn.pipeline(True)
    pipeline.zrem(fkey1, other_uid)                 #C
    pipeline.zrem(fkey2, uid)                       #C
    pipeline.zrevrange('profile:%s'%other_uid,      #E
        0, HOME_TIMELINE_SIZE-1)                    #E
    following, followers, statuses = pipeline.execute()[-3:]

    pipeline.hincrby('user:%s'%uid, 'following', -int(following))        #F
    pipeline.hincrby('user:%s'%other_uid, 'followers', -int(followers))  #F
    if statuses:
        pipeline.zrem('home:%s'%uid, *statuses)                 #G

    pipeline.execute()
    return True                         #H

#A Cache the following and followers key names
#B If the other_uid is not being followed, return
#C Remove the uids the proper following and followers ZSETs
#E Fetch the most recent HOME_TIMELINE_SIZE status messages from the user that we stopped following
#F Update the known size of the following and followers ZSETs in each user's HASH
#G Update the home timeline, removing any status messages from the previously followed user
#H Return that the unfollow executed successfully

unfollow_user()函数会找到执行取消关注操作的用户以及被取消关注的用户,对他们的正在关注有序集合以及关注者有序集合进行更新,并修改他们的正在关注数量以及关注者数量,最后从执行取消关注操作的用户的主页时间线里面移除被取消关注的用户所发布的状态消息。至此,我们已经成功地实现了关注用户和取消关注用户这两个重要的操作。

状态消息的发布与删除

在类似Twitter这样的网站上面,用户可以执行的一个最基本的操作就是发布状态消息。人们通过发布状态消息来与其他人分享自己的想法,并通过阅读其他人发布的状态消息来了解对方的所见所闻。前面的8.1.2节中展示了如何创建一条状态消息,也展示了状态消息包含的各项数据,但它既没有介绍怎样将状态消息添加到用户的个人时间线里面,也没有介绍怎样将状态消息添加到用户的每个关注者的主页时间线里面。

本节将对状态消息创建之后发生的事情进行介绍,说明一条新的状态消息是如何被添加到每个关注者的主页时间线里面的。除此之外,本节还会介绍删除已发布的状态消息的方法。

本文前面已经介绍了程序是如何创建新的状态消息的,而在此之后,程序要做的就是想办法把新状态消息的ID添加到每个关注者的主页时间线里面。具体的添加方式会根据消息发布人拥有的关注者数量的多少而有所不同。如果用户的关注者数量相对比较少(比如说,不超过1000人),那么程序可以立即更新每个关注者的主页时间线。但是,如果用户的关注者数量非常庞大(比如说,100万人,甚至像Twitter上面的某些人那样,有2500万关注者),那么尝试直接执行添加操作将导致发布消息的用户需要长时间地进行等待,超出合理的等待时间。

为了让发布操作可以尽快地返回,程序需要做两件事情。首先,在发布状态消息的时候,程序会将状态消息的ID添加到前1000个关注者的主页时间线里面。根据Twitter的一项统计表明,关注者数量在1000人以上的用户只有10万~25万,而这10万~25万用户只占了活跃用户数量的0.1%,这意味着99.9%的消息发布人在这一阶段就可以完成自己的发布操作,而剩下的0.1%则需要接着执行下一个步骤。

其次,对于那些关注者数量超过1000人的用户来说,程序会使用类似于使用Redis构建任务队列节中介绍的系统来开始一项延迟任务。代码清单6展示了程序是如何将状态更新推送给各个关注者的。

def post_status(conn, uid, message, **data):
    id = create_status(conn, uid, message, **data)  #A
    if not id:              #B
        return None         #B

    posted = conn.hget('status:%s'%id, 'posted')    #C
    if not posted:                                  #D
        return None                                 #D

    post = {str(id): float(posted)}
    conn.zadd('profile:%s'%uid, **post)             #E

    syndicate_status(conn, uid, post)       #F
    return id

#A Create a status message using the earlier function
#B If the creation failed, return
#C Get the time that the message was posted
#D If the post wasn't found, return
#E Add the status message to the user's profile timeline
#F Actually push the status message out to the followers of the user

注意,post_status()函数将状态更新操作分成了两个部分来执行。第一个部分调用create_status()函数创建状态消息,并将这条状态消息添加到消息发送人的个人时间线里面。而第二个部分则调用syndicate_status()函数将新建的状态消息添加到各个关注者的主页时间线里面,其中syndicate_status()函数的定义如代码清单7所示。

POSTS_PER_PASS = 1000           #A
def syndicate_status(conn, uid, post, start=0):
    followers = conn.zrangebyscore('followers:%s'%uid, start, 'inf',#B
        start=0, num=POSTS_PER_PASS, withscores=True)   #B

    pipeline = conn.pipeline(False)
    for follower, start in followers:                    #E
        pipeline.zadd('home:%s'%follower, **post)        #C
        pipeline.zremrangebyrank(                        #C
            'home:%s'%follower, 0, -HOME_TIMELINE_SIZE-1)#C
    pipeline.execute()

    if len(followers) >= POSTS_PER_PASS:                    #D
        execute_later(conn, 'default', 'syndicate_status',  #D
            [conn, uid, post, start])                       #D

#A Only send to 1000 users per pass
#B Fetch the next group of 1000 followers, starting at the last person to be updated last time
#E Iterating through the followers results will update the 'start' variable, which we can later pass on to subsequent syndicate_status() calls
#C Add the status to the home timelines of all of the fetched followers, and trim the home timelines so they don't get too big
#D If at least 1000 followers had received an update, execute the remaining updates in a task

syndicate_status()函数会将状态消息添加到前1000个关注者的主页时间线里面,并在关注者数量超过1000个的时候,调用6.4节中定义的API来开始一个延迟任务,并把剩余的添加操作交给延迟任务来完成。通过上面展示的post_status()函数和syndicate_status()函数,用户现在可以发布新的状态消息,并将状态消息发送给他的所有关注者了。

除了思考如何发布状态消息之外,我们还要考虑如何去删除一条已经发布的状态消息。

删除一条状态消息的方法实际上非常简单,因为get_status_messages()函数在返回那些被取出的状态消息之前,会先使用Python的filter()函数过滤掉所有已经被删除了的状态消息,所以在删除一条状态消息的时候,程序只需要删除存储了那条状态消息的散列,并对消息发送者的已发送状态消息数量进行更新就可以了。代码清单8展示了用于删除已发布的状态消息的函数。

def delete_status(conn, uid, status_id):
    key = 'status:%s'%status_id
    lock = acquire_lock_with_timeout(conn, key, 1)  #A
    if not lock:                #B
        return None             #B

    if conn.hget(key, 'uid') != str(uid):   #C
        release_lock(conn, key, lock)       #C
        return None                         #C

    pipeline = conn.pipeline(True)
    pipeline.delete(key)                            #D
    pipeline.zrem('profile:%s'%uid, status_id)      #E
    pipeline.zrem('home:%s'%uid, status_id)         #F
    pipeline.hincrby('user:%s'%uid, 'posts', -1)    #G
    pipeline.execute()

    release_lock(conn, key, lock)
    return True

#A Acquire a lock around the status object to ensure that no one else is trying to delete it when we are
#B If we didn't get the lock, return
#C If the user doesn't match the user stored in the status message, return
#D Delete the status message
#E Remove the status message id from the user's profile timeline
#F Remove the status message id from the user's home timeline
#G Reduce the number of posted messages in the user information HASH

在删除状态消息并对用户已发布状态消息数量进行更新的同时,delete_status()函数还会从用户的主页时间线和个人时间线里面移除被删除的状态消息,虽然这个移除操作在技术上并非必须,但它无须花费多少力气就可以让两条时间线变得更干净一些,这又何乐而不为呢?

从用户的角度来看,能够发布和删除状态消息多多少少就算是完成了仿Twitter社交网站的基本功能了。如果读者想要进一步提高这个网站的用户体验的话,那么可以考虑给网站增加以下特性。

  • 私人用户,关注这些用户需要经过主人的批准。
  • 收藏(注意状态消息的私密性)。
  • 用户之间可以进行私聊。
  • 对消息进行回复将产生一个会话流(conversation flow)。
  • 转发消息。
  • 使用@指名一个用户,或者使用#标记一个话题。
  • 记录用户使用@指名了谁。
  • 针对广告行为和滥用行为的投诉与管理机制。

以上提到的特性可以使我们的仿Twitter网站的功能变得更加丰富,不过这些特性并非在所有情况下都是必需的,读者可以根据自己的需要选择添加哪些特性。除了Twitter提供的特性之外,读者也可以考虑添加一些来自其他社交网站的附加功能。

  • 对状态消息进行“赞”或者“+1”。
  • 根据“重要性”对状态消息进行排序。
  • 在预先设置的一群用户之间进行私聊
  • 对用户进行分组,只有组员能够关注组时间线(group timeline)并在里面发布状态消息。小组可以是公开的、私密的甚至是公告形式的。

参考文档