【转】基础监控-同比告警
0赞基础监控的同比告警主要是针对服务器监控采集的指标,包括负载(load1/load5/load15)、平均CPU使用率、内存使用率、内外网流量、端口数量等,具体采集方法可参考《基础监控-服务器监控》。
一、告警原理
多个指标每分钟1个数据,比较当前分钟的前10分钟的7天平均值,如果幅度超过100%并且绝对值相差达到M则算是一次异常,包括上升/下降异常,如果一个指标持续两次上升异常或者持续两次下降异常(不包括先上升后下降或者先下降后上升情况)则开始告警给机器的对应负责人(运维/开发)。例如当前是10号11:00,则比较的是10:59,10:58...10:50共10分钟的最近7天的平均值,例如10:59则是10号、9、8...4共7天的10:59这个点的数据的平均值。实际情况一般是第二分钟才解析了前一分钟的数据,相当于当前是11点,则解析的是10:59这个点和它之前的10分钟的7天平均值的比较。
二、数据来源
基础监控-服务器监控每分钟会采集一份数据保存到Redis中,保存格式是 reportTime - hash,hash的格式是{ip1:{item1:value, item2:value2...}, ip2:{item1:value, item2:value2...}, ip3...},则每分钟一个redis hash,共7天 7*1440=10080个数据。实际情况保存的时候会多保留10分钟的数据,即7天+10分钟;由于reportTime是根据机器的实际时间来上报(这样画图才能保证是准确的),而某些机器没有NTP服务器或者其他原因导致时间不准,则reportTime则又会多种多样,所以导致的结果是redis的hash会变多一些,当然这并不影响我们的数据获取,因为整个同比告警就是根据画图来比较的,画图采用的是reportTime。采用hash保存到redis则不用每个ip读取一次redis,可以减少N次网络IO,大大提高程序速度。由于redis占据内存较多,大概10G左右,需要调整redis配置文件maxmemory的大小,不然redis会随机删除设置自动过期的key。
三、程序设计
1、DB设计
需要数据来源保存(redis)、异常展示表(mysql)、阈值配置表(mysql)、上次状态(redis)。异常展示表保存所有ip的异常描述信息、异常持续时间,可在页面展示;阈值配置表保存所有ip的阈值配置信息,每个ip的异常比率、各个指标的绝对值差、是否需要监控等;上次状态则用来判断是否需要告警的,持续2次同类异常则告警,使用redis保存即可。
mysql> show tables; +-----------------------------------+ | Tables_in_machineMonitor_overLast | +-----------------------------------+ | currentDisplay | | monitorConf | +-----------------------------------+
2、导入测试数据
测试数据是使用mysql和redis将数据从mysql导入redis中,线上数据是等程序完成后修改"服务器监控"的上报CGI来导入的。测试导入的时候遇到的坑参考《python处理json和redis hash的坑》
def initRedis(client):
if client not in CONF.redisInfo:
raise RedisNotFound("can not found redis %s in config" % client)
try:
pool = redis.ConnectionPool(**CONF.redisInfo[client]) # 线程安全
red = redis.Redis(connection_pool=pool)
red.randomkey() # check
except Exception, e:
raise RedisException("connect redis %s failed: %s" % (client, str(e)))
return red
def initDb(client):
if client not in CONF.dbInfo:
raise MysqlNotFound("can not found mysql db %s in config" % client)
try:
db = opMysql.OP_MYSQL(**CONF.dbInfo[client])
except Exception, e:
raise MysqlException("connect mysql %s failed: %s" % (client, str(e)))
code, errMsg = db.connect()
if 0 != code:
raise MysqlException("connect mysql %s failed: %s" % (client, errMsg))
return db
3、告警接口
调用"告警平台"接口,项目配置后可发送RTX/SMS/Wechat,可在页面查看历史记录,轻松修改配置和临时屏蔽等。调用接口直接使用urllib/urllib2库即可
postData = urllib.urlencode(values) req = urllib2.Request(CONF.amcUrl, postData) response = urllib2.urlopen(req, timeout=CONF.AMC_TIME_OUT) amcDict = json.loads(response.read()) code = int(amcDict["returnCode"]) errMsg = amcDict["errorMsg"]
4、解析来源的数据
将数据转成可识别字典,并做排错处理,将错误数据拒绝掉
5、使用numpy和panda求平均值
将来源数据解析后存入panda.DataFrame中,如果数据不存在则使用numpy.nan代替,使用panda.DataFrame().mean().round(2)求平均值并保留2位数。如果某一分钟的7天数据全部获取不到则拒绝解析当前值,如果是7天数据的部分数据获取不到则剔除该点并在平均值的除数相对减一,使用NAN代替该点则在mean中可以解决这种情况。增加自定义函数比较每个列是否满足幅度上升/下降100%(可配置)并且绝对值差达到M(可配置),是则返回True,否则返回False,判断所有返回值是否均为True或者均为False,是则符合异常场景。
初始化DataFrame
for item in vd:
if value is None or value[item] is None:
vd[item][lastDayKey] = numpy.nan
else:
vd[item][lastDayKey] = value[item]
vf = pandas.DataFrame(vd)
columns.append(vf.mean().round(2))
indexes.append(lastMinKey)
self.ipInfo[ip]["lastData"] = pandas.DataFrame(columns, index=indexes)
panda自定义函数比较并且判断是否需要告警
for item in curValue:
if curValue[item] is None: # error
continue
else:
curValue[item] = round(curValue[item], 2)
def overLastCompute(v2, absSub):
"""
:param v2: float
:param absSub: absolute subtract
:return: high/low/null
"""
v1 = curValue[item]
v2 = round(v2, 2)
if 0 == v2:
if v1 > absSub:
return "HIGH"
if v1 < -absSub:
return "LOW"
return "NULL"
subVal = abs(v1 - v2)
if subVal / v2 > CONF.RATIO and subVal > absSub:
if v1 > v2:
return "HIGH"
return "LOW"
return "NULL"
self.ipInfo[ip]["result"][item] = self.ipInfo[ip]["lastData"][item].apply(overLastCompute, absSub=self.monitorConf[ip][item])
res = self.ipInfo[ip]["result"][item] == "HIGH" # Series
if all(i for i in res):
resErr[item] = CONF.HIGH_ERR
if CONF.HIGH_ERR == self.lastCache[str(ip)][item]:
# will Alert if switch on
pass
else:
res = self.ipInfo[ip]["result"][item] == "LOW"
if all(i for i in res):
resErr[item] = CONF.LOW_ERR
if CONF.LOW_ERR == self.lastCache[str(ip)][item]:
# will Alert if switch on
pass
6、由于IP较多,并且主要逻辑在解析数据和panda的计算上,使用CPU比较多,则需要使用多进程,并且结合线程池将进程跑满,别浪费进程资源。
step = ipNums / multiprocessing.cpu_count()
ipList = list()
i = 0
j = 1
processList = list()
for ip in self.ipInfo:
ipS = str(ip)
if ipS not in self.lastCache:
self.lastCache[ipS] = copy.deepcopy(self.value)
ipList.append(ip)
i += 1
if i == step * j or i == ipNums:
j += 1
def innerRun():
wm = Pool.ThreadPool(CONF.POOL_SIZE)
for myIp in ipList:
kw = dict(ip=myIp, handlerKey=myIp)
wm.addJob(self.handleOne, **kw)
wm.waitForComplete()
ipListNums = len(ipList)
for tmp in xrange(ipListNums):
res = wm.getResult()
if res:
handlerKey, code, handlerRet, errMsg = res
if 0 != code:
continue
self.lastCache[str(handlerKey)] = handlerRet
process = multiprocessing.Process(target=innerRun)
process.start()
processList.append(process)
ipList = list()
for process in processList:
process.join()
四、优化
指标监控v2则是同比告警的升级版,数据量将会大好几倍,目前想到的优化如下
1、使用hbase替代redis
2、将程序做京广容灾,改成分布式运行,横向扩展多套程序并列运行

