实时日志关键词监控系统代码如下:
""" 实时日志关键词监控系统 功能:持续跟踪日志文件,识别预设关键词模式并触发报警 特性: - 支持多关键词分级报警 - 处理日志轮转(rotation)问题 - 上下文异常捕获 - 频率抑制机制 版本:v2.3 """ import time import re from pathlib import Path from collections import deque import hashlib # 用于去重校验 class LogMonitor: def __init__(self, log_path, patterns, context_lines=5): """ 初始化日志监视器 :param log_path: 日志文件路径 :param patterns: 监控模式字典 示例:{'ERROR': r'\b(error|exception)\b', 'WARN': r'warning'} :param context_lines: 异常上下文捕获行数 """ self.log_path = Path(log_path) self.patterns = {k: re.compile(v, re.IGNORECASE) for k, v in patterns.items()} self.context_lines = context_lines self.position = 0 # 记录已读取的文件位置 self.history = deque(maxlen=1000) # 保存最近日志用于上下文 self.alert_cache = set() # 报警去重缓存 self._validate_path() def _validate_path(self): """路径合法性验证""" if not self.log_path.exists(): raise FileNotFoundError(f"日志文件 {self.log_path} 不存在") if not self.log_path.is_file(): raise IsADirectoryError(f"{self.log_path} 是目录而非文件") def _handle_log_rotation(self): """ 检测并处理日志轮转情况 原理:当文件inode或大小小于上次记录时,判定为发生轮转 """ current_inode = self.log_path.stat().st_ino current_size = self.log_path.stat().st_size if current_size < self.position or current_inode != self._last_inode: print(f"检测到日志轮转,重置读取位置 (原inode:{self._last_inode} 新inode:{current_inode})") self.position = 0 self.history.clear() self._last_inode = current_inode def tail_log(self): """读取新增日志内容""" self._handle_log_rotation() # 每次读取前检查轮转 with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f: f.seek(self.position) new_lines = f.readlines() self.position = f.tell() # 更新读取位置 # 处理Windows换行符差异 if new_lines and '\r' in new_lines[0]: new_lines = [line.replace('\r', '') for line in new_lines] return new_lines def analyze_lines(self, lines): """ 分析日志行并生成报警 返回结构:{告警级别: [匹配行详情]} """ alerts = {} for line in lines: line = line.strip() if not line: continue self.history.append(line) # 更新上下文缓存 # 生成当前行指纹用于去重 line_hash = hashlib.md5(line.encode()).hexdigest() if line_hash in self.alert_cache: continue # 多模式匹配 for level, pattern in self.patterns.items(): if pattern.search(line): context = self._get_context(line) alerts.setdefault(level, []).append({ 'raw': line, 'context': context, 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S') }) self.alert_cache.add(line_hash) break # 匹配到高级别后停止检查低级别 return alerts def _get_context(self, current_line): """ 获取异常上下文 返回当前行附近的日志作为上下文 """ context = [] window_size = self.context_lines // 2 # 向前追溯 for line in reversed(self.history): if line == current_line: break context.insert(0, line) if len(context) >= window_size: break # 添加当前行 context.append(f">>> {current_line} <<<") # 向后查找(由于实时监控,新日志可能还未到达) return context[:self.context_lines] def continuous_monitor(self, interval=10, burst_threshold=5): """ 持续监控入口 :param interval: 检查间隔(秒) :param burst_threshold: 突发报警阈值(1分钟内同类型报警次数) """ burst_counter = {} # 突发计数器 {'ERROR': 5} while True: try: new_lines = self.tail_log() if new_lines: alerts = self.analyze_lines(new_lines) # 处理分级报警 for level, items in alerts.items(): # 突发检测 burst_counter[level] = burst_counter.get(level, 0) + len(items) if burst_counter[level] > burst_threshold: self.send_alert( f"[突发报警] {level}级别报警在1分钟内达到{burst_counter[level]}次", level='CRITICAL' ) burst_counter[level] = 0 # 重置计数器 # 发送单个报警 for item in items: self.send_alert( f"[{level}] 检测到异常\n" f"时间: {item['timestamp']}\n" f"内容: {item['raw']}\n" f"上下文:\n" + "\n".join(item['context']), level=level ) # 清理过期突发计数 for k in list(burst_counter.keys()): burst_counter[k] = max(0, burst_counter[k] - len(new_lines)) time.sleep(interval) except KeyboardInterrupt: print("监控终止") break except Exception as e: self.send_alert(f"监控器自身异常: {str(e)}", level='CRITICAL') time.sleep(60) # 避免频繁报错 def send_alert(self, message, level='ERROR'): """发送报警信息(示例:打印到控制台)""" # 实际应替换为邮件/钉钉/企业微信等发送逻辑 print(f"\n{'!' * 20} [{level}级别报警] {'!' * 20}\n{message}\n{'!' * 50}\n") if __name__ == '__main__': # 示例配置 monitor = LogMonitor( log_path="/var/log/syslog", patterns={ 'CRITICAL': r'(oom|out of memory|segmentation fault)', 'ERROR': r'(error|exception)', 'WARN': r'(warn|timeout)', 'AUTH': r'(authentication failed|invalid user)' }, context_lines=7 ) monitor.continuous_monitor(interval=15) |
代码解析
- 日志轮转处理机制
""" 日志轮转检测原理: 1. 记录文件的inode编号和大小 2. 当检测到当前inode变化 或 文件大小小于上次读取位置时: - 判定发生日志轮转 - 重置读取位置 - 清空上下文缓存 教学重点: - 理解inode在文件系统中的作用 - 掌握日志轮转的常见处理方式 - 处理多日志文件场景(如access.log -> access.log.1) """ |
- 上下文捕获逻辑
""" 上下文获取策略: 1. 维护固定长度的历史队列(deque) 2. 当发现异常行时: - 向前追溯获取上文 - 包含当前异常行 - 后续新日志自动作为下文 优势: - 无需重复读取文件 - 实时更新上下文内容 生产建议: - 根据日志密度调整上下文行数 - 添加时间范围限制(如最多30秒内的日志) """ |
- 报警频率控制
""" 防骚扰机制: 1. MD5去重:相同内容不重复报警 2. 突发检测:1分钟内同类型报警超过阈值时合并通知 3. 计数器衰减:每次检查后减少计数 扩展思路: - 添加静默期设置(如已报错的服务30分钟内不再提醒) - 实现报警升级策略(多次相同报警→更高优先级) """ |
- 编码处理技巧
""" with open(..., errors='ignore') as f: # 处理非UTF8字符问题 日志编码处理: - 使用errors='ignore'跳过非法字符 - 检测文件编码(使用chardet库) - 转换到统一编码(如UTF-8) """ |
运维实践指南
- 生产环境增强建议
# 在analyze_lines()中添加: # 1. 业务白名单过滤 if any(ignore in line for ignore in ['expected error', 'test environment']): continue # 2. 动态模式加载 def reload_patterns(self): """运行时重载匹配规则""" with open('patterns.yaml') as f: self.patterns = yaml.safe_load(f) |
- 性能优化技巧
""" 1. 使用更高效的数据结构: - 将self.history改为双向链表实现 - 使用Bloom Filter替代MD5去重 2. IO优化: - 使用inotify机制(需安装pyinotify)替代轮询 - 增大读取缓冲区 """ |
- 监控器自保护
# 在continuous_monitor()中添加: # 1. 资源限制 self.process = psutil.Process() if self.process.memory_percent() > 30: self.send_alert("监控器内存占用过高!", level='CRITICAL') # 2. 看门狗机制 with open('/tmp/monitor_heartbeat', 'w') as f: f.write(time.strftime('%Y-%m-%d %H:%M:%S')) |
- 日志样例测试
# 生成测试日志 for i in {1..100}; do echo "[$(date)] This is a test error message $i" >> test.log sleep 0.1 done |
扩展功能建议
- 多日志源支持
def add_log_source(self, new_path): """动态添加监控日志""" self.additional_sources.append(LogMonitor(new_path, self.patterns)) |
- 智能模式学习
from sklearn.feature_extraction.text import TfidfVectorizer def learn_anomaly_patterns(self): """基于历史日志训练异常检测模型""" vectorizer = TfidfVectorizer() X = vectorizer.fit_transform(self.history) # 训练异常检测模型... |
- 与工单系统集成
def create_ticket(self, alert): """自动创建运维工单""" jira.create_issue( project='OPS', summary=f"[自动工单] {alert['level']}级别日志报警", description=alert['context'] ) |