百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

Python实用案例之实时日志关键词监控系统

ztj100 2025-03-14 22:34 7 浏览 0 评论

实时日志关键词监控系统代码如下:

"""

实时日志关键词监控系统

功能:持续跟踪日志文件,识别预设关键词模式并触发报警

特性:

- 支持多关键词分级报警

- 处理日志轮转(rotation)问题

- 上下文异常捕获

- 频率抑制机制

版本:v2.3

"""

import time

import re

from pathlib import Path

from collections import deque

import hashlib # 用于去重校验

class LogMonitor:

def __init__(self, log_path, patterns, context_lines=5):

"""

初始化日志监视器

:param log_path: 日志文件路径

:param patterns: 监控模式字典

示例:{'ERROR': r'\b(error|exception)\b', 'WARN': r'warning'}

:param context_lines: 异常上下文捕获行数

"""

self.log_path = Path(log_path)

self.patterns = {k: re.compile(v, re.IGNORECASE) for k, v in patterns.items()}

self.context_lines = context_lines

self.position = 0 # 记录已读取的文件位置

self.history = deque(maxlen=1000) # 保存最近日志用于上下文

self.alert_cache = set() # 报警去重缓存

self._validate_path()

def _validate_path(self):

"""路径合法性验证"""

if not self.log_path.exists():

raise FileNotFoundError(f"日志文件 {self.log_path} 不存在")

if not self.log_path.is_file():

raise IsADirectoryError(f"{self.log_path} 是目录而非文件")

def _handle_log_rotation(self):

"""

检测并处理日志轮转情况

原理:当文件inode或大小小于上次记录时,判定为发生轮转

"""

current_inode = self.log_path.stat().st_ino

current_size = self.log_path.stat().st_size

if current_size < self.position or current_inode != self._last_inode:

print(f"检测到日志轮转,重置读取位置 (原inode:{self._last_inode} 新inode:{current_inode})")

self.position = 0

self.history.clear()

self._last_inode = current_inode

def tail_log(self):

"""读取新增日志内容"""

self._handle_log_rotation() # 每次读取前检查轮转

with open(self.log_path, 'r', encoding='utf-8', errors='ignore') as f:

f.seek(self.position)

new_lines = f.readlines()

self.position = f.tell() # 更新读取位置

# 处理Windows换行符差异

if new_lines and '\r' in new_lines[0]:

new_lines = [line.replace('\r', '') for line in new_lines]

return new_lines

def analyze_lines(self, lines):

"""

分析日志行并生成报警

返回结构:{告警级别: [匹配行详情]}

"""

alerts = {}

for line in lines:

line = line.strip()

if not line:

continue

self.history.append(line) # 更新上下文缓存

# 生成当前行指纹用于去重

line_hash = hashlib.md5(line.encode()).hexdigest()

if line_hash in self.alert_cache:

continue

# 多模式匹配

for level, pattern in self.patterns.items():

if pattern.search(line):

context = self._get_context(line)

alerts.setdefault(level, []).append({

'raw': line,

'context': context,

'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')

})

self.alert_cache.add(line_hash)

break # 匹配到高级别后停止检查低级别

return alerts

def _get_context(self, current_line):

"""

获取异常上下文

返回当前行附近的日志作为上下文

"""

context = []

window_size = self.context_lines // 2

# 向前追溯

for line in reversed(self.history):

if line == current_line:

break

context.insert(0, line)

if len(context) >= window_size:

break

# 添加当前行

context.append(f">>> {current_line} <<<")

# 向后查找(由于实时监控,新日志可能还未到达)

return context[:self.context_lines]

def continuous_monitor(self, interval=10, burst_threshold=5):

"""

持续监控入口

:param interval: 检查间隔(秒)

:param burst_threshold: 突发报警阈值(1分钟内同类型报警次数)

"""

burst_counter = {} # 突发计数器 {'ERROR': 5}

while True:

try:

new_lines = self.tail_log()

if new_lines:

alerts = self.analyze_lines(new_lines)

# 处理分级报警

for level, items in alerts.items():

# 突发检测

burst_counter[level] = burst_counter.get(level, 0) + len(items)

if burst_counter[level] > burst_threshold:

self.send_alert(

f"[突发报警] {level}级别报警在1分钟内达到{burst_counter[level]}次",

level='CRITICAL'

)

burst_counter[level] = 0 # 重置计数器

# 发送单个报警

for item in items:

self.send_alert(

f"[{level}] 检测到异常\n"

f"时间: {item['timestamp']}\n"

f"内容: {item['raw']}\n"

f"上下文:\n" + "\n".join(item['context']),

level=level

)

# 清理过期突发计数

for k in list(burst_counter.keys()):

burst_counter[k] = max(0, burst_counter[k] - len(new_lines))

time.sleep(interval)

except KeyboardInterrupt:

print("监控终止")

break

except Exception as e:

self.send_alert(f"监控器自身异常: {str(e)}", level='CRITICAL')

time.sleep(60) # 避免频繁报错

def send_alert(self, message, level='ERROR'):

"""发送报警信息(示例:打印到控制台)"""

# 实际应替换为邮件/钉钉/企业微信等发送逻辑

print(f"\n{'!' * 20} [{level}级别报警] {'!' * 20}\n{message}\n{'!' * 50}\n")

if __name__ == '__main__':

# 示例配置

monitor = LogMonitor(

log_path="/var/log/syslog",

patterns={

'CRITICAL': r'(oom|out of memory|segmentation fault)',

'ERROR': r'(error|exception)',

'WARN': r'(warn|timeout)',

'AUTH': r'(authentication failed|invalid user)'

},

context_lines=7

)

monitor.continuous_monitor(interval=15)

代码解析

  1. 日志轮转处理机制

"""

日志轮转检测原理:

1. 记录文件的inode编号和大小

2. 当检测到当前inode变化 或 文件大小小于上次读取位置时:

- 判定发生日志轮转

- 重置读取位置

- 清空上下文缓存

教学重点:

- 理解inode在文件系统中的作用

- 掌握日志轮转的常见处理方式

- 处理多日志文件场景(如access.log -> access.log.1)

"""

  1. 上下文捕获逻辑

"""

上下文获取策略:

1. 维护固定长度的历史队列(deque)

2. 当发现异常行时:

- 向前追溯获取上文

- 包含当前异常行

- 后续新日志自动作为下文

优势:

- 无需重复读取文件

- 实时更新上下文内容

生产建议:

- 根据日志密度调整上下文行数

- 添加时间范围限制(如最多30秒内的日志)

"""

  1. 报警频率控制

"""

防骚扰机制:

1. MD5去重:相同内容不重复报警

2. 突发检测:1分钟内同类型报警超过阈值时合并通知

3. 计数器衰减:每次检查后减少计数

扩展思路:

- 添加静默期设置(如已报错的服务30分钟内不再提醒)

- 实现报警升级策略(多次相同报警→更高优先级)

"""

  1. 编码处理技巧

"""

with open(..., errors='ignore') as f:

# 处理非UTF8字符问题

日志编码处理:

- 使用errors='ignore'跳过非法字符

- 检测文件编码(使用chardet库)

- 转换到统一编码(如UTF-8)

"""

运维实践指南

  1. 生产环境增强建议

# 在analyze_lines()中添加:

# 1. 业务白名单过滤

if any(ignore in line for ignore in ['expected error', 'test environment']):

continue

# 2. 动态模式加载

def reload_patterns(self):

"""运行时重载匹配规则"""

with open('patterns.yaml') as f:

self.patterns = yaml.safe_load(f)

  1. 性能优化技巧

"""

1. 使用更高效的数据结构:

- 将self.history改为双向链表实现

- 使用Bloom Filter替代MD5去重

2. IO优化:

- 使用inotify机制(需安装pyinotify)替代轮询

- 增大读取缓冲区

"""

  1. 监控器自保护

# 在continuous_monitor()中添加:

# 1. 资源限制

self.process = psutil.Process()

if self.process.memory_percent() > 30:

self.send_alert("监控器内存占用过高!", level='CRITICAL')

# 2. 看门狗机制

with open('/tmp/monitor_heartbeat', 'w') as f:

f.write(time.strftime('%Y-%m-%d %H:%M:%S'))

  1. 日志样例测试

# 生成测试日志

for i in {1..100}; do

echo "[$(date)] This is a test error message $i" >> test.log

sleep 0.1

done

扩展功能建议

  1. 多日志源支持

def add_log_source(self, new_path):

"""动态添加监控日志"""

self.additional_sources.append(LogMonitor(new_path, self.patterns))

  1. 智能模式学习

from sklearn.feature_extraction.text import TfidfVectorizer

def learn_anomaly_patterns(self):

"""基于历史日志训练异常检测模型"""

vectorizer = TfidfVectorizer()

X = vectorizer.fit_transform(self.history)

# 训练异常检测模型...

  1. 与工单系统集成

def create_ticket(self, alert):

"""自动创建运维工单"""

jira.create_issue(

project='OPS',

summary=f"[自动工单] {alert['level']}级别日志报警",

description=alert['context']

)

相关推荐

Whoosh,纯python编写轻量级搜索工具

引言在许多应用程序中,搜索功能是至关重要的。Whoosh是一个纯Python编写的轻量级搜索引擎库,可以帮助我们快速构建搜索功能。无论是在网站、博客还是本地应用程序中,Whoosh都能提供高效的全文搜...

如何用Python实现二分搜索算法(python二分法查找代码)

如何用Python实现二分搜索算法二分搜索(BinarySearch)是一种高效的查找算法,适用于在有序数组中快速定位目标值。其核心思想是通过不断缩小搜索范围,每次将问题规模减半,时间复杂度为(O...

路径扫描 -- dirsearch(路径查找器怎么使用)

外表干净是尊重别人,内心干净是尊重自己,干净,在今天这个时代,应该是一种极高的赞美和珍贵。。。----网易云热评一、软件介绍Dirsearch是一种命令行工具,可以强制获取web服务器中的目录和文件...

78行Python代码帮你复现微信撤回消息!

来源:悟空智能科技本文约700字,建议阅读5分钟。本文基于python的微信开源库itchat,教你如何收集私聊撤回的信息。...

从零开始学习 Python!2《进阶知识》 Python进阶之路

欢迎来到Python学习的进阶篇章!如果你说已经掌握了基础语法,那么这篇就是你开启高手之路的大门。我们将一起探讨面向对象编程...

白帽黑客如何通过dirsearch脚本工具扫描和收集网站敏感文件

一、背景介绍...

Python之txt数据预定替换word预定义定位标记生成word报告(四)

续接Python之txt数据预定替换word预定义定位标记生成word报告(一)https://mp.toutiao.com/profile_v4/graphic/preview?pgc_id=748...

假期苦短,我用Python!这有个自动回复拜年信息的小程序

...

Python——字符串和正则表达式中的反斜杠(&#39;\&#39;)问题详解

在本篇文章里小编给大家整理的是关于Python字符串和正则表达式中的反斜杠('\')问题以及相关知识点,有需要的朋友们可以学习下。在Python普通字符串中在Python中,我们用'\'来转义某些普通...

Python re模块:正则表达式综合指南

Python...

Python中re模块详解(rem python)

在《...

python之re模块(python re模块sub)

re模块一.re模块的介绍1.什么是正则表达式"定义:正则表达式是一种对字符和特殊字符操作的一种逻辑公式,从特定的字符中,用正则表达字符来过滤的逻辑。(也是一种文本模式;)2、正则表达式可以帮助我们...

MySQL、PostgreSQL、SQL Server 数据库导入导出实操全解

在数字化时代,数据是关键资产,数据库的导入导出操作则是连接数据与应用场景的桥梁。以下是常见数据库导入导出的实用方法及代码,包含更多细节和特殊情况处理,助你应对各种实际场景。一、MySQL数据库...

Zabbix监控系统系列之六:监控 mysql

zabbix监控mysql1、监控规划在创建监控项之前要尽量考虑清楚要监控什么,怎么监控,监控数据如何存储,监控数据如何展现,如何处理报警等。要进行监控的系统规划需要对Zabbix很了解,这里只是...

mysql系列之一文详解Navicat工具的使用(二)

本章内容是系列内容的第二部分,主要介绍Navicat工具的使用。若查看第一部分请见:...

取消回复欢迎 发表评论: