使用Redis构建社交网站的流API

Published on 2017 - 06 - 25

在开发社交网站的过程中,我们可能会想要知道更多网站上正在发生的事情——比如网站每个小时会发布多少条新的状态消息,网站上最热门的主题是什么,网站上最经常被@指名的人是谁,诸如此类。要做到这一点,我们既可以专门执行一些调用(call)来收集这些信息,也可以在所有执行操作的函数内部记录这些信息,还有一种方法——也就是本节要介绍的方法——就是构建一些函数来广播(broadcast)简单的事件(event),然后由负责进行数据分析的事件监听器(event listener)来接收并处理这些事件。

在这一节,我们将构建一个与Twitter的流API具有相似的功能的流API后端。

流API跟我们前面为了仿制Twitter而构建的其他部分完全不同,前面几节实现的Twitter典型操作都需要尽快地执行并完成,而流API请求则需要在一段比较长的时间内持续地返回数据。

大多数新型社交网站都允许用户通过API获取信息。Twitter最近几年来的一个优势就在于,通过向第三方合作伙伴提供实时事件(real-time event),合作伙伴可以对数据进行各式各样新颖有趣的分析,而这些分析可能是Twitter自己没有时间或者没有兴趣开发的。

作为构建流API的第一步,让我们先来思考一下,自己到底想要处理和生产什么样的数据。

流API提供的数据

当用户使用我们的社交网站时,他们的一举一动都可以通过网站定义的API函数看到。在前面几节,我们花了大量时间来实现关注用户、取消关注用户、发布消息和删除消息这4个功能,随着我们不断地为社交网站开发新功能,用户的行为还会产生其他不同的事件。而流API的作用就是随着时间的推移,产生一个由事件组成的序列,以此来让整个网络上的客户端和其他服务及时地了解到网站目前正在发生的事情。

在构建流API的过程中需要进行各式各样的决策,这些决策主要和以下3个问题有关。

  • 流API需要对外公开哪些事件?
  • 是否需要进行访问限制?如果需要的话,采取何种方式实现?
  • 流API应该提供哪些过滤选项?

本节将回答这里提到的第一个问题和第三个问题,但暂时不会回答与访问限制措施有关的第二个问题,因为只有在社交网站涉及用户隐私或者系统资源的时候,我们才需要考虑访问限制的问题。

既然我们的社交网站已经实现了发布消息、删除消息、关注用户和取消关注用户这几个动作,那么我们至少应该为这些动作提供一些事件。为了简单起见,目前我们只会创建发布消息事件和删除消息事件,但是以本节创建和分发的结构为基础,为关注用户、取消关注用户、甚至是之后添加的其他动作创建相应的事件应该都不是一件难事。

我们的社交网站提供的过滤选项(filtering option)在特性和功能方面与Twitter为公开流(public stream)提供的API非常相似:用户既可以通过关注过滤器(基于用户进行过滤)、监测过滤器(基于关键字进行过滤)以及位置过滤器来获取过滤后的消息,又可以通过类似Twitter的消防水管(firehose)和样本(sample)这样的流来获取一些随机的消息。

在了解了使用流API可以取得哪些数据之后,接下来就让我们来看看流API是怎样提供这些数据的。

提供数据

在前面的章节中,每当我们展示向Redis发送命令请求的函数时,都会假设某个已经存在的Web服务器会在合适的时候调用这个函数。但是对于流API来说,向客户端提供流式数据所需的步骤比起简单地将函数插入(plug)到已有的Web服务栈(stack)里面要复杂得多。这是因为绝大多数Web服务器在执行操作的时候,都假设程序会一次性地将全部回复返回给请求,然而这种假设并不适用于流API。

每当新诞生的状态消息与过滤器相匹配的时候,流API就会将这条消息返回给客户端。尽管WebScokets和SPDY这样的新技术可以以增量的方式不断地生成数据,甚至进行服务器端的消息推送,但是这些技术的相关协议并未完全制定好,而且很多编程语言的客户端也未能完全地支持这些新技术。幸运的是,只要使用分块(chunked)传输编码,我们就可以使用HTTP服务器生成并发送增量式数据。

本节将构建一个简单的Web服务器,它可以通过分块HTTP回复向客户端返回流式数据。而接下来的一节则会在这个简单Web服务器的基础上,实现针对流式消息数据(streamed message data)的过滤功能。

为了构建流式HTTP Web服务器,我们需要用到Python编程语言的更高级的特性。前面章节展示的代码示例通常只会用到Python的函数,这一节展示的代码示例则会用到Python的类。这是因为Python已经包含了多个服务器类,我们只需要把这些类混合(mix)在一起,就可以实现Web服务器的各种功能,而不必从零开始构建整个Web服务器。如果读者曾经在其他编程语言里面使用过类,那么应该也不会对Python的类感到陌生,因为Python的类和其他编程语言的类并没有什么不同——这些类都旨在对数据进行封装,并提供操作被封装数据的方法。因为构建流式HTTP Web服务器所需的大部分功能都已经在函数库里面实现好了,所以我们只需要把这些功能组合在一起就可以了。

HTTP流服务器

Python提供了一系列套接字服务器函数库,通过混合这些库可以实现各种不同的功能。首先,我们会创建一个以多线程方式处理已到达请求的服务器,这个服务器每次接收到一个请求,都会创建一个新的线程来执行请求处理器(request handler),而请求处理器则会对GET和POST形式的HTTP请求进行一些非常简单的路由操作。代码清单9展示了这个多线程HTTP服务器,以及这个服务器使用的请求处理器。

class StreamingAPIServer(               #A
    SocketServer.ThreadingMixIn,        #B
    BaseHTTPServer.HTTPServer):         #B

    daemon_threads = True               #C

class StreamingAPIRequestHandler(               #D
    BaseHTTPServer.BaseHTTPRequestHandler):     #E

    def do_GET(self):                                       #F
        parse_identifier(self)                              #G
        if self.path != '/statuses/sample.json':            #H
            return self.send_error(404)                     #H

        process_filters(self)                               #I

    def do_POST(self):                                      #J
        parse_identifier(self)                              #K
        if self.path != '/statuses/filter.json':            #L
            return self.send_error(404)                     #L

        process_filters(self)                               #M

#A Create a new class called 'StreamingAPIServer'
#B This new class should have the ability to create new threads with each request, and should be a HTTPServer
#C Tell the internals of the threading server to shut down all client request threads if the main server thread dies
#D Create a new class called 'StreamingAPIRequestHandler'
#E This new class should be able to handle HTTP requests
#F Create a method that is called do_GET(), which will be executed on GET requests performed against this server
#G Call a helper function that handles the fetching of an identifier for the client
#H If the request is not a 'sample' or 'firehose' streaming GET request, return a '404 not found' error
#I Otherwise, call a helper function that actually handles the filtering
#J Create a method that is called do_POST(), which will be executed on POST requests performed against this server
#K Call a helper function that handles the fetching of an identifier for the client
#L If the request is not a user, keyword, or location filter, return a '404 not found' error
#M Otherwise, call a helper function that actually handles the filtering

代码清单9定义的服务器会为每个请求创建一个线程,线程会调用请求处理器对象的do_GET方法或者do_POST方法来分别处理两类主要的流API请求,其中do_GET方法负责处理针对过滤器的访问请求,而do_POST方法则负责处理针对随机消息的访问请求。

代码清单9虽然给出了服务器的定义,但它并没有给出启动服务器所需的代码,而实际运行这个服务器需要用到一些Python魔法,这些魔法可以让我们选择是载入模块并使用模块中定义的类,还是直接运行模块来启动流API服务器。代码清单10展示了用于载入模块并以守护进程形式运行服务器的代码。

if __name__ == '__main__'
    server = StreamingAPIServer(
            ('localhost',8080), StreamingAPIRequestHandler)
    print 'Starting server, use <Ctrl-C> to stop'
    server.serve_forever();

在将流API服务器的代码键入文件里面并运行服务器之前,别忘了我们还未介绍过服务器调用的parse_identifier()函数和process_filters()函数,接下来的两节将分别介绍这两个函数的定义。

标识客户端

代码清单11展示了parse_identifier()函数的定义,这个函数会通过语法分析从请求的查询参数里面提取出一个标识符,以此来获取与客户端有关的标识信息。如果需要把parse_identifier()函数应用于生产环境的话,我们还可以考虑增加一些代码,对客户端发来的这个标识符执行一些验证操作。

def parse_identifier(handler):
    handler.identifier = None       #A
    handler.query = {}              #A
    if '?' in handler.path:         #B
        handler.path, _, query = handler.path.partition('?')    #C
        handler.query = urlparse.parse_qs(query)                #D
        identifier = handler.query.get('identifier') or [None]  #E
        handler.identifier = identifier[0]                      #F

#A Set the identifier and query arguments to be palceholder values
#B If there were query arguments as part of the request, process them
#C Extract the query portion from the path, and update the path
#D Parse the query
#E Fetch the list of query arguments with the name 'identifier'
#F Use the first identifier passed

parse_identifier()函数的定义没有任何令人惊奇的地方:它首先为查询参数和标识符设置了初始值,接着对查询参数进行语法分析,最后将查询参数里面可用的标识符存储起来。

处理HTTP流

我们构建的HTTP服务器目前并不完整,因为它还缺少将过滤后的消息返回给客户端的代码。为了将过滤后的消息一个接一个地发送给客户端,服务器首先需要验证客户端发来的请求是否合法,如果请求没有问题的话,服务器就会向客户端发送通知,告知它,服务器现在将进入HTTP的分块传输模式,这个模式使得服务器可以在接收到消息的时候,将消息一个接一个地发送给客户端。代码清单12展示了执行验证操作并将过滤后的消息发送给客户端的函数。

FILTERS = ('track', 'filter', 'location')                   #A
def process_filters(handler):
    id = handler.identifier
    if not id:                                              #B
        return handler.send_error(401, "identifier missing")#B

    method = handler.path.rsplit('/')[-1].split('.')[0]     #C
    name = None
    args = None
    if method == 'filter':                                  #D
        data = cgi.FieldStorage(                                #E
            fp=handler.rfile,                                   #E
            headers=handler.headers,                            #E
            environ={'REQUEST_METHOD':'POST',                   #E
                     'CONTENT_TYPE':handler.headers['Content-Type'],#E
        })

        for name in data:                               #F
            if name in FILTERS:                         #F
                args = data.getfirst(name).lower().split(',')   #F
                break                                   #F

        if not args:                                            #G
            return handler.send_error(401, "no filter provided")#G
    else:
        args = handler.query                                #M

    handler.send_response(200)                              #H
    handler.send_header('Transfer-Encoding', 'chunked')     #H
    handler.end_headers()

    quit = [False]                                          #N
    for item in filter_content(id, method, name, args, quit):   #I
        try:
            handler.wfile.write('%X\r\n%s\r\n'%(len(item), item))   #J
        except socket.error:                                    #K
            quit[0] = True                                      #K
    if not quit[0]:
        handler.wfile.write('0\r\n\r\n')                        #L

#A Keep a listing of filters that need arguments
#B Return an error if an identifier was not provided by the client
#C Fetch the method, should be one of 'sample' or 'filter'
#D If this is a filtering method, we need to fetch the arguments
#E Parse the POST request to discover the type and arguments to the filter
#F Fetch any of the filters provided by the client request
#G If there were no filters specified, return an error
#M For sample requests, pass the query arguments as the 'args'
#H Finally return a response to the client, informing them that they will be receiving a streaming response
#N Use a Python list as a holder for a pass-by-reference variable, which will allow us to tell the content filter to stop receiving messages
#I Iterate over the results of the filter
#J Send the pre-encoded response to the client using the chunked encoding
#K If sending to the client caused an error, then we need to tell the subscriber to unsubscribe and shut down
#L Send the "end of chunks" message to the client if we haven't already disconnected

尽管process_filters()函数里面有几个比较难懂的地方,但它的基本构思不外乎就是确保服务器已经取得了客户端的标识符,并且成功地获取了请求指定的过滤参数。如果一切顺利的话,服务器会告知客户端,自己将向它发送流回复,并将实际的过滤器传递给生成器,然后由生成器产生符合过滤标准的消息序列。

以上展示的就是HTTP流服务器的构建方法。在接下来的一节中,我们将构建一些过滤器来对系统中的消息进行过滤。

对流消息进行过滤

前面一节构建了一个服务器来处理消息流,而这一节要做的则是为服务器添加消息过滤功能,使得客户端可以只接收自己感兴趣的消息。尽管我们构建的社交网站一时半会儿可能还不会有什么人气,但是在Twitter、Facebook甚至Google+这些热门的社交网站上面,每秒都会有数万甚至数十万的事件发生。对于我们自己和第三方合作伙伴来说,一个不漏地发送这些信息将带来高昂的带宽费用,因此让服务器只发送客户端想要的消息就变得相当重要了。

接下来我们将编写一些函数和类,这些函数和类会在消息以流的形式被发送至客户端之前,对消息进行过滤。这个带有消息过滤功能的流服务器除了可以像Twitter的firehose流一样,让客户端访问所有消息之外,还可以让客户端获取随机选取的一部分消息,或者只获取与特定用户、特定关键字或者特定位置有关的消息。

我们将使用Redis的PUBLISH命令和SUBSCRIBE命令来实现流服务器的其中一部分功能:当用户发布一条消息的时候,程序会将这条消息通过PUBLISH发送给某个频道,而各个过滤器则通过SUBSCRIBE来订阅并接收那个频道的消息,并在发现与过滤器相匹配的消息时,将消息回传(yield back)给Web服务器,然后由服务器将这些消息发送给客户端。

对状态消息的发布操作与删除操作进行更新

代码清单13展示了更新之后的create_status()函数,函数中新添加的那一行代码负责将消息发送给过滤器进行过滤。

def create_status(conn, uid, message, **data):
    pipeline = conn.pipeline(True)
    pipeline.hget('user:%s'%uid, 'login')
    pipeline.incr('status:id:')
    login, id = pipeline.execute()

    if not login:
        return None

    data.update({
        'message': message,
        'posted': time.time(),
        'id': id,
        'uid': uid,
        'login': login,
    })
    pipeline.hmset('status:%s'%id, data)
    pipeline.hincrby('user:%s'%uid, 'posts')
    pipeline.publish('streaming:status:', json.dumps(data)) #A
    pipeline.execute()
    return id

#A The added line to send a message to streaming filters

消息发送函数只需要添加一行代码就可以支持流过滤功能,接下来的代码清单14展示了更新之后的消息删除函数。

def delete_status(conn, uid, status_id):
    key = 'status:%s'%status_id
    lock = acquire_lock_with_timeout(conn, key, 1)
    if not lock:
        return None

    if conn.hget(key, 'uid') != str(uid):
        release_lock(conn, key, lock)
        return None

    pipeline = conn.pipeline(True)
    status = conn.hgetall(key)                                  #A
    status['deleted'] = True                                    #B
    pipeline.publish('streaming:status:', json.dumps(status))   #C
    pipeline.delete(key)
    pipeline.zrem('profile:%s'%uid, status_id)
    pipeline.zrem('home:%s'%uid, status_id)
    pipeline.hincrby('user:%s'%uid, 'posts', -1)
    pipeline.execute()

    release_lock(conn, key, lock)
    return True

#A Fetch the status message so that streaming filters can perform the same filters to determine whether the deletion should be passed to the client
#B Mark the status message as deleted
#C Publish the deleted status message to the stream

初看上去,读者可能会感到奇怪,delete_status()为什么要将被删除的状态消息完整地发送给过滤频道呢?从概念上来讲,程序在删除一条状态消息的时候,需要向所有曾经发布过这条消息的客户端发送“此消息已被删除”的信息,而对被删除消息执行发布该消息时使用的过滤器正好可以做到这一点,这就是程序将被删除的状态消息标记为“已被删除”,然后将它重新发送到过滤频道里面的原因。这种做法使得程序无需为每个客户端记录所有已发送消息的状态ID,从而简化了服务器的设计并降低了内存占用。

接收并过滤流消息

现在,每当服务器发布一条消息或者删除一条消息,它都会将消息发送至指定的频道,而程序只需要订阅那个频道,就可以开始接收并过滤消息了。代码清单15展示了服务器用于接收并过滤消息的代码:为了订阅指定频道的消息,程序需要创建一个特殊的pubsub对象;在订阅了频道之后,程序将对接收到的消息进行过滤,并根据消息是被发布还是被删除来判断应该生成两种不同消息中的哪一种。

@redis_connection('social-network')                         #A
def filter_content(conn, id, method, name, args, quit):
    match = create_filters(id, method, name, args)          #B

    pubsub = conn.pubsub()                      #C
    pubsub.subscribe(['streaming:status:'])     #C

    for item in pubsub.listen():                #D
        message = item['data']                  #E
        decoded = json.loads(message)           #E

        if match(decoded):                      #F
            if decoded.get('deleted'):                      #G
                yield json.dumps({                          #G
                    'id': decoded['id'], 'deleted': True})  #G
            else:
                yield message                   #H

        if quit[0]:                             #I
            break                               #I

    pubsub.reset()                              #J

#A Use our automatic connection decorator from chapter 5
#B Create the filter that will determine whether a message should be sent to the client
#C Prepare the subscription
#D Receive messages from the subscription
#E Get the status message information from the subscription structure
#F Check if the status message matched the filter
#G For deleted messages, send a special 'deleted' placeholder for the message
#H For matched status messages that are not deleted, send the message itself
#I If the web server no longer has a connection to the client, stop filtering messages
#J Reset the Redis connection to ensure that the Redis server clears its outgoing buffers if this wasn't fast enough

正如之前所说,filter_content()函数需要通过订阅Redis中的一个频道来接收状态消息的发布通知或者删除通知。除此之外,它还需要处理断线的流客户端,并正确地对连接进行清理以防止Redis存储了太多待发送数据。

Redis服务器提供了client-output-buffer-limit pubsub选项,它可以设置服务器在处理订阅操作时为每个客户端分配的最大输出缓冲区大小。为了保证Redis服务器在高负载下仍然能够正常运作,我们可能会将这个选项的值设定为低于默认的32MB,至于这个值实际要设置为多少,则取决于服务器要处理的客户端数量以及服务器的数据库里面存储了多少数据。

过滤消息

到目前为止,我们已经完成了除过滤器之外,实现状态消息过滤功能所需的其他所有程序,现在唯一要做的就是真正地实现过滤器。为了实现消息过滤功能,我们在前面已经做了非常多的准备工作,但过滤器本身的实现却并不复杂。为了创建过滤器,我们首先需要定义代码清单16展示的create_filters()函数,这个函数会根据用户的需要创建相应的过滤器。目前的create_filters()函数将假设客户端总是会发送合法的参数,如果读者需要把这个函数应用到生产环境的话,那么可以根据自己的需要,增加相应的正确性校验以及身份验证功能。

def create_filters(id, method, name, args):
    if method == 'sample':                      #A
        return SampleFilter(id, args)           #A
    elif name == 'track':                       #B
        return TrackFilter(args)                #B
    elif name == 'follow':                      #B
        return FollowFilter(args)               #B
    elif name == 'location':                    #B
        return LocationFilter(args)             #B
    raise Exception("Unknown filter")           #C

#A For the 'sample' method, we don't need to worry about names, just the arguments
#B For the 'filter' method, we actually worry about which of the filters we want to apply, so return the specific filters for them
#C If no filter matches, then raise an exception

很明显,我们需要分别实现create_filters()函数中提到的各个不同的过滤器。首先需要实现的是随机取样过滤器,这个过滤器会实现类似Twitter风格的firehose(消防水管)、 gardenhose(橡胶软管)和spritzer(汽水)访问等级(access level),代码清单17展示了这个过滤器的实现代码。

def SampleFilter(id, args):                             #A
    percent = int(args.get('percent', ['10'])[0], 10)   #B
    ids = range(100)                                    #C
    shuffler = random.Random(id)                        #C
    shuffler.shuffle(ids)                               #C
    keep = set(ids[:max(percent, 1)])                   #D

    def check(status):                                  #E
        return (status['id'] % 100) in keep             #F
    return check

#A We are defining a filter class called "SampleFilter", which are created by passing 'id' and 'args' parameters
#B The 'args' parameter is actually a dictionary, based on the parameters passed as part of the GET request
#C We use the 'id' parameter to randomly choose a subset of ids, the count of which is determined by the 'percent' argument passed
#D We will use a Python set to allow us to quickly determine whether a status message matches our criteria
#E If we create a specially named method called '__call__' on an instance, it will be called if the instance is used like a function
#F To filter status messages, we fetch the status id, find its value modulo 100, and return whether it is in the status ids that we want to accept

SampleFilter函数使用了闭包来将数据和行为封装在一起。除此之外,SampleFilter函数还做了一件有趣的事情——它使用用户提供的id参数来做随机数字生成器的种子(seed),以此来决定被过滤器选中的状态消息的ID,这使得随机取样过滤器能够接收到状态消息的deleted通知,即使在客户端曾经断过线的情况下,也是如此(但是客户端必须在删除通知到达之前重新连接服务器)。此外,程序使用了Python的集合而不是列表来判断ID在取模100之后是否位于过滤器接受的范围之内,这是因为Python集合查找一个元素是否存在的时间复杂度为O(1),而Python列表执行相同操作的复杂度为O(n)。

接下来要实现的是track过滤器,这个过滤器允许用户追踪状态消息中的单词(word)或者短语(phrase)。和代码清单17展示的随机取样过滤器一样,track过滤器也会使用闭包来将数据和过滤器功能封装在一起。代码清单18展示了track过滤器的定义。

def TrackFilter(list_of_strings):
    groups = []                                 #A
    for group in list_of_strings:               #A
        group = set(group.lower().split())      #A
        if group:
            groups.append(group)                #B

    def check(status):
        message_words = set(status['message'].lower().split())  #C
        for group in groups:                                #D
            if len(group & message_words) == len(group):    #E
                return True                                 #E
        return False
    return check

#A The filter should have been provided with a list of word groups, and the filter matches if a message has all of the words in any of the groups
#B We will only keep groups that have at least 1 word
#C We are going to split words in the message on whitespace
#D Then we are going to iterate over all of the groups
#E If all of the words in any of the groups match, we will accept the message with this filter

监测过滤器唯一要注意的地方是:消息需要与某个词组的所有单词而不是部分单词相匹配,才能被过滤器所接受。此外,监测过滤器也用到了Python集合,它们和Redis的集合一样,都提供了计算交集的能力。

接着来看follow过滤器,这个过滤器会匹配由给定用户群中某个用户所发送的状态消息,以及那些提及了用户群中某个用户的消息。代码清单89展示了follow过滤器的实现。

def FollowFilter(names):
    nset = set()                                    #A
    for name in names:                              #B
        nset.add('@' + name.lower().lstrip('@'))    #B

    def check(status):
        message_words = set(status['message'].lower().split())  #C
        message_words.add('@' + status['login'].lower())        #C

        return message_words & nset                             #D
    return check

#A We are going to try to match login names against posters and messages
#B Make all of the names consistently stored as '@username'
#C Construct a set of words from the message and the poster's name
#D Consider the message a match if any of the usernames provided match any of the whitespace-separated words in the message

和之前展示的其他过滤器一样,关注过滤器也使用了Python集合以便快速地判断消息发布者的名字是否存在于用户名集合当中,又或者用户名集合中的某个名字是否出现在了状态消息里面。

我们最后要实现的是位置过滤器。和之前展示过的其他过滤器不同,前面的内容并没有明确地说明怎样在状态消息里面添加位置信息,但是,因为create_status()函数和post_status()函数都接受额外的可选关键字参数,所以即使不修改这两个函数,我们也可以向它们提供包括位置信息在内的附加信息。代码清单20给出了位置过滤器的定义。

def LocationFilter(list_of_boxes):
    boxes = []                                                  #A
    for start in xrange(0, len(list_of_boxes)-3, 4):            #A
        boxes.append(map(float, list_of_boxes[start:start+4]))  #A

    def check(self, status):
        location = status.get('location')           #B
        if not location:                            #C
            return False                            #C

        lat, lon = map(float, location.split(','))  #D
        for box in self.boxes:                      #E
            if (box[1] <= lat <= box[3] and         #F
                box[0] <= lon <= box[2]):           #F
                return True                         #F
        return False
    return check

#A We are going to create a set of boxes that define the regions that should return messages
#B Try to fetch 'location' data from a status message
#C If the message has no location information, then it can't be inside the boxes
#D Otherwise, extract the latitude and longitude of the location
#E To match one of the boxes, we need to iterate over all boxes
#F If the message status location is within the required latitude and longitude range, then the status message matches the filter

对于位置过滤器来说,读者最好奇的可能就是程序定义匹配区域的方法了:程序假设客户端发送的请求会以逗号分隔多个数字的方式来定义各个区域,其中每个区域的经纬度范围由4个数来定义,这4个数依次为最小经度、最小纬度、最大经度和最大纬度——它们的排列顺序和Twitter的API一样。

大功告成!我们终于成功地构建起了一个基本的社交网站后端,它不仅拥有自己的流API,而且还有一个可运行的Web服务器以及多个过滤器。

参考文档