【转】基础监控-同比告警
0赞基础监控的同比告警主要是针对服务器监控采集的指标,包括负载(load1/load5/load15)、平均CPU使用率、内存使用率、内外网流量、端口数量等,具体采集方法可参考《基础监控-服务器监控》。
一、告警原理
多个指标每分钟1个数据,比较当前分钟的前10分钟的7天平均值,如果幅度超过100%并且绝对值相差达到M则算是一次异常,包括上升/下降异常,如果一个指标持续两次上升异常或者持续两次下降异常(不包括先上升后下降或者先下降后上升情况)则开始告警给机器的对应负责人(运维/开发)。例如当前是10号11:00,则比较的是10:59,10:58...10:50共10分钟的最近7天的平均值,例如10:59则是10号、9、8...4共7天的10:59这个点的数据的平均值。实际情况一般是第二分钟才解析了前一分钟的数据,相当于当前是11点,则解析的是10:59这个点和它之前的10分钟的7天平均值的比较。
二、数据来源
基础监控-服务器监控每分钟会采集一份数据保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},则每分钟一个redis hash,共7天 7*1440=10080个数据。实际情况保存的时候会多保留10分钟的数据,即7天+10分钟;由于reportTime是根据机器的实际时间来上报(这样画图才能保证是准确的),而某些机器没有NTP服务器或者其他原因导致时间不准,则reportTime则又会多种多样,所以导致的结果是redis的hash会变多一些,当然这并不影响我们的数据获取,因为整个同比告警就是根据画图来比较的,画图采用的是reportTime。采用hash保存到redis则不用每个ip读取一次redis,可以减少N次网络IO,大大提高程序速度。由于redis占据内存较多,大概10G左右,需要调整redis配置文件maxmemory的大小,不然redis会随机删除设置自动过期的key。
三、程序设计
1、DB设计
需要数据来源保存(redis)、异常展示表(mysql)、阈值配置表(mysql)、上次状态(redis)。异常展示表保存所有ip的异常描述信息、异常持续时间,可在页面展示;阈值配置表保存所有ip的阈值配置信息,每个ip的异常比率、各个指标的绝对值差、是否需要监控等;上次状态则用来判断是否需要告警的,持续2次同类异常则告警,使用redis保存即可。
mysql> show tables; +-----------------------------------+ | Tables_in_machineMonitor_overLast | +-----------------------------------+ | currentDisplay | | monitorConf | +-----------------------------------+
2、导入测试数据
测试数据是使用mysql和redis将数据从mysql导入redis中,线上数据是等程序完成后修改"服务器监控"的上报CGI来导入的。测试导入的时候遇到的坑参考《python处理json和redis hash的坑》
def initRedis(client): if client not in CONF.redisInfo: raise RedisNotFound("can not found redis %s in config" % client) try: pool = redis.ConnectionPool(**CONF.redisInfo[client]) # 线程安全 red = redis.Redis(connection_pool=pool) red.randomkey() # check except Exception, e: raise RedisException("connect redis %s failed: %s" % (client, str(e))) return red def initDb(client): if client not in CONF.dbInfo: raise MysqlNotFound("can not found mysql db %s in config" % client) try: db = opMysql.OP_MYSQL(**CONF.dbInfo[client]) except Exception, e: raise MysqlException("connect mysql %s failed: %s" % (client, str(e))) code, errMsg = db.connect() if 0 != code: raise MysqlException("connect mysql %s failed: %s" % (client, errMsg)) return db
3、告警接口
调用"告警平台"接口,项目配置后可发送RTX/SMS/Wechat,可在页面查看历史记录,轻松修改配置和临时屏蔽等。调用接口直接使用urllib/urllib2库即可
postData = urllib.urlencode(values) req = urllib2.Request(CONF.amcUrl, postData) response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT) amcDict = json.loads(response.read()) code = int(amcDict["returnCode"]) errMsg = amcDict["errorMsg"]
4、解析来源的数据
将数据转成可识别字典,并做排错处理,将错误数据拒绝掉
5、使用numpy和panda求平均值
将来源数据解析后存入panda.DataFrame中,如果数据不存在则使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值并保留2位数。如果某一分钟的7天数据全部获取不到则拒绝解析当前值,如果是7天数据的部分数据获取不到则剔除该点并在平均值的除数相对减一,使用NAN代替该点则在mean中可以解决这种情况。增加自定义函数比较每个列是否满足幅度上升/下降100%(可配置)并且绝对值差达到M(可配置),是则返回True,否则返回False,判断所有返回值是否均为True或者均为False,是则符合异常场景。
初始化DataFrame
for item in vd: if value is None or value[item] is None: vd[item][lastDayKey] = numpy.nan else: vd[item][lastDayKey] = value[item] vf = pandas.DataFrame(vd) columns.append(vf.mean().round(2)) indexes.append(lastMinKey) self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)
panda自定义函数比较并且判断是否需要告警
for item in curValue: if curValue[item] is None: # error continue else: curValue[item] = round(curValue[item], 2) def overLastCompute(v2, absSub): """ :param v2: float :param absSub: absolute subtract :return: high/low/null """ v1 = curValue[item] v2 = round(v2, 2) if 0 == v2: if v1 > absSub: return "HIGH" if v1 < -absSub: return "LOW" return "NULL" subVal = abs(v1 - v2) if subVal / v2 > CONF.RATIO and subVal > absSub: if v1 > v2: return "HIGH" return "LOW" return "NULL" self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item]) res = self.ipInfo[ip]["result"][item] == "HIGH" # Series if all(i for i in res): resErr[item] = CONF.HIGH_ERR if CONF.HIGH_ERR == self.lastCache[str(ip)][item]: # will Alert if switch on pass else: res = self.ipInfo[ip]["result"][item] == "LOW" if all(i for i in res): resErr[item] = CONF.LOW_ERR if CONF.LOW_ERR == self.lastCache[str(ip)][item]: # will Alert if switch on pass
6、由于IP较多,并且主要逻辑在解析数据和panda的计算上,使用CPU比较多,则需要使用多进程,并且结合线程池将进程跑满,别浪费进程资源。
step = ipNums / multiprocessing.cpu_count() ipList = list() i = 0 j = 1 processList = list() for ip in self.ipInfo: ipS = str(ip) if ipS not in self.lastCache: self.lastCache[ipS] = copy.deepcopy(self.value) ipList.append(ip) i += 1 if i == step * j or i == ipNums: j += 1 def innerRun(): wm = Pool.ThreadPool(CONF.POOL_SIZE) for myIp in ipList: kw = dict(ip=myIp, handlerKey=myIp) wm.addJob(self.handleOne, **kw) wm.waitForComplete() ipListNums = len(ipList) for tmp in xrange(ipListNums): res = wm.getResult() if res: handlerKey, code, handlerRet, errMsg = res if 0 != code: continue self.lastCache[str(handlerKey)] = handlerRet process = multiprocessing.Process(target=innerRun) process.start() processList.append(process) ipList = list() for process in processList: process.join()
四、优化
指标监控v2则是同比告警的升级版,数据量将会大好几倍,目前想到的优化如下
1、使用hbase替代redis
2、将程序做京广容灾,改成分布式运行,横向扩展多套程序并列运行