运维效能提升:实战中的Shell与Python自动化脚本分享
运维效能提升:实战中的Shell与Python自动化脚本分享
在现代IT运维体系中,面对动辄成百上千的服务器节点与错综复杂的业务架构,传统的“手工敲命令”模式早已无法满足高效、稳定、安全的要求。自动化不仅是运维工程师的“救命稻草”,更是企业降本增效的核心驱动力。
本文将结合日常运维痛点,分享几款在实际生产环境中久经考验的Shell与Python自动化脚本,涵盖主机巡检、日志清理、服务监控与数据备份等高频场景,希望能为各位同行的工具箱增添利器。
一、 Shell脚本:轻量级系统操作利器
Shell脚本凭借其与Linux系统底层的无缝结合,是处理文件操作、系统巡检等任务的首选。
1. 批量主机存活检测与系统信息巡检
在云原生或混合云架构中,节点动态伸缩,手动维护资产清单极易遗漏。此脚本通过读取IP列表,并发探测主机存活状态,并对存活主机提取基础系统信息。
#!/bin/bash
# 描述:批量主机巡检脚本
# 依赖:需配置SSH免密登录,ip_list.txt存放目标IP
IP_LIST="ip_list.txt"
PORT=22
USER="ops_user"
LOG_FILE="host_audit_$(date +%F).log"
echo "========== 开始巡检 $(date) ==========" > $LOG_FILE
while read -r IP; do
# 1. 探测端口判断存活
if nc -z -w 2 $IP $PORT &> /dev/null; then
echo "[INFO] $IP 存活,开始采集信息..." | tee -a $LOG_FILE
# 2. 远程执行命令获取CPU、内存及磁盘信息
SYS_INFO=$(ssh -o ConnectTimeout=5 $USER@$IP "
echo \"CPU负载: $(cat /proc/loadavg | awk '{print $1}')\"
echo \"内存使用率: $(free -m | awk 'NR==2{printf \"%.2f%%\", $3*100/$2}')\"
echo \"磁盘利用率: $(df -h / | awk 'NR==2{print $5}')\"
")
echo -e "$IP 状态:\n$SYS_INFO" >> $LOG_FILE
else
echo "[WARN] $IP 无法连接,请检查网络或主机状态!" | tee -a $LOG_FILE
fi
done < $IP_LIST
echo "========== 巡检结束,详情见 $LOG_FILE =========="
2. 智能化日志清理脚本
磁盘报警是运维常见的“惊吓”。全量删除日志可能掩盖问题,仅按时间删除又可能忽略单文件过大的情况。此脚本采用“时间+体积”双重维度进行智能清理。
#!/bin/bash
# 描述:智能清理旧日志,保留近期小体积日志,清理大体积或久远日志
LOG_DIR="/var/log/app"
DAYS_KEEP=7
SIZE_LIMIT_MB=500 # 单文件超过500MB即截断
find $LOG_DIR -type f -name "*.log" | while read -r LOG_FILE; do
# 获取文件修改时间(天)和大小(MB)
FILE_AGE=$(( ($(date +%s) - $(stat -c %Y "$LOG_FILE")) / 86400 ))
FILE_SIZE=$(( $(stat -c %s "$LOG_FILE") / 1024 / 1024 ))
# 策略一:超过保留天数的直接删除
if [ $FILE_AGE -gt $DAYS_KEEP ]; then
echo "删除过期日志: $LOG_FILE (${FILE_AGE}天)"
rm -f "$LOG_FILE"
# 策略二:未过期但体积超限的,清空内容(保留文件句柄,防止应用写入报错)
elif [ $FILE_SIZE -gt $SIZE_LIMIT_MB ]; then
echo "截断超大日志: $LOG_FILE (${FILE_SIZE}MB)"
> "$LOG_FILE"
fi
done
二、 Python脚本:复杂业务与API交互的王者
当涉及API调用、复杂数据结构处理或跨平台操作时,Python的丰富库生态使其远胜于Shell。
1. API服务健康检查与钉钉告警
微服务架构下,服务存活不仅看端口,更看API业务逻辑是否正常。此脚本对核心API进行探测,异常时通过Webhook自动推送告警,实现闭环。
import requests
import json
import time
from datetime import datetime
# 配置项
CHECK_URLS = [
{"name": "用户中心API", "url": "https://api.example.com/users/health", "expect_code": 200},
{"name": "订单服务API", "url": "https://api.example.com/orders/health", "expect_code": 200}
]
DINGTALK_WEBHOOK = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
def send_alert(service_name, error_msg):
"""发送钉钉告警"""
headers = {'Content-Type': 'application/json'}
payload = {
"msgtype": "markdown",
"markdown": {
"title": "服务异常告警",
"text": f"### ⚠️ 服务异常告警\n> **服务名**: {service_name}\n> **异常详情**: {error_msg}\n> **时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
}
}
try:
requests.post(DINGTALK_WEBHOOK, headers=headers, data=json.dumps(payload), timeout=5)
except Exception as e:
print(f"告警发送失败: {e}")
def check_services():
"""执行健康检查"""
for svc in CHECK_URLS:
try:
# 设置超时防止脚本挂起
resp = requests.get(svc["url"], timeout=10)
if resp.status_code != svc["expect_code"]:
raise Exception(f"状态码异常(期望:{svc['expect_code']}, 实际:{resp.status_code})")
print(f"[OK] {svc['name']} - 状态正常")
except Exception as e:
error_detail = str(e)
print(f"[FAIL] {svc['name']} - {error_detail}")
send_alert(svc['name'], error_detail)
if __name__ == "__main__":
check_services()
2. MySQL定时逻辑备份与云端上传
物理备份(如Xtrabackup)适合大数据量,但对于中小型业务库,逻辑备份(mysqldump)配合压缩与云存储,是最轻量且高效的容灾方案。
import os
import subprocess
import boto3 # 需安装boto3库:pip install boto3
from datetime import datetime
# 数据库与云存储配置
DB_HOST = "db.internal.example.com"
DB_USER = "backup_user"
DB_PASS = os.getenv("DB_PASSWORD") # 安全实践:从环境变量读取密码
DB_NAME = "core_business"
S3_BUCKET = "my-backup-bucket"
def backup_and_upload():
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
dump_file = f"/tmp/{DB_NAME}_{timestamp}.sql"
gzip_file = f"{dump_file}.gz"
# 1. 执行mysqldump并压缩
print(f"开始导出数据库 {DB_NAME}...")
dump_cmd = f"mysqldump -h{DB_HOST} -u{DB_USER} -p{DB_PASS} {DB_NAME} | gzip > {gzip_file}"
try:
subprocess.run(dump_cmd, shell=True, check=True, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
print(f"数据库导出失败: {e.stderr.decode()}")
return
# 2. 上传至S3兼容存储(如AWS S3、MinIO等)
print("开始上传至云存储...")
try:
s3 = boto3.client('s3')
s3_key = f"db_backups/{os.path.basename(gzip_file)}"
s3.upload_file(gzip_file, S3_BUCKET, s3_key)
print(f"上传成功: s3://{S3_BUCKET}/{s3_key}")
except Exception as e:
print(f"云存储上传失败: {e}")
finally:
# 3. 清理本地临时文件
os.remove(gzip_file)
print("本地临时文件已清理")
if __name__ == "__main__":
backup_and_upload()