使用Redis进行文件分发

Published on 2017 - 06 - 19

在构建分布式软件和分布式系统的时候,我们常常需要在多台机器上复制、分发或者处理数据文件,而现有的工具可以以几种不同的方式来完成这些工作:如果服务器需要持续地分发文件,那么常见的做法是使用NFS或者Samba来载入一个路径(path)或者驱动器;对于内容会逐渐发生变化的文件来说,常见的做法是使用一款名为Rsync的软件来尽量减少两个系统之间需要传输的数据量;在需要将多个文件副本分发到多台机器上面的时候,可以使用BitTorrent协议来将文件部分地(partial)分发到多台机器上面,然后通过让各台机器互相分享自己所拥有的数据来降低服务器的负载。

遗憾的是,以上提到的所有方法都有显著的安装成本以及相对的价值。虽然NFS和Samba都很好用,但是由于这两种技术都对操作系统进行了整合,所以它们在网络连接不完美的时候都会出现明显的问题(有时候甚至在网络连接无恙的情况下,也是如此)。Rsync旨在解决网络不稳定带来的问题,让单个文件或者多个文件可以部分地进行传送和续传(resume),但Rsync在开始传输文件之前必须先下载整个文件,并且负责获取文件的软件也必须与Rsync进行对接,这一点是否可行也是一个需要考虑的地方。尽管BitTorrent是一个了不起的技术,但它也只适用于服务器在发送文件方面遇到了限制或者网络未被充分使用的情况下,并且这种技术也需要软件与BitTorrent客户端进行对接,而我们需要获取文件的系统上可能并没有合适的BitTorrent客户端可用。

除了上面提到的问题之外,上述3种方法还需要设置并维护账号、权限以及服务器。因为我们已经有了一个安装完毕、正在运行并且随时可用的Redis,所以我们还是使用Redis来进行文件分发比较好,这也可以避免使用其他软件时碰到的一些问题:Redis的客户端会妥善地处理连接故障,通过客户端也可以直接获取数据,并且针对数据的处理操作可以立即执行而不必等待整个文件出现。

根据地理位置聚合用户数据

Fake Game公司打算从国家、地区、城市等多个不同纬度,对用户随着时间形成的访问模式进行聚合计算,为此,Fake Game公司需要分析许多体积以GB计算的日志文件,而我们要做的就是实现执行聚合计算所需的回调函数,并使用这些函数来实时地分析日志数据。

Fake Game公司已经存在了大约两年时间,他们每天的用户数量大约有10万人,而每个用户每天大约会产生10个事件,也就是总共有大约73亿行的日志需要分析。如果我们使用的是前面提到的几种文件分发技术的其中一种,那么程序就需要先将日志复制到进行日志分析的各台机器上面,然后才真正地开始进行日志分析。这种做法虽然可行,但是复制日志这一操作潜在地延缓了日志分析操作的进行,并且还会占用每台机器的存储空间,因为直到日志分析完成之后,复制到机器上的日志才会被清除。

虽然我们可以考虑编写一个一次性的MapReduce过程来处理所有日志文件,以此来代替将文件复制到各个机器里面的做法,但MapReduce并不会在各个待处理的任务之间共享内存(每个任务通常就是一个日志行),而手动地进行内存共享只会浪费更多的时间。说得更具体一些,程序如果将IP所属城市的查找表(lookup table)载入Python的内存里面,那么它就可以以每秒大约20万次的速度执行IP所属城市查找操作,这比单个Redis实例执行相同查询时的速度还要快。与此类似,如果我们使用MapReduce来处理日志的话,那么至少需要同时运行好几个Redis实例才能跟得上MapReduce的处理速度。

在理解了NFS和Samba、文件复制、MapReduce这几种常见的技术并不适合用来解决Fake Game公司目前面临的问题之后,接下来我们将看到实际执行查找操作时需要解决的几个问题。

在本地进行数据聚合计算

为了高效地处理数量繁多的日志,程序在对Redis进行更新之前,需要先将聚合数据缓存到本地,以此来减少程序执行所需的通信往返次数。这样做的原因在于:如果程序每天需要处理大约1000万个日志行的话,那么它就需要对Redis进行大约1000万次的写入,而如果程序在本地对每个国家在一天之内产生的日志进行聚合计算的话,因为国家的数量只有大约300个,所以它只需要向Redis写入大约300个值就可以了。这显著地降低了程序与Redis之间的通信往返次数,减少了需要执行的命令数量,并最终缩短了处理日志所需的时间。

如果我们不采取任何本地缓存措施,那么进行10次聚合计算就需要花费大约10天时间来处理所有数据。幸运的是,所有在一天之内产生的国家维度或者地区维度的日志,都可以在完成聚合计算之后再发送给Redis。因为我们的数据取样集合中只有大约350 000个城市,其中10%的城市覆盖了超过90%的玩家,所以我们同样可以在本地缓存所有城市维度的聚合数据。只要把聚合数据缓存到本地,聚合计算的吞吐量就不会被Redis所限制。

假设我们已经由有序集合和散列组成的IP查找表创建了缓存副本,那么剩下要考虑的就是如何对日志进行聚合计算了。首先,让我们来了解一下聚合计算需要处理的日志行——它们包含IP地址、日期、时间以及被执行的操作,就像这样:

为了每天对不同国家的日志行进行聚合计算,程序会把以上格式的日志行作为其中一个参数,传递给执行聚合计算的回调函数,而回调函数则负责对相应的国家计数器执行自增操作,并在处理完所有日志行之后,将聚合计算的结果写入Redis里面。代码清单29展示了执行这一聚合计算的回调函数的源代码。

aggregates = defaultdict(lambda: defaultdict(int))      #A

def daily_country_aggregate(conn, line):
    if line:
        line = line.split()
        ip = line[0]                                    #B
        day = line[1]                                   #B
        country = find_city_by_ip_local(ip)[2]          #C
        aggregates[day][country] += 1                   #D
        return

    for day, aggregate in aggregates.items():           #E
        conn.zadd('daily:country:' + day, **aggregate)  #E
        del aggregates[day]                             #E

#A Prepare the local aggregate dictionary
#B Extract the information from our log lines
#C Find the country from the IP address
#D Increment our local aggregate
#E The day file is done, write our aggregate to Redis

daily_country_aggregate()函数是我们编写和实现的第一个聚合函数,本节接下来要介绍的其他聚合函数与这个函数非常相似,并且同样易于编写。事不宜迟,让我们赶紧进入更有趣的主题——考虑如何通过Redis来发送日志文件。

发送日志文件

为了将日志数据传递给日志处理器,我们需要用到两个不同的日志数据操作组件。第一个组件是一个脚本,这个脚本会根据指定的键名将日志文件存储到Redis里面,并使用之前介绍的群组聊天功能,将存储日志的键名发布到群组里面,然后等待日志分析操作执行完毕时的通知(notification)到来(这样做是为了避免程序使用的内存数量超出机器的限制)。这个通知将会告知脚本,一个与被存储日志文件具有相似名字的数据库键的值已经被设置成了10,也就是程序使用的聚合进程数量。代码清单30展示了这个复制日志文件并在之后对无用数据进行清理的脚本。

def copy_logs_to_redis(conn, path, channel, count=10,
                       limit=2**30, quit_when_done=True):
    bytes_in_redis = 0
    waiting = deque()
    create_chat(conn, 'source', map(str, range(count)), '', channel) #I
    count = str(count)
    for logfile in sorted(os.listdir(path)):               #A
        full_path = os.path.join(path, logfile)

        fsize = os.stat(full_path).st_size
        while bytes_in_redis + fsize > limit:              #B
            cleaned = _clean(conn, channel, waiting, count)#B
            if cleaned:                                    #B
                bytes_in_redis -= cleaned                  #B
            else:                                          #B
                time.sleep(.25)                            #B

        with open(full_path, 'rb') as inp:                 #C
            block = ' '                                    #C
            while block:                                   #C
                block = inp.read(2**17)                    #C
                conn.append(channel+logfile, block)        #C

        send_message(conn, channel, 'source', logfile)     #D

        bytes_in_redis += fsize                            #E
        waiting.append((logfile, fsize))                   #E

    if quit_when_done:                                     #F
        send_message(conn, channel, 'source', ':done')     #F

    while waiting:                                         #G
        cleaned = _clean(conn, channel, waiting, count)    #G
        if cleaned:                                        #G
            bytes_in_redis -= cleaned                      #G
        else:                                              #G
            time.sleep(.25)                                #G

def _clean(conn, channel, waiting, count):                 #H
    if not waiting:                                        #H
        return 0                                           #H
    w0 = waiting[0][0]                                     #H
    if conn.get(channel + w0 + ':done') == count:          #H
        conn.delete(channel + w0, channel + w0 + ':done')  #H
        return waiting.popleft()[1]                        #H
    return 0                                               #H

#I Create the chat that will be used to send messages to clients
#A Iterate over all of the logfiles
#B Clean out finished files if we need more room
#C Upload the file to Redis
#D Notify the listeners that the file is ready
#E Update our local information about Redis' memory use
#F We are out of files, so signal that it is done
#G Clean up the files when we are done
#H How we actually perform the cleanup from Redis

为了将日志复制到Redis里面,copy_logs_to_redis()函数需要执行很多细致的步骤,这些步骤主要用于防止一次将过多数据推入Redis里面,并在日志文件被所有客户端读取完毕之后,正确地执行清理操作。告知日志处理器有新的文件可供处理并不困难,相比起来,针对日志文件的设置、发送和清理操作要繁琐得多。

接收日志文件

处理日志文件的第二个步骤,就是使用一组函数和生成器,从群组里面获取日志的文件名,然后根据日志的名字对存储在Redis里面的日志文件进行处理,并在处理完成之后,对复制进程正在等待的那些键进行更新。除此之外,程序还会使用回调函数来处理每个日志行,并更新聚合数据。代码清单31展示了其中一个这样的函数。

def process_logs_from_redis(conn, id, callback):
    while 1:
        fdata = fetch_pending_messages(conn, id)                    #A

        for ch, mdata in fdata:
            for message in mdata:
                logfile = message['message']

                if logfile == ':done':                                #B
                    return                                            #B
                elif not logfile:
                    continue

                block_reader = readblocks                             #C
                if logfile.endswith('.gz'):                           #C
                    block_reader = readblocks_gz                      #C

                for line in readlines(conn, ch+logfile, block_reader):#D
                    callback(conn, line)                              #E
                callback(conn, None)                                  #F

                conn.incr(ch + logfile + ':done')                     #G

        if not fdata:
            time.sleep(.1)

#A Fetch the list of files
#B No more logfiles
#C Choose a block reader
#D Iterate over the lines
#E Pass each line to the callback
#F Force a flush of our aggregate caches
#G Report that we are finished with the log

因为我们把“从Redis里面读取日志文件”这一费时费力的工作交给了生成日志行序列的辅助函数来执行,所以从日志文件里面提取信息的整个过程还是相当直观的。此外,程序还会通过对日志文件计数器执行自增操作来提醒文件发送者,以免文件发送进程忘了清理已经完成处理的日志文件。

处理日志文件

上一节中曾经提到过,我们将一部分解码(decode)文件的工作交给了那些返回数据生成器的函数来执行,而代码清单32展示的readlines()函数就是其中一个这样的函数,它接受一个Redis连接、一个数据库键和一个块迭代回调函数(block iterating callback)作为参数,对迭代回调函数产生的数据进行遍历,查找数据中的换行符(line break),并将各个日志行返回给函数的调用者。在取得数据块之后,函数会定位到数据块最后一个日志行的结尾处,并对位于结尾之前的所有日志行进行分割,然后一个接一个地向调用者返回这些日志行。当函数将一个数据块的所有日志行都返回给了调用者之后,它会将剩下的那些不完整的行追加到下一个数据块的前面,如果所有数据块都已经遍历完毕,那么函数会直接将数据块的最后一个日志行返回给调用者。虽然Python提供了多种不同的方法用于查找换行符并从中提取出文本行,但是rfind()函数和split()函数完成这一工作的速度是最快的。

def readlines(conn, key, rblocks):
    out = ''
    for block in rblocks(conn, key):
        out += block
        posn = out.rfind('\n')                      #A
        if posn >= 0:                               #B
            for line in out[:posn].split('\n'):     #C
                yield line + '\n'                   #D
            out = out[posn+1:]                      #E
        if not block:                               #F
            yield out
            break

#A Find the rightmost linebreak if any - rfind() returns -1 on failure
#B We found a line break
#C Split on all of the line breaks
#D Yield each line
#E Keep track of the trailing data
#F We are out of data

通过使用两个读取器中的一个来产生数据块,高层次的日志行生成函数readlines()可以专注于寻找日志中的换行符。

readlines()函数使用的两个数据块生成回调函数readblocks()和readblocks_gz()都会从Redis里面读取数据块,其中readblocks()函数会直接向调用者返回被读取的数据块,而readblocks_gz()函数则会自动解压gzip格式的压缩文件。通过有意识地区分日志行的遍历操作和读取操作,我们可以尽量提供有用且可复用的数据读取方法。代码清单33展示了readblocks()生成器的源代码。

def readblocks(conn, key, blocksize=2**17):
    lb = blocksize
    pos = 0
    while lb == blocksize:                                  #A
        block = conn.substr(key, pos, pos + blocksize - 1)  #B
        yield block                                         #C
        lb = len(block)                                     #C
        pos += lb                                           #C
    yield ''

#A Keep going while we got as much as we expected
#B Fetch the block
#C Prepare for the next pass

readblocks()生成器的主要目的在于对数据块读取操作进行抽象,使得我们以后可以使用其他类型的读取器来代替它——如文件系统读取器、memcached读取器、有序集合读取器,或者接下来要展示的,用于处理Redis存储的gzip压缩文件的块读取器。代码清单34展示了readblocks_gz()生成器的源代码。

def readblocks_gz(conn, key):
    inp = ''
    decoder = None
    for block in readblocks(conn, key, 2**17):                  #A
        if not decoder:
            inp += block
            try:
                if inp[:3] != "\x1f\x8b\x08":                #B
                    raise IOError("invalid gzip data")          #B
                i = 10                                          #B
                flag = ord(inp[3])                              #B
                if flag & 4:                                    #B
                    i += 2 + ord(inp[i]) + 256*ord(inp[i+1])    #B
                if flag & 8:                                    #B
                    i = inp.index('\0', i) + 1                  #B
                if flag & 16:                                   #B
                    i = inp.index('\0', i) + 1                  #B
                if flag & 2:                                    #B
                    i += 2                                      #B

                if i > len(inp):                                #C
                    raise IndexError("not enough data")         #C
            except (IndexError, ValueError):                    #C
                continue                                        #C

            else:
                block = inp[i:]                                 #D
                inp = None                                      #D
                decoder = zlib.decompressobj(-zlib.MAX_WBITS)   #D
                if not block:
                    continue

        if not block:                                           #E
            yield decoder.flush()                               #E
            break

        yield decoder.decompress(block)                         #F

#A Read the raw data from Redis
#B Parse the header information so that we can get the compressed data
#C We haven't read the full header yet
#D We found the header, prepare the decompressor
#E We are out of data, yield the last chunk
#F Yield a decompressed block

readblocks_gz()函数的大部分代码都用在了对gzip头信息进行分析上,但这是有必要的。对于像我们这里分析的日志文件来说,使用gzip压缩可以将存储空间减少至原来的1/2至1/5,并且解压缩的速度也是相当快的。尽管很多新型的压缩算法可以提供更好的压缩效果(如bzip2、lzma或xz,还有很多其他的)或者更快的压缩速度(如lz4、lzop、snappy、QuickLZ,还有很多其他的),但是这些算法的应用范围都不及gz算法那么广泛,又或者没有像gz算法那样,提供能够在压缩比率和CPU使用比率之间进行取舍的选项。

参考文档

  • Redis实战 第6章 使用Redis构建应用程序组件