2026年运维实战:提升十倍效率的自动化脚本分享

步入2026年,随着云原生架构与AIOps的深度融合,运维工作的复杂度呈指数级上升。尽管各类商业化运维平台层出不穷,但在日常的运维战壕中,轻量、灵活的Shell与Python脚本依然是工程师们最趁手的“瑞士军刀”。将重复性劳动自动化,不仅是对个人时间的解放,更是降低人为配置错误的关键。本文将分享三个在2026年运维实战中高频使用的自动化脚本,涵盖巡检、监控与安全防护。

一、 Shell脚本:多维度系统健康巡检一键生成

每天早晨面对几十甚至上百台服务器的巡检,如果逐台登录查看,不仅耗时且极易遗漏。以下Shell脚本可通过Cron定时任务在每天凌晨自动执行,将CPU、内存、磁盘及僵尸进程等关键指标汇总生成结构化报告,并通过邮件或企业微信发送。


#!/bin/bash
# 2026系统健康巡检脚本

# 定义阈值
CPU_THRESHOLD=80
MEM_THRESHOLD=85
DISK_THRESHOLD=90
REPORT_FILE="/var/log/daily_health_$(date +%Y%m%d).txt"

echo "===== 2026年系统健康巡检报告 =====" > $REPORT_FILE
echo "巡检时间: $(date '+%Y-%m-%d %H:%M:%S')" >> $REPORT_FILE
echo "主机名: $(hostname)" >> $REPORT_FILE
echo "==================================" >> $REPORT_FILE

# 1. CPU负载检查
CPU_LOAD=$(top -bn1 | grep "Cpu(s)" | awk '{print $2}')
CPU_INT=${CPU_LOAD%.*}
if [ "$CPU_INT" -gt "$CPU_THRESHOLD" ]; then
    echo "[警告] CPU使用率: ${CPU_LOAD}% (超过阈值${CPU_THRESHOLD}%)" >> $REPORT_FILE
else
    echo "[正常] CPU使用率: ${CPU_LOAD}%" >> $REPORT_FILE
fi

# 2. 内存使用检查
MEM_USAGE=$(free -m | awk 'NR==2{printf "%.2f", $3/$2*100}')
MEM_INT=${MEM_USAGE%.*}
if [ "$MEM_INT" -gt "$MEM_THRESHOLD" ]; then
    echo "[警告] 内存使用率: ${MEM_USAGE}% (超过阈值${MEM_THRESHOLD}%)" >> $REPORT_FILE
else
    echo "[正常] 内存使用率: ${MEM_USAGE}%" >> $REPORT_FILE
fi

# 3. 磁盘空间检查
df -h | grep -vE '^Filesystem|tmpfs|cdrom' | awk '{ print $5 " " $1 }' | while read output;
do
    usage=$(echo $output | awk '{ print $1}' | cut -d'%' -f1 )
    partition=$(echo $output | awk '{ print $2 }' )
    if [ "$usage" -ge "$DISK_THRESHOLD" ]; then
        echo "[警告] 分区 $partition 磁盘使用率: ${usage}%" >> $REPORT_FILE
    fi
done

# 4. 僵尸进程检查
ZOMBIE_COUNT=$(ps aux | grep 'Z' | grep -v grep | wc -l)
if [ "$ZOMBIE_COUNT" -gt 0 ]; then
    echo "[警告] 发现僵尸进程数量: $ZOMBIE_COUNT" >> $REPORT_FILE
else
    echo "[正常] 未发现僵尸进程" >> $REPORT_FILE
fi

# 可在此处追加发送邮件或Webhook的逻辑
# send_webhook $REPORT_FILE

二、 Python脚本:智能日志异常检测与告警收敛

在微服务时代,日志爆炸是常态。传统的grep加定时任务往往会导致“告警风暴”。2026年的监控更强调“收敛”与“智能提取”。以下Python脚本能够统计错误日志出现频次,当同一类错误在设定时间窗口内频繁出现时,才触发一次告警,有效避免告警疲劳。


#!/usr/bin/env python3
# 2026智能日志巡检与告警收敛脚本

import re
from collections import defaultdict
import requests
import time

LOG_FILE = "/var/log/app/application.log"
WEBHOOK_URL = "https://qyapi.example.com/cgi-bin/webhook/send?key=YOUR_KEY"
TIME_WINDOW = 300  # 5分钟时间窗口
THRESHOLD = 10     # 同类错误超过10次才告警

error_pattern = re.compile(r'\[(ERROR|CRITICAL)\]\s+(.*?)\s+-')
error_counter = defaultdict(int)
last_alert_time = defaultdict(float)

def send_alert(error_msg, count):
    """发送企业微信/钉钉告警"""
    msg = f"⚠️ 2026日志告警收敛\n近5分钟内 [{error_msg}] 出现 {count} 次,请排查!"
    payload = {"msgtype": "text", "text": {"content": msg}}
    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f"Webhook发送失败: {e}")

def monitor_log():
    # 模拟实时读取日志尾部(生产环境建议使用watchdog或pyinotify)
    with open(LOG_FILE, 'r') as f:
        f.seek(0, 2) # 跳到文件末尾
        while True:
            line = f.readline()
            if not line:
                time.sleep(0.5)
                continue
            
            match = error_pattern.search(line)
            if match:
                error_type = match.group(2).strip()
                current_time = time.time()
                
                # 检查是否在时间窗口外,若在窗口外则重置计数
                if current_time - last_alert_time[error_type] > TIME_WINDOW:
                    error_counter[error_type] = 0
                    last_alert_time[error_type] = current_time
                
                error_counter[error_type] += 1
                
                # 达到阈值触发告警并重置
                if error_counter[error_type] >= THRESHOLD:
                    send_alert(error_type, error_counter[error_type])
                    error_counter[error_type] = 0
                    last_alert_time[error_type] = current_time

if __name__ == "__main__":
    monitor_log()

三、 Python脚本:云原生环境SSL证书到期巡检

2026年,HTTPS已成为所有内外网服务的强制标准。证书过期导致的业务中断依然是运维的“低级却致命”的失误。针对K8s Ingress或大量微服务域名,以下脚本可批量扫描证书有效期,并在到期前14天发出预警。


#!/usr/bin/env python3
# 2026 SSL证书到期巡检脚本

import ssl
import socket
from datetime import datetime, timedelta
import smtplib
from email.message import EmailMessage

DOMAINS = ["api.company.com", "portal.company.com", "k8s-ingress.internal.com"]
ALERT_DAYS = 14

def check_ssl_expiry(domain, port=443):
    context = ssl.create_default_context()
    conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=domain)
    conn.settimeout(5.0)
    
    try:
        conn.connect((domain, port))
        ssl_info = conn.getpeercert()
        expire_date = datetime.strptime(ssl_info['notAfter'], '%b %d %H:%M:%S %Y %Z')
        remaining_days = (expire_date - datetime.now()).days
        return remaining_days, expire_date
    except Exception as e:
        return -1, str(e)
    finally:
        conn.close()

def send_email(domain, days, expire_date):
    # 邮件通知逻辑省略,可按需接入SMTP
    print(f"告警: {domain} 证书将在 {days} 天后到期(到期日: {expire_date})")

if __name__ == "__main__":
    print("===== 2026 SSL证书巡检 =====")
    for domain in DOMAINS:
        days, expire_info = check_ssl_expiry(domain)
        if days == -1:
            print(f"[异常] {domain} 连接失败: {expire_info}")
        elif days <= ALERT_DAYS:
            send_email(domain, days, expire_info)
        else:
            print(f"[正常] {domain