2026年运维实战:提升十倍效率的自动化脚本分享
2026年运维实战:提升十倍效率的自动化脚本分享
步入2026年,随着云原生架构与AIOps的深度融合,运维工作的复杂度呈指数级上升。尽管各类商业化运维平台层出不穷,但在日常的运维战壕中,轻量、灵活的Shell与Python脚本依然是工程师们最趁手的“瑞士军刀”。将重复性劳动自动化,不仅是对个人时间的解放,更是降低人为配置错误的关键。本文将分享三个在2026年运维实战中高频使用的自动化脚本,涵盖巡检、监控与安全防护。
一、 Shell脚本:多维度系统健康巡检一键生成
每天早晨面对几十甚至上百台服务器的巡检,如果逐台登录查看,不仅耗时且极易遗漏。以下Shell脚本可通过Cron定时任务在每天凌晨自动执行,将CPU、内存、磁盘及僵尸进程等关键指标汇总生成结构化报告,并通过邮件或企业微信发送。
#!/bin/bash
# 2026系统健康巡检脚本
# 定义阈值
CPU_THRESHOLD=80
MEM_THRESHOLD=85
DISK_THRESHOLD=90
REPORT_FILE="/var/log/daily_health_$(date +%Y%m%d).txt"
echo "===== 2026年系统健康巡检报告 =====" > $REPORT_FILE
echo "巡检时间: $(date '+%Y-%m-%d %H:%M:%S')" >> $REPORT_FILE
echo "主机名: $(hostname)" >> $REPORT_FILE
echo "==================================" >> $REPORT_FILE
# 1. CPU负载检查
CPU_LOAD=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}')
CPU_INT=${CPU_LOAD%.*}
if [ "$CPU_INT" -gt "$CPU_THRESHOLD" ]; then
echo "[警告] CPU使用率: ${CPU_LOAD}% (超过阈值${CPU_THRESHOLD}%)" >> $REPORT_FILE
else
echo "[正常] CPU使用率: ${CPU_LOAD}%" >> $REPORT_FILE
fi
# 2. 内存使用检查
MEM_USAGE=$(free -m | awk 'NR==2{printf "%.2f", $3/$2*100}')
MEM_INT=${MEM_USAGE%.*}
if [ "$MEM_INT" -gt "$MEM_THRESHOLD" ]; then
echo "[警告] 内存使用率: ${MEM_USAGE}% (超过阈值${MEM_THRESHOLD}%)" >> $REPORT_FILE
else
echo "[正常] 内存使用率: ${MEM_USAGE}%" >> $REPORT_FILE
fi
# 3. 磁盘空间检查
df -h | grep -vE '^Filesystem|tmpfs|cdrom' | awk '{ print $5 " " $1 }' | while read output;
do
usage=$(echo $output | awk '{ print $1}' | cut -d'%' -f1 )
partition=$(echo $output | awk '{ print $2 }' )
if [ "$usage" -ge "$DISK_THRESHOLD" ]; then
echo "[警告] 分区 $partition 磁盘使用率: ${usage}%" >> $REPORT_FILE
fi
done
# 4. 僵尸进程检查
ZOMBIE_COUNT=$(ps aux | grep 'Z' | grep -v grep | wc -l)
if [ "$ZOMBIE_COUNT" -gt 0 ]; then
echo "[警告] 发现僵尸进程数量: $ZOMBIE_COUNT" >> $REPORT_FILE
else
echo "[正常] 未发现僵尸进程" >> $REPORT_FILE
fi
# 可在此处追加发送邮件或Webhook的逻辑
# send_webhook $REPORT_FILE
二、 Python脚本:智能日志异常检测与告警收敛
在微服务时代,日志爆炸是常态。传统的grep加定时任务往往会导致“告警风暴”。2026年的监控更强调“收敛”与“智能提取”。以下Python脚本能够统计错误日志出现频次,当同一类错误在设定时间窗口内频繁出现时,才触发一次告警,有效避免告警疲劳。
#!/usr/bin/env python3
# 2026智能日志巡检与告警收敛脚本
import re
from collections import defaultdict
import requests
import time
LOG_FILE = "/var/log/app/application.log"
WEBHOOK_URL = "https://qyapi.example.com/cgi-bin/webhook/send?key=YOUR_KEY"
TIME_WINDOW = 300 # 5分钟时间窗口
THRESHOLD = 10 # 同类错误超过10次才告警
error_pattern = re.compile(r'\[(ERROR|CRITICAL)\]\s+(.*?)\s+-')
error_counter = defaultdict(int)
last_alert_time = defaultdict(float)
def send_alert(error_msg, count):
"""发送企业微信/钉钉告警"""
msg = f"⚠️ 2026日志告警收敛\n近5分钟内 [{error_msg}] 出现 {count} 次,请排查!"
payload = {"msgtype": "text", "text": {"content": msg}}
try:
requests.post(WEBHOOK_URL, json=payload, timeout=5)
except Exception as e:
print(f"Webhook发送失败: {e}")
def monitor_log():
# 模拟实时读取日志尾部(生产环境建议使用watchdog或pyinotify)
with open(LOG_FILE, 'r') as f:
f.seek(0, 2) # 跳到文件末尾
while True:
line = f.readline()
if not line:
time.sleep(0.5)
continue
match = error_pattern.search(line)
if match:
error_type = match.group(2).strip()
current_time = time.time()
# 检查是否在时间窗口外,若在窗口外则重置计数
if current_time - last_alert_time[error_type] > TIME_WINDOW:
error_counter[error_type] = 0
last_alert_time[error_type] = current_time
error_counter[error_type] += 1
# 达到阈值触发告警并重置
if error_counter[error_type] >= THRESHOLD:
send_alert(error_type, error_counter[error_type])
error_counter[error_type] = 0
last_alert_time[error_type] = current_time
if __name__ == "__main__":
monitor_log()
三、 Python脚本:云原生环境SSL证书到期巡检
2026年,HTTPS已成为所有内外网服务的强制标准。证书过期导致的业务中断依然是运维的“低级却致命”的失误。针对K8s Ingress或大量微服务域名,以下脚本可批量扫描证书有效期,并在到期前14天发出预警。
#!/usr/bin/env python3
# 2026 SSL证书到期巡检脚本
import ssl
import socket
from datetime import datetime, timedelta
import smtplib
from email.message import EmailMessage
DOMAINS = ["api.company.com", "portal.company.com", "k8s-ingress.internal.com"]
ALERT_DAYS = 14
def check_ssl_expiry(domain, port=443):
context = ssl.create_default_context()
conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=domain)
conn.settimeout(5.0)
try:
conn.connect((domain, port))
ssl_info = conn.getpeercert()
expire_date = datetime.strptime(ssl_info['notAfter'], '%b %d %H:%M:%S %Y %Z')
remaining_days = (expire_date - datetime.now()).days
return remaining_days, expire_date
except Exception as e:
return -1, str(e)
finally:
conn.close()
def send_email(domain, days, expire_date):
# 邮件通知逻辑省略,可按需接入SMTP
print(f"告警: {domain} 证书将在 {days} 天后到期(到期日: {expire_date})")
if __name__ == "__main__":
print("===== 2026 SSL证书巡检 =====")
for domain in DOMAINS:
days, expire_info = check_ssl_expiry(domain)
if days == -1:
print(f"[异常] {domain} 连接失败: {expire_info}")
elif days <= ALERT_DAYS:
send_email(domain, days, expire_info)
else:
print(f"[正常] {domain