使用Redis实现计数器

Published on 2017 - 06 - 07

知道我们的网站在最近5分钟内获得了10 000次点击,或者数据库在最近5秒内处理了200次写入和600次读取,是非常有用的。通过在一段时间内持续地记录这些信息,我们可以注意到流量的骤增或渐增情况,预测何时需要对服务器进行升级,从而防止系统因为负荷超载而下线。

这一节将分别介绍使用Redis来实现计数器的方法以及使用Redis来进行数据统计的方法,并在最后讨论如何简化示例中的数据统计操作。本节展示的例子都是由实际的用例和需求驱动的。首先,让我们来看看,如何使用Redis来实现时间序列计数器(time series counter),以及如何使用这些计数器来记录和监测应用程序的行为。

将计数器存储到Redis里面

在监控应用程序的同时,持续地收集信息是一件非常重要的事情。那些影响网站响应速度以及网站所能服务的页面数量的代码改动、新的广告营销活动或者是刚刚接触系统的新用户,都有可能会彻底地改变网站载入页面的数量,并因此而影响网站的各项性能指标。但如果我们平时不记录任何指标数据的话,我们就不可能知道指标发生了变化,也就不可能知道网站的性能是在提高还是在下降。

为了收集指标数据并进行监视和分析,我们将构建一个能够持续创建并维护计数器的工具,这个工具创建的每个计数器都有自己的名字(名字里带有网站点击量、销量或者数据库查询字样的计数器都是比较重要的计数器)。这些计数器会以不同的时间精度(如1秒、5秒、1分钟等)存储最新的120个数据样本,用户也可以根据自己的需要,对取样的数量和精度进行修改。

实现计数器首先要考虑的就是如何存储计数器信息,接下来将说明我们是如何将计数器信息存储到Redis里面的。

对计数器进行更新

为了对计数器进行更新,我们需要存储实际的计数器信息。对于每个计数器以及每种精度,如网站点击量计数器和5秒,我们将使用一个散列来存储网站在每个5秒时间片(time slice)之内获得的点击量,其中,散列的每个键都是某个时间片的开始时间,而键对应的值则存储了网站在该时间片之内获得的点击量。图1展示了一个点击量计数器存储的其中一部分数据,这个计数器以每5秒为一个时间片记录着网站的点击量。


[图1 这个散列展示了2012年5月7日早晨7点40分左右,网站在每个5秒时间片之内获得的点击量]

为了能够清理计数器包含的旧数据,我们需要在使用计数器的同时,对被使用的计数器进行记录。为了做到这一点,我们需要一个有序序列(ordered sequence),这个序列不能包含任何重复元素,并且能够让我们一个接一个地遍历序列中包含的所有元素。虽然同时使用列表和集合可以实现这种序列,但同时使用两种数据结构需要编写更多代码,并且会增加客户端和Redis之间的通信往返次数。实际上,实现有序序列更好的办法是使用有序集合,有序集合的各个成员分别由计数器的精度以及计数器的名字组成,而所有成员的分值都为0。因为所有成员的分值都被设置成了0,所以Redis在尝试按分值对有序集合进行排序的时候,就会发现这一点,并改为使用成员名进行排序,这使得一组给定的成员总是具有固定的排列顺序,从而可以方便地对这些成员进行顺序性的扫描。图2展示了一个有序集合,这个有序集合记录了正在使用的计数器。


[图2 这个有序集合展示了一些目前正在使用的计数器]

既然我们已经知道应该使用什么结构来记录并表示计数器了,现在是时候来考虑一下如何使用和更新这些计数器了。代码清单3展示了程序更新计数器的方法:对于每种时间片精度,程序都会将计数器的精度和名字作为引用信息添加到记录已有计数器的有序集合里面,并增加散列计数器在指定时间片内的计数值。

PRECISION = [1, 5, 60, 300, 3600, 18000, 86400]         #A

def update_counter(conn, name, count=1, now=None):
    now = now or time.time()                            #B
    pipe = conn.pipeline()                              #C
    for prec in PRECISION:                              #D
        pnow = int(now / prec) * prec                   #E
        hash = '%s:%s'%(prec, name)                     #F
        pipe.zadd('known:', hash, 0)                    #G
        pipe.hincrby('count:' + hash, pnow, count)      #H
    pipe.execute()
#A The precision of the counters in seconds: 1 second, 5 seconds, 1 minute, 5 minutes, 1 hour, 5 hours, 1 day - adjust as necessary
#B Get the current time to know when is the proper time to add to
#C Create a transactional pipeline so that later cleanup can work correctly
#D Add entries for all precisions that we record
#E Get the start of the current time slice
#F Create the named hash where this data will be stored
#G Record a reference to the counters into a ZSET with the score 0 so we can clean up after ourselves
#H Update the counter for the given name and time precision

更新计数器信息的过程并不复杂,程序只需要为每种时间片精度执行ZADD命令和HINCRBY命令就可以了。与此类似,从指定精度和名字的计数器里面获取技术数据也是一件非常容易的事情,代码清单4展示了用于执行这一操作的代码:程序首先使用HGETALL命令来获取整个散列,接着将命令返回的时间片和计数器的值从原来的字符串格式转换成数字格式,根据时间对数据进行排序,最后返回排序后的数据。

def get_counter(conn, name, precision):
    hash = '%s:%s'%(precision, name)                #A
    data = conn.hgetall('count:' + hash)            #B
    to_return = []                                  #C
    for key, value in data.iteritems():             #C
        to_return.append((int(key), int(value)))    #C
    to_return.sort()                                #D
    return to_return
#A Get the name of the key where we will be storing counter data
#B Fetch the counter data from Redis
#C Convert the counter data into something more expected
#D Sort our data so that older samples are first

get_counter()函数的工作方式就和之前描述的一样:它获取计数器数据并将其转换成整数,然后根据时间先后对转换后的数据进行排序。在弄懂了如何获取计数器存储的数据之后,接下来我们要考虑的是如何防止这些计数器存储过多的数据。

清理旧计数器

经过前面的介绍,我们已经知道了怎样将计数器存储到Redis里面,以及怎样从计数器里面取出数据。但是,如果我们只是一味地对计数器进行更新而不执行任何清理操作的话,那么程序最终将会因为存储了过多的数据而导致内存不足。好在我们事先已经将所有已知的计数器都记录到了一个有序集合里面,所以对计数器进行清理只需要遍历有序集合并删除其中的旧计数器就可以了。

为什么不使用EXPIRE?EXPIRE命令的其中一个限制就是它只能应用于整个键,而不能只对键的某一部分数据进行过期处理。并且因为我们将同一个计数器在不同精度下的所有计数数据都存放到了同一个键里面,所以我们必须定期地对计数器进行清理。如果读者有兴趣的话,也可以试试改变计数器组织数据的方式,使用Redis的过期键功能来代替手工的清理操作。

在处理(process)和清理(clean up)旧计数器的时候,有几件事情是需要我们格外留心的,其中包括以下几件。

  • 任何时候都可能会有新的计数器被添加进来。
  • 同一时间可能会有多个不同的清理操作在执行。
  • 对于一个每天只更新一次的计数器来说,以每分钟一次的频率尝试清理这个计数器只会浪费计算资源。
  • 如果一个计数器不包含任何数据,那么程序就不应该尝试对它进行清理。

我们接下来要构建一个守护进程函数,这个守护进程函数会不断地重复循环直到系统终止这个进程为止。为了尽可能地降低清理操作的执行负载,守护进程会以每分钟一次的频率清理那些每分钟更新一次或者每分钟更新多次的计数器,而对于那些更新频率低于每分钟一次的计数器,守护进程则会根据计数器自身的更新频率来决定对它们进行清理的频率。比如说,对于每秒更新一次或者每5秒更新一次的计数器,守护进程将以每分钟一次的频率清理这些计数器;而对于每5分钟更新一次的计数器,守护进程将以每5分钟一次的频率清理这些计数器。

清理程序通过对记录已知计数器的有序集合执行ZRANGE命令来一个接一个的遍历所有已知的计数器。在对计数器执行清理操作的时候,程序会取出计数器记录的所有计数样本的开始时间,并移除那些开始时间位于指定截止时间之前的样本,清理之后的计数器最多只会保留最新的120个样本。如果一个计数器在执行清理操作之后不再包含任何样本,那么程序将从记录已知计数器的有序集合里面移除这个计数器的引用信息。以上给出的描述大致地说明了计数器清理函数的运作原理,至于程序的一些边界情况最好还是通过代码来说明,要了解该函数的所有细节,请看代码清单5。

def clean_counters(conn):
    pipe = conn.pipeline(True)
    passes = 0                                                  #A
    while not QUIT:                                             #C
        start = time.time()                                     #D
        index = 0                                               #E
        while index < conn.zcard('known:'):                     #E
            hash = conn.zrange('known:', index, index)          #F
            index += 1
            if not hash:
                break
            hash = hash[0]
            prec = int(hash.partition(':')[0])                  #G
            bprec = int(prec // 60) or 1                        #H
            if passes % bprec:                                  #I
                continue

            hkey = 'count:' + hash
            cutoff = time.time() - SAMPLE_COUNT * prec          #J
            samples = map(int, conn.hkeys(hkey))                #K
            samples.sort()                                      #L
            remove = bisect.bisect_right(samples, cutoff)       #L

            if remove:                                          #M
                conn.hdel(hkey, *samples[:remove])              #M
                if remove == len(samples):                      #N
                    try:
                        pipe.watch(hkey)                        #O
                        if not pipe.hlen(hkey):                 #P
                            pipe.multi()                        #P
                            pipe.zrem('known:', hash)           #P
                            pipe.execute()                      #P
                            index -= 1                          #B
                        else:
                            pipe.unwatch()                      #Q
                    except redis.exceptions.WatchError:         #R
                        pass                                    #R

        passes += 1                                             #S
        duration = min(int(time.time() - start) + 1, 60)        #S
        time.sleep(max(60 - duration, 1))                       #T
#A Keep a record of the number of passes so that we can balance cleaning out per-second vs. per-day counters
#C Keep cleaning out counters until we are told to stop
#D Get the start time of the pass to calculate the total duration
#E Incrementally iterate over all known counters
#F Get the next counter to check
#G Get the precision of the counter
#H We are going to be taking a pass every 60 seconds or so, so we are going to try to clean out counters at roughly the rate that they are written to
#I Try the next counter if we aren't supposed to check this one on this pass (for example, we have taken 3 passes, but the counter has a precision of 5 minutes)
#J Find the cutoff time for the earliest sample that we should keep, given the precision and number of samples that we want to keep
#K Fetch the times of the samples, and convert the strings to integers
#L Determine the number of samples that should be deleted
#M Remove the samples as necessary
#N We have a reason to potentially remove the counter from the list of known counters ZSET
#O Watch the counter hash for changes
#P Verify that the counter hash is empty, and if so, remove it from the known counters
#B If we deleted a counter, then we can use the same index next pass
#Q The hash is not empty, keep it in the list of known counters
#R Someone else changed the counter hash by adding counters, which means that it has data, so we will leave the counter in the list of known counters
#S Update our passes and duration variables for the next pass, as an attempt to clean out counters as often as they are seeing updates
#T Sleep the remainder of the 60 seconds, or at least 1 second, just to offer a bit of a rest

正如之前所说,clean_counters()函数会一个接一个地遍历有序集合里面记录的计数器,查找需要进行清理的计数器。程序在每次遍历时都会对计数器进行检查,确保只清理应该清理的计数器。当程序尝试清理一个计数器的时候,它会取出计数器记录的所有数据样本,并判断哪些样本是需要被删除的。如果程序在对一个计数器执行清理操作之后,认为这个计数器已经不再包含任何数据,那么程序会检查这个计数器是否已经被清空,并在确认了它已经被清空之后,将它从记录已知计数器的有序集合中移除。最后,在遍历完所有计数器之后,程序会计算此次遍历耗费的时长,如果为了执行清理操作而预留的一分钟时间没有完全耗尽,那么程序将休眠直到这一分钟过去为止,然后继续进行下次遍历。

在和一个真实的网站打交道的时候,知道页面每天的点击量可以帮助我们判断是否需要对页面进行缓存。但是,如果被频繁访问的页面只需要花费2毫秒来进行渲染,而其他流量只有十分之一的页面却需要花费2秒来进行渲染,那么在缓存被频繁访问的页面之前,我们可以先将注意力放到优化渲染速度较慢的页面上面。在接下来的一节中,我们将不再使用计数器来记录页面的点击量,而是通过记录聚合统计数据来更准确地判断哪些地方需要进行优化。

使用Redis存储统计数据

首先需要说明的一点是,为了将统计数据存储到Redis里面,笔者曾经实现过5种不同的方法,本节介绍的方法综合了这5种方法里面的众多优点,具有非常大的灵活性和可扩展性。

对于一种给定的上下文(context)和类型,程序将使用一个有序集合来记录这个上下文以及这个类型的最小值(min)、最大值(max)、样本数量(count)、值的和(sum)、值的平方之和(sumsq)等信息,并通过这些信息来计算平均值以及标准差。程序将值存储在有序集合里面并非是为了按照分值对成员进行排序,而是为了对存储着统计信息的有序集合和其他有序集合进行并集计算,并通过MIN和MAX这两个聚合函数来筛选相交的元素。图3展示了一个存储统计数据的有序集合示例,它记录了ProfilePage(个人简介页面)上下文的AccessTime(访问时间)统计数据。


[图3 个人简介页面的访问时间统计示例]

注意,因为有序集合会按照分值对成员进行排序,所以有序集合里面排列各个成员的顺序和我们上面介绍这些成员时的顺序并不相同

既然我们已经知道了程序要存储的是什么类型的数据,那么接下来要考虑的就是如何将这些数据写到数据结构里面了。代码清单6展示了负责更新统计数据的代码。和之前介绍过的常见日志程序一样,统计程序在写入数据之前会进行检查,确保被记录的是当前这一个小时的统计数据,并将不属于当前这一个小时的旧数据进行归档。在此之后,程序会构建两个临时有序集合,其中一个用于保存最小值,而另一个则用于保存最大值。然后使用ZUNIONSTORE命令以及它的两个聚合函数MIN和MAX,分别计算两个临时有序集合与记录当前统计数据的有序集合之间的并集结果。通过使用ZUNIONSTORE命令,程序可以快速地更新统计数据,而无需使用WATCH去监视可能会频繁进行更新的存储统计数据的键,因为这个键可能会频繁地进行更新。程序在并集计算完毕之后就会删除那些临时有序集合,并使用ZINCRBY命令对统计数据有序集合里面的count、sum、sumsq这3个成员进行更新。

def update_stats(conn, context, type, value, timeout=5):
    destination = 'stats:%s:%s'%(context, type)                 #A
    start_key = destination + ':start'                          #B
    pipe = conn.pipeline(True)
    end = time.time() + timeout
    while time.time() < end:
        try:
            pipe.watch(start_key)                               #B
            now = datetime.utcnow().timetuple()                 #B
            hour_start = datetime(*now[:4]).isoformat()         #B

            existing = pipe.get(start_key)
            pipe.multi()
            if existing and existing < hour_start:
                pipe.rename(destination, destination + ':last') #B
                pipe.rename(start_key, destination + ':pstart') #B
                pipe.set(start_key, hour_start)                 #B

            tkey1 = str(uuid.uuid4())
            tkey2 = str(uuid.uuid4())
            pipe.zadd(tkey1, 'min', value)                      #C
            pipe.zadd(tkey2, 'max', value)                      #C
            pipe.zunionstore(destination,                       #D
                [destination, tkey1], aggregate='min')          #D
            pipe.zunionstore(destination,                       #D
                [destination, tkey2], aggregate='max')          #D

            pipe.delete(tkey1, tkey2)                           #E
            pipe.zincrby(destination, 'count')                  #F
            pipe.zincrby(destination, 'sum', value)             #F
            pipe.zincrby(destination, 'sumsq', value*value)     #F

            return pipe.execute()[-3:]                          #G
        except redis.exceptions.WatchError:
            continue                                            #H
#A Set up the destination statistics key
#B Handle the current hour/last hour like in common_log()
#C Add the value to the temporary keys
#D Union the temporary keys with the destination stats key with the appropriate min/max aggregate
#E Clean up the temporary keys
#F Update the count, sum, and sum of squares members of the zset
#G Return the base counter info so that the caller can do something interesting if necessary
#H If the hour just turned over and the stats have already been shuffled over, try again

update_status()函数的前半部分代码基本上可以忽略不看,而update_status()函数的后半部分则做了我们前面描述过的事情:程序首先创建两个临时有序集合,然后使用适当的聚合函数,对存储统计数据的有序集合以及两个临时有序集合分别执行ZUNIONSTORE命令;最后,删除临时有序集合,并将并集计算所得的统计数据更新到存储统计数据的有序集合里面。update_status()函数展示了将统计数据存储到有序集合里面的方法,但如果我们想要获取统计数据的话,又应该怎样做呢?

代码清单7展示了程序取出统计数据的方法:程序会从记录统计数据的有序集合里面取出所有被存储的值,并计算出平均值和标准差。其中,平均值可以通过值的和(sum)除以取样数量(count)来计算得出;而标准差的计算则更复杂一些,程序需要多做一些工作才能根据已有的统计信息计算出标准差,但是为了简洁起见,这里不会解释计算标准差时用到的数学知识。

def get_stats(conn, context, type):
    key = 'stats:%s:%s'%(context, type)                                 #A
    data = dict(conn.zrange(key, 0, -1, withscores=True))               #B
    data['average'] = data['sum'] / data['count']                       #C
    numerator = data['sumsq'] - data['sum'] ** 2 / data['count']        #D
    data['stddev'] = (numerator / (data['count'] - 1 or 1)) ** .5       #E
    return data
#A Set up the key that we are fetching our statistics from
#B Fetch our basic statistics and package them as a dictionary
#C Calculate the average
#D Prepare the first part of the calculation of standard deviation
#E Finish our calculation of standard deviation

除了用于计算标准差的代码之外,get_stats()函数并没有什么难懂的地方,如果读者愿意花些时间在维基百科上了解一下什么叫做标准差的话,那么读懂那些计算标准差的代码应该也不是什么难事。尽管有了那么多统计数据,但我们可能还不太清楚自己应该观察哪些数据,而接下来的一节就会来解答这个问题。

简化统计数据的记录与发现

在将统计数据存储到Redis里面之后,接下来我们该做些什么呢?说得更详细一点,在知道了访问每个页面所需的时间之后,我们要怎样才能找到那些生成速度较慢的网页?或者说,当某个页面的生成速度变得比以往要慢的时候,我们如何才能知悉这一情况?简单来说,为了发现以上提到的这些情况,我们需要存储更多信息,而具体的方法将在这一节里面介绍。

要记录页面的访问时长,程序就必须在页面被访问时进行计时。为了做到这一点,我们可以在各个不同的页面设置计时器,并添加代码来记录计时的结果,但更好的办法是直接实现一个能够进行计时并将计时结果存储起来的东西,让它将平均访问速度最慢的页面都记录到一个有序集合里面,并向我们报告哪些页面的载入时间变得比以前更长了。

为了计算和记录访问时长,我们会编写一个Python上下文管理器(context manager),并使用这个上下文管理器来包裹起那些需要计算并记录访问时长的代码。代码清单8展示了用于计算和记录访问时长的上下文管理器:程序首先会取得当前时间,接着执行被包裹的代码,然后计算这些代码的执行时长,并将结果记录到Redis里面;除此之外,程序还会对记录当前上下文最大访问时间的有序集合进行更新。

@contextlib.contextmanager                                              #A
def access_time(conn, context):
    start = time.time()                                                 #B
    yield                                                               #C

    delta = time.time() - start                                         #D
    stats = update_stats(conn, context, 'AccessTime', delta)            #E
    average = stats[1] / stats[0]                                       #F

    pipe = conn.pipeline(True)
    pipe.zadd('slowest:AccessTime', context, average)                   #G
    pipe.zremrangebyrank('slowest:AccessTime', 0, -101)                 #H
    pipe.execute()
#A Make this Python generator into a context manager
#B Record the start time
#C Let the block of code that we are wrapping run
#D Calculate the time that the block took to execute
#E Update the stats for this context
#F Calculate the average
#G Add the average to a ZSET that holds the slowest access times
#H Keep the slowest 100 items in the AccessTime ZSET

因为access_time()上下文管理器里面有一些没办法只用三言两语来解释的概念,所以我们最好还是直接通过使用这个管理器来了解它是如何运作的。接下来的这段代码展示了使用access_time()上下文管理器记录Web页面访问时长的方法,负责处理被记录页面的是一个回调函数:

def process_view(conn, callback):               #A
    with access_time(conn, request.path):       #B
        return callback()                       #C
#A This example web view takes the Redis connection as well as a callback to generate the content
#B This is how you would use the access time context manager to wrap a block of code
#C This is executed when the 'yield' statement is hit from within the context manager

在看过这个例子之后,即使读者没有学习过上下文管理器的创建方法,但是至少也已经知道该如何去使用它了。这个例子使用了访问时间上下文管理器来计算生成一个页面需要花费多长时间,此外,同样的上下文管理器还可以用于计算数据库查询花费的时长,或者用来计算渲染一个模板所需的时长。

参考文档