运维效能提升:实战中的Shell与Python自动化脚本分享

在现代IT运维体系中,面对动辄成百上千的服务器节点与错综复杂的业务架构,传统的“手工敲命令”模式早已无法满足高效、稳定、安全的要求。自动化不仅是运维工程师的“救命稻草”,更是企业降本增效的核心驱动力。

本文将结合日常运维痛点,分享几款在实际生产环境中久经考验的Shell与Python自动化脚本,涵盖主机巡检、日志清理、服务监控与数据备份等高频场景,希望能为各位同行的工具箱增添利器。

一、 Shell脚本:轻量级系统操作利器

Shell脚本凭借其与Linux系统底层的无缝结合,是处理文件操作、系统巡检等任务的首选。

1. 批量主机存活检测与系统信息巡检

在云原生或混合云架构中,节点动态伸缩,手动维护资产清单极易遗漏。此脚本通过读取IP列表,并发探测主机存活状态,并对存活主机提取基础系统信息。


#!/bin/bash
# 描述:批量主机巡检脚本
# 依赖:需配置SSH免密登录,ip_list.txt存放目标IP

IP_LIST="ip_list.txt"
PORT=22
USER="ops_user"
LOG_FILE="host_audit_$(date +%F).log"

echo "========== 开始巡检 $(date) ==========" > $LOG_FILE

while read -r IP; do
    # 1. 探测端口判断存活
    if nc -z -w 2 $IP $PORT &> /dev/null; then
        echo "[INFO] $IP 存活,开始采集信息..." | tee -a $LOG_FILE
        
        # 2. 远程执行命令获取CPU、内存及磁盘信息
        SYS_INFO=$(ssh -o ConnectTimeout=5 $USER@$IP "
            echo \"CPU负载: $(cat /proc/loadavg | awk '{print $1}')\"
            echo \"内存使用率: $(free -m | awk 'NR==2{printf \"%.2f%%\", $3*100/$2}')\"
            echo \"磁盘利用率: $(df -h / | awk 'NR==2{print $5}')\"
        ")
        echo -e "$IP 状态:\n$SYS_INFO" >> $LOG_FILE
    else
        echo "[WARN] $IP 无法连接,请检查网络或主机状态!" | tee -a $LOG_FILE
    fi
done < $IP_LIST

echo "========== 巡检结束,详情见 $LOG_FILE =========="

2. 智能化日志清理脚本

磁盘报警是运维常见的“惊吓”。全量删除日志可能掩盖问题,仅按时间删除又可能忽略单文件过大的情况。此脚本采用“时间+体积”双重维度进行智能清理。


#!/bin/bash
# 描述:智能清理旧日志,保留近期小体积日志,清理大体积或久远日志

LOG_DIR="/var/log/app"
DAYS_KEEP=7
SIZE_LIMIT_MB=500 # 单文件超过500MB即截断

find $LOG_DIR -type f -name "*.log" | while read -r LOG_FILE; do
    # 获取文件修改时间(天)和大小(MB)
    FILE_AGE=$(( ($(date +%s) - $(stat -c %Y "$LOG_FILE")) / 86400 ))
    FILE_SIZE=$(( $(stat -c %s "$LOG_FILE") / 1024 / 1024 ))

    # 策略一:超过保留天数的直接删除
    if [ $FILE_AGE -gt $DAYS_KEEP ]; then
        echo "删除过期日志: $LOG_FILE (${FILE_AGE}天)"
        rm -f "$LOG_FILE"
    # 策略二:未过期但体积超限的,清空内容(保留文件句柄,防止应用写入报错)
    elif [ $FILE_SIZE -gt $SIZE_LIMIT_MB ]; then
        echo "截断超大日志: $LOG_FILE (${FILE_SIZE}MB)"
        > "$LOG_FILE"
    fi
done

二、 Python脚本:复杂业务与API交互的王者

当涉及API调用、复杂数据结构处理或跨平台操作时,Python的丰富库生态使其远胜于Shell。

1. API服务健康检查与钉钉告警

微服务架构下,服务存活不仅看端口,更看API业务逻辑是否正常。此脚本对核心API进行探测,异常时通过Webhook自动推送告警,实现闭环。


import requests
import json
import time
from datetime import datetime

# 配置项
CHECK_URLS = [
    {"name": "用户中心API", "url": "https://api.example.com/users/health", "expect_code": 200},
    {"name": "订单服务API", "url": "https://api.example.com/orders/health", "expect_code": 200}
]
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"

def send_alert(service_name, error_msg):
    """发送钉钉告警"""
    headers = {'Content-Type': 'application/json'}
    payload = {
        "msgtype": "markdown",
        "markdown": {
            "title": "服务异常告警",
            "text": f"### ⚠️ 服务异常告警\n> **服务名**: {service_name}\n> **异常详情**: {error_msg}\n> **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        }
    }
    try:
        requests.post(DINGTALK_WEBHOOK, headers=headers, data=json.dumps(payload), timeout=5)
    except Exception as e:
        print(f"告警发送失败: {e}")

def check_services():
    """执行健康检查"""
    for svc in CHECK_URLS:
        try:
            # 设置超时防止脚本挂起
            resp = requests.get(svc["url"], timeout=10)
            if resp.status_code != svc["expect_code"]:
                raise Exception(f"状态码异常(期望:{svc['expect_code']}, 实际:{resp.status_code})")
            print(f"[OK] {svc['name']} - 状态正常")
        except Exception as e:
            error_detail = str(e)
            print(f"[FAIL] {svc['name']} - {error_detail}")
            send_alert(svc['name'], error_detail)

if __name__ == "__main__":
    check_services()

2. MySQL定时逻辑备份与云端上传

物理备份(如Xtrabackup)适合大数据量,但对于中小型业务库,逻辑备份(mysqldump)配合压缩与云存储,是最轻量且高效的容灾方案。


import os
import subprocess
import boto3 # 需安装boto3库:pip install boto3
from datetime import datetime

# 数据库与云存储配置
DB_HOST = "db.internal.example.com"
DB_USER = "backup_user"
DB_PASS = os.getenv("DB_PASSWORD") # 安全实践:从环境变量读取密码
DB_NAME = "core_business"
S3_BUCKET = "my-backup-bucket"

def backup_and_upload():
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    dump_file = f"/tmp/{DB_NAME}_{timestamp}.sql"
    gzip_file = f"{dump_file}.gz"

    # 1. 执行mysqldump并压缩
    print(f"开始导出数据库 {DB_NAME}...")
    dump_cmd = f"mysqldump -h{DB_HOST} -u{DB_USER} -p{DB_PASS} {DB_NAME} | gzip > {gzip_file}"
    try:
        subprocess.run(dump_cmd, shell=True, check=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        print(f"数据库导出失败: {e.stderr.decode()}")
        return

    # 2. 上传至S3兼容存储(如AWS S3、MinIO等)
    print("开始上传至云存储...")
    try:
        s3 = boto3.client('s3')
        s3_key = f"db_backups/{os.path.basename(gzip_file)}"
        s3.upload_file(gzip_file, S3_BUCKET, s3_key)
        print(f"上传成功: s3://{S3_BUCKET}/{s3_key}")
    except Exception as e:
        print(f"云存储上传失败: {e}")
    finally:
        # 3. 清理本地临时文件
        os.remove(gzip_file)
        print("本地临时文件已清理")

if __name__ == "__main__":
    backup_and_upload()

三、 运维脚本编写规范与