James Bryant

【转】基础监控-同比告警

0
阅读(1620)

基础监控的同比告警主要是针对服务器监控采集的指标,包括负载(load1/load5/load15)、平均CPU使用率、内存使用率、内外网流量、端口数量等,具体采集方法可参考《基础监控-服务器监控》。

一、告警原理

  多个指标每分钟1个数据,比较当前分钟的前10分钟的7天平均值,如果幅度超过100%并且绝对值相差达到M则算是一次异常,包括上升/下降异常,如果一个指标持续两次上升异常或者持续两次下降异常(不包括先上升后下降或者先下降后上升情况)则开始告警给机器的对应负责人(运维/开发)。例如当前是10号11:00,则比较的是10:59,10:58...10:50共10分钟的最近7天的平均值,例如10:59则是10号、9、8...4共7天的10:59这个点的数据的平均值。实际情况一般是第二分钟才解析了前一分钟的数据,相当于当前是11点,则解析的是10:59这个点和它之前的10分钟的7天平均值的比较。

二、数据来源

  基础监控-服务器监控每分钟会采集一份数据保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},则每分钟一个redis hash,共7天 7*1440=10080个数据。实际情况保存的时候会多保留10分钟的数据,即7天+10分钟;由于reportTime是根据机器的实际时间来上报(这样画图才能保证是准确的),而某些机器没有NTP服务器或者其他原因导致时间不准,则reportTime则又会多种多样,所以导致的结果是redis的hash会变多一些,当然这并不影响我们的数据获取,因为整个同比告警就是根据画图来比较的,画图采用的是reportTime。采用hash保存到redis则不用每个ip读取一次redis,可以减少N次网络IO,大大提高程序速度。由于redis占据内存较多,大概10G左右,需要调整redis配置文件maxmemory的大小,不然redis会随机删除设置自动过期的key。

三、程序设计

1、DB设计

需要数据来源保存(redis)、异常展示表(mysql)、阈值配置表(mysql)、上次状态(redis)。异常展示表保存所有ip的异常描述信息、异常持续时间,可在页面展示;阈值配置表保存所有ip的阈值配置信息,每个ip的异常比率、各个指标的绝对值差、是否需要监控等;上次状态则用来判断是否需要告警的,持续2次同类异常则告警,使用redis保存即可。

复制代码

mysql> show tables;
+-----------------------------------+
| Tables_in_machineMonitor_overLast |
+-----------------------------------+
| currentDisplay                    |
| monitorConf                       |
+-----------------------------------+

复制代码

2、导入测试数据

测试数据是使用mysql和redis将数据从mysql导入redis中,线上数据是等程序完成后修改"服务器监控"的上报CGI来导入的。测试导入的时候遇到的坑参考《python处理json和redis hash的坑

复制代码

def initRedis(client):
    if client not in CONF.redisInfo:
        raise RedisNotFound("can not found redis %s in config" % client)
    try:
        pool = redis.ConnectionPool(**CONF.redisInfo[client])  # 线程安全
        red = redis.Redis(connection_pool=pool)
        red.randomkey()  # check
    except Exception, e:
        raise RedisException("connect redis %s failed: %s" % (client, str(e)))
    return red


def initDb(client):
    if client not in CONF.dbInfo:
        raise MysqlNotFound("can not found mysql db %s in config" % client)
    try:
        db = opMysql.OP_MYSQL(**CONF.dbInfo[client])
    except Exception, e:
        raise MysqlException("connect mysql %s failed: %s" % (client, str(e)))
    code, errMsg = db.connect()
    if 0 != code:
        raise MysqlException("connect mysql %s failed: %s" % (client, errMsg))
    return db

复制代码

3、告警接口

调用"告警平台"接口,项目配置后可发送RTX/SMS/Wechat,可在页面查看历史记录,轻松修改配置和临时屏蔽等。调用接口直接使用urllib/urllib2库即可

postData = urllib.urlencode(values)
req = urllib2.Request(CONF.amcUrl, postData)
response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT)
amcDict = json.loads(response.read())
code = int(amcDict["returnCode"])
errMsg = amcDict["errorMsg"]

4、解析来源的数据

将数据转成可识别字典,并做排错处理,将错误数据拒绝掉

5、使用numpy和panda求平均值

将来源数据解析后存入panda.DataFrame中,如果数据不存在则使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值并保留2位数。如果某一分钟的7天数据全部获取不到则拒绝解析当前值,如果是7天数据的部分数据获取不到则剔除该点并在平均值的除数相对减一,使用NAN代替该点则在mean中可以解决这种情况。增加自定义函数比较每个列是否满足幅度上升/下降100%(可配置)并且绝对值差达到M(可配置),是则返回True,否则返回False,判断所有返回值是否均为True或者均为False,是则符合异常场景。

初始化DataFrame

复制代码

for item in vd:
    if value is None or value[item] is None:
        vd[item][lastDayKey] = numpy.nan
    else:
        vd[item][lastDayKey] = value[item]           
vf = pandas.DataFrame(vd)
            
columns.append(vf.mean().round(2))  
indexes.append(lastMinKey)   

self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)

复制代码

panda自定义函数比较并且判断是否需要告警

复制代码

for item in curValue:
    if curValue[item] is None:  # error
        continue
    else:
        curValue[item] = round(curValue[item], 2)
    
        def overLastCompute(v2, absSub):
            """
            :param v2: float  
            :param absSub: absolute subtract 
            :return: high/low/null
            """
            v1 = curValue[item]
            v2 = round(v2, 2)
            if 0 == v2:
                if v1 > absSub:
                    return "HIGH"
                if v1 < -absSub:
                    return "LOW"
                return "NULL"
            subVal = abs(v1 - v2)
            if subVal / v2 > CONF.RATIO and subVal > absSub:
                if v1 > v2:
                    return "HIGH"
                return "LOW"
            return "NULL"
        self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item])
        res = self.ipInfo[ip]["result"][item] == "HIGH"  # Series
        if all(i for i in res):
            resErr[item] = CONF.HIGH_ERR
            if CONF.HIGH_ERR == self.lastCache[str(ip)][item]:
                # will  Alert if switch on 
                pass
        else:
            res = self.ipInfo[ip]["result"][item] == "LOW"
            if all(i for i in res):
                resErr[item] = CONF.LOW_ERR
                if CONF.LOW_ERR == self.lastCache[str(ip)][item]:
                    # will  Alert if switch on 
                    pass

复制代码

6、由于IP较多,并且主要逻辑在解析数据和panda的计算上,使用CPU比较多,则需要使用多进程,并且结合线程池将进程跑满,别浪费进程资源。

复制代码

step = ipNums / multiprocessing.cpu_count()
ipList = list()
i = 0
j = 1
processList = list()

for ip in self.ipInfo:
    ipS = str(ip)
    if ipS not in self.lastCache:
        self.lastCache[ipS] = copy.deepcopy(self.value)
    ipList.append(ip)
    i += 1
    if i == step * j or i == ipNums:
        j += 1

        def innerRun():
            wm = Pool.ThreadPool(CONF.POOL_SIZE)
            for myIp in ipList:
                kw = dict(ip=myIp, handlerKey=myIp)
                wm.addJob(self.handleOne, **kw)
            wm.waitForComplete()
            ipListNums = len(ipList)
            for tmp in xrange(ipListNums):
                res = wm.getResult()
                if res:
                    handlerKey, code, handlerRet, errMsg = res
                    if 0 != code:
                        continue
                    self.lastCache[str(handlerKey)] = handlerRet
        process = multiprocessing.Process(target=innerRun)
        process.start()
        processList.append(process)
        ipList = list()

for process in processList:
    process.join()

复制代码

四、优化

指标监控v2则是同比告警的升级版,数据量将会大好几倍,目前想到的优化如下

1、使用hbase替代redis

2、将程序做京广容灾,改成分布式运行,横向扩展多套程序并列运行