【實用腳本】一鍵完成MySQL數據庫健康巡檢,并生成word報告
過程截圖:


說明:賦予執行權限,執行即可。
源碼文件:
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from __future__ import unicode_literals
import itertools
import math
import sys
import datetime
import argparse
import subprocess
import logging
import logging.handlers
import socket
import re
import time
from pathlib import Path
import pymysql
import datetime
import sys, getopt, os
import docx
from docx.shared import Cm
from docxtpl import DocxTemplate
import configparser
import importlib
import subprocess
import json
import hashlib
import base64
from datetime import datetime, timedelta
import platform
importlib.reload(sys)
# 獲取資源路徑函數 - 處理打包后的路徑問題
def get_resource_path(relative_path):
"""獲取資源的絕對路徑,處理打包后的路徑問題"""
try:
# PyInstaller創建臨時文件夾,將路徑存儲在_MEIPASS中
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
# 許可證管理類
class SimpleCrypto:
"""簡單的加密解密類,兼容Python 3.6以上"""
def __init__(self, secret="ODB_SECRET_2024"):
self.secret = secret.encode('utf-8')
def _xor_encrypt_decrypt(self, data, key):
"""使用XOR進行加密/解密"""
key_len = len(key)
result = bytearray()
for i, byte in enumerate(data):
result.append(byte ^ key[i % key_len])
return bytes(result)
def encrypt(self, text):
"""加密文本"""
data = text.encode('utf-8')
# 生成密鑰
key = hashlib.sha256(self.secret).digest()[:32]
# XOR加密
encrypted = self._xor_encrypt_decrypt(data, key)
# Base64編碼
return base64.b64encode(encrypted).decode('utf-8')
def decrypt(self, token):
"""解密文本"""
try:
# Base64解碼
encrypted = base64.b64decode(token.encode('utf-8'))
# 生成密鑰
key = hashlib.sha256(self.secret).digest()[:32]
# XOR解密
decrypted = self._xor_encrypt_decrypt(encrypted, key)
return decrypted.decode('utf-8')
except Exception as e:
raise ValueError(f"解密失敗: {e}")
class LicenseValidator:
"""嚴格的許可證驗證類 - 防止刪除文件重置試用期"""
def __init__(self):
self.license_file = "mysql_inspector.lic"
self.trial_days = 366
self.crypto = SimpleCrypto()
self._init_license_system()
def _parse_datetime(self, date_string):
"""解析日期字符串,兼容Python 3.6"""
try:
# 嘗試Python 3.7+的fromisoformat
return datetime.fromisoformat(date_string)
except AttributeError:
# Python 3.6兼容:手動解析ISO格式
try:
# 格式: 2024-01-01T10:30:00
if 'T' in date_string:
date_part, time_part = date_string.split('T')
year, month, day = map(int, date_part.split('-'))
time_parts = time_part.split(':')
hour, minute = int(time_parts[0]), int(time_parts[1])
second = int(time_parts[2].split('.')[0]) if len(time_parts) > 2 else 0
return datetime(year, month, day, hour, minute, second)
else:
# 格式: 2024-01-01 10:30:00
date_part, time_part = date_string.split(' ')
year, month, day = map(int, date_part.split('-'))
hour, minute, second = map(int, time_part.split(':'))
return datetime(year, month, day, hour, minute, second)
except Exception as e:
print(f"日期解析錯誤: {e}")
return datetime.now()
def _format_datetime(self, dt):
"""格式化日期為字符串,兼容Python 3.6"""
return dt.strftime('%Y-%m-%dT%H:%M:%S')
def _init_license_system(self):
"""初始化許可證系統"""
# 創建必要的目錄和文件
if not os.path.exists(self.license_file):
self._create_trial_license()
def _create_trial_license(self):
"""創建試用許可證"""
create_time = datetime.now()
expire_time = create_time + timedelta(days=self.trial_days)
license_data = {
"type": "TRIAL",
"create_time": self._format_datetime(create_time),
"expire_time": self._format_datetime(expire_time),
"machine_id": self._get_machine_id(),
"signature": self._generate_signature("TRIAL")
}
encrypted_data = self.crypto.encrypt(json.dumps(license_data))
with open(self.license_file, 'w') as f:
f.write(encrypted_data)
def _get_machine_id(self):
"""獲取機器標識(簡化版)"""
try:
machine_info = f"{platform.node()}-{platform.system()}-{platform.release()}"
return hashlib.md5(machine_info.encode()).hexdigest()[:16]
except:
return "unknown_machine"
def _generate_signature(self, license_type):
"""生成許可證簽名"""
key = "ODB2024TRL" # 試用版密鑰
signature_data = f"{license_type}-{datetime.now().strftime('%Y%m%d')}-{key}"
return hashlib.sha256(signature_data.encode()).hexdigest()
def _verify_signature(self, license_data):
"""驗證許可證簽名"""
try:
expected_signature = self._generate_signature(license_data["type"])
return license_data["signature"] == expected_signature
except:
return False
def validate_license(self):
"""驗證許可證有效性"""
try:
# 檢查許可證文件
if not os.path.exists(self.license_file):
return False, "許可證文件不存在", 0
# 讀取并解密許可證
with open(self.license_file, 'r') as f:
encrypted_data = f.read().strip()
decrypted_data = self.crypto.decrypt(encrypted_data)
license_data = json.loads(decrypted_data)
# 驗證簽名
if not self._verify_signature(license_data):
return False, "許可證簽名無效", 0
# 檢查過期時間
expire_time = self._parse_datetime(license_data["expire_time"])
remaining_days = (expire_time - datetime.now()).days
if remaining_days < 0:
return False, "許可證已過期", 0
license_type = license_data.get("type", "TRIAL")
if license_type == "TRIAL":
return True, f"試用版許可證有效,剩余 {remaining_days} 天", remaining_days
else:
return True, f"{license_type}版許可證有效", remaining_days
except Exception as e:
return False, f"許可證驗證失敗: {str(e)}", 0
# 臨時替代自定義logger
def getlogger():
logger = logging.getLogger('mysql_check')
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
logger = getlogger()
# 輔助類:傳遞腳本輸入的參數
class passArgu(object):
"""輔助類,腳本傳參
查看幫助:python3 main.py -h"""
def get_argus(self):
""" all_info: 接收所有傳入的信息 """
all_info = argparse.ArgumentParser(
description="--example: python3 mysql_autoDOC.py -C templates/sqltemplates.ini -L '標簽名稱'")
all_info.add_argument('-C', '--sqltemplates', required=False, default='templates/sqltemplates.ini',
help='SQL sqltemplates.')
all_info.add_argument('-L', '--label', required=False, help='Label used when health check single database.')
all_info.add_argument('-B', '--batch', action='store_true', help='Batch mode (use interactive input for multiple DBs)')
all_para = all_info.parse_args()
" 返回值默認string, 不區分前后順序 "
return all_para
# 新增:交互式輸入數據庫連接信息
def input_db_info():
"""交互式輸入數據庫連接信息,支持默認值"""
print("\n請輸入數據庫連接信息:")
# 主機地址,默認localhost
host = input("主機地址 [localhost]: ").strip()
if not host:
host = "localhost"
# 端口,默認3306
port_input = input("端口 [3306]: ").strip()
if not port_input:
port = 3306
else:
try:
port = int(port_input)
except ValueError:
print("?? 端口輸入無效,使用默認值3306")
port = 3306
# 用戶名,默認root
user = input("用戶名 [root]: ").strip()
if not user:
user = "root"
# 密碼,無默認值
import getpass
password = getpass.getpass("密碼: ").strip()
# 數據庫名稱/標簽
db_name = input("數據庫名稱(用于報告標識) [MySQL_Server]: ").strip()
if not db_name:
db_name = "MySQL_Server"
# 驗證連接
print(f"\n?? 正在驗證連接 {host}:{port}...")
try:
conn = pymysql.connect(
host=host,
port=port,
user=user,
password=password,
charset='utf8mb4',
connect_timeout=5
)
conn.close()
print(f"? 成功連接到 {host}:{port}")
return {
'name': db_name,
'ip': host,
'port': port,
'user': user,
'password': password
}
except Exception as e:
print(f"? 連接失敗: {e}")
retry = input("是否重新輸入? (y/n) [n]: ").strip().lower()
if retry == 'y':
return input_db_info()
else:
sys.exit(1)
# 新增:批量輸入數據庫信息
def input_batch_db_info():
"""批量輸入多個數據庫連接信息"""
db_list = []
print("\n=== 批量數據庫輸入模式 ===")
print("請依次輸入每個數據庫的連接信息")
print("輸入完成后直接按回車確認")
while True:
print(f"\n--- 數據庫 {len(db_list) + 1} ---")
db_info = input_db_info()
db_list.append(db_info)
continue_input = input("\n是否繼續添加其他數據庫? (y/n) [n]: ").strip().lower()
if continue_input != 'y':
break
if not db_list:
print("? 未輸入任何數據庫信息")
sys.exit(1)
return db_list
# 獲取巡檢指標數據的類
class getData(object):
def __init__(self, ip, port, user, password):
infos = passArgu().get_argus()
self.label = str(infos.label)
self.H = ip
self.P = int(port)
self.user = user
self.password = password
# 默認連接串
try:
conn_db2 = pymysql.connect(host=self.H, port=self.P, user=self.user, password=self.password, charset='utf8mb4')
self.conn_db2 = conn_db2
except Exception as e:
print(f"? 數據庫連接失敗: {e}")
sys.exit(1)
# 創建一個空字典,存儲數據庫巡檢數據
context = {}
self.context = context
def print_progress_bar(self, iteration, total, prefix='', suffix='', decimals=1, length=50, fill='█'):
"""打印進度條"""
percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))
filled_length = int(length * iteration // total)
bar = fill * filled_length + '-' * (length - filled_length)
print(f'\r{prefix} |{bar}| {percent}% {suffix}', end='\r')
if iteration == total:
print()
def checkdb(self, sqlfile=''):
print("\n開始巡檢...")
# 模擬進度條
total_steps = 15
current_step = 0
# 1、通過pymysql獲取mysql數據庫指標信息
cfg = configparser.RawConfigParser()
# 使用絕對路徑讀取配置文件
try:
cfg.read(sqlfile, encoding='utf-8')
except Exception as e:
print(f"? 讀取SQL模板文件失敗: {e}")
return self.context
# 初始化上下文
init_keys = ["mdlinfo", "mgrinfo", "sql5min", "innodb_trx", "db_size", "userinfo",
"tbtop10", "idxtop10", "nopk", "obnum", "noinnodb", "indexnum5",
"indexcolnum", "colnum50", "iotop10", "memtop10", "sqltop10",
"fullscantop10", "fullscantbtop10", "tmpsqltop10", "rowsdmltop30",
"unuseidx", "incrtop10", "redundantidx"]
for key in init_keys:
self.context.update({key: []})
# 獲取數據庫版本信息
try:
cursor_ver = self.conn_db2.cursor()
cursor_ver.execute("SELECT VERSION()")
version_result = cursor_ver.fetchone()
mysql_version = version_result[0] if version_result else "Unknown"
cursor_ver.close()
# 存儲版本信息
self.context.update({"myversion": [{'version': mysql_version}]})
# 添加health_summary
self.context.update({"health_summary": [{'health_summary': '運行良好'}]})
except Exception as e:
print(f"? 獲取版本信息失敗: {e}")
self.context.update({"myversion": [{'version': 'Unknown'}]})
self.context.update({"health_summary": [{'health_summary': '運行良好'}]})
try:
# 初始化一個數據庫連接
cursor2 = self.conn_db2.cursor()
# 先獲取當前sql_mode(用于臨時調整)
cursor2.execute("SELECT @@sql_mode")
original_sql_mode = cursor2.fetchone()[0]
# 移除ONLY_FULL_GROUP_BY的臨時模式
temp_sql_mode = original_sql_mode.replace('ONLY_FULL_GROUP_BY', '').strip()
# 處理可能的多余逗號
temp_sql_mode = re.sub(r',+', ',', temp_sql_mode).strip(',')
# 遍歷執行數據庫sql,開始巡檢
for i, (name, stmt) in enumerate(cfg.items("variables")):
try:
current_step = int((i / len(cfg.items("variables"))) * total_steps)
self.print_progress_bar(current_step, total_steps, prefix='巡檢進度:', suffix=f'步驟 {i+1}/{len(cfg.items("variables"))}')
# 針對sqltop10臨時調整sql_mode
if name == "sqltop10":
cursor2.execute(f"SET sql_mode = '{temp_sql_mode}'")
# 執行查詢
cursor2.execute(stmt.replace('\n', ' ').replace('\r', ' '))
result = [dict((cursor2.description[i][0], value) for i, value in enumerate(row)) for row in cursor2.fetchall()]
self.context[name] = result
# 執行完sqltop10后恢復原始sql_mode
if name == "sqltop10":
cursor2.execute(f"SET sql_mode = '{original_sql_mode}'")
time.sleep(0.05) # 為了顯示進度效果
except Exception as e:
print(f"\n?? 步驟 {name} 執行失敗: {e}")
self.context[name] = []
# 若sqltop10失敗,仍嘗試恢復sql_mode
if name == "sqltop10":
try:
cursor2.execute(f"SET sql_mode = '{original_sql_mode}'")
except:
pass
except Exception as e:
print(f'\n? 數據庫查詢失敗: {e}')
finally:
cursor2.close()
# 查看innodb詳細信息
current_step = total_steps - 2
self.print_progress_bar(current_step, total_steps, prefix='巡檢進度:', suffix='獲取InnoDB狀態')
try:
cursor3 = self.conn_db2.cursor()
cursor3.execute('show engine innodb status')
innodbinfo = cursor3.fetchall()
cursor3.close()
self.context.update({"innodbinfo": [{'innodbinfo': innodbinfo[0][2]}]})
except Exception as e:
print(f"\n? 獲取InnoDB狀態失敗: {e}")
self.context.update({"innodbinfo": [{'innodbinfo': '獲取失敗'}]})
# 2、風險與建議
current_step = total_steps - 1
self.print_progress_bar(current_step, total_steps, prefix='巡檢進度:', suffix='分析風險和建議')
self.context.update({"auto_analyze": []})
# 簡化的風險分析邏輯
try:
# 檢查無主鍵表
cursor_pk = self.conn_db2.cursor()
cursor_pk.execute("""
SELECT t.TABLE_SCHEMA, t.TABLE_NAME
FROM information_schema.TABLES t
LEFT JOIN information_schema.TABLE_CONSTRAINTS tc
ON t.TABLE_SCHEMA = tc.TABLE_SCHEMA
AND t.TABLE_NAME = tc.TABLE_NAME
AND tc.CONSTRAINT_TYPE = 'PRIMARY KEY'
WHERE t.TABLE_SCHEMA NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')
AND tc.CONSTRAINT_NAME IS NULL
AND t.TABLE_TYPE = 'BASE TABLE'
""")
no_pk_tables = cursor_pk.fetchall()
cursor_pk.close()
if no_pk_tables:
self.context['auto_analyze'].append({
'col1': "無主鍵表檢查",
"col2": "中風險",
"col3": f"發現 {len(no_pk_tables)} 個無主鍵表,影響性能和數據完整性",
"col4": "中",
"col5": "開發"
})
# 檢查Buffer Pool命中率
cursor_bp = self.conn_db2.cursor()
cursor_bp.execute("SHOW GLOBAL STATUS LIKE 'Innodb_buffer_pool_read%'")
bp_stats = cursor_bp.fetchall()
cursor_bp.close()
bp_stats_dict = {item[0]: item[1] for item in bp_stats}
if 'Innodb_buffer_pool_read_requests' in bp_stats_dict and 'Innodb_buffer_pool_reads' in bp_stats_dict:
read_requests = int(bp_stats_dict['Innodb_buffer_pool_read_requests'])
reads = int(bp_stats_dict['Innodb_buffer_pool_reads'])
if read_requests > 0:
hit_ratio = (1 - reads / read_requests) * 100
if hit_ratio < 90:
self.context['auto_analyze'].append({
'col1': "Buffer Pool命中率",
"col2": "低風險",
"col3": f"InnoDB Buffer Pool命中率偏低: {hit_ratio:.2f}%",
"col4": "低",
"col5": "DBA"
})
# 檢查root用戶遠程登錄
cursor_user = self.conn_db2.cursor()
cursor_user.execute("SELECT host, user FROM mysql.user WHERE user='root' AND host NOT IN ('localhost', '127.0.0.1', '::1')")
remote_root = cursor_user.fetchall()
cursor_user.close()
if remote_root:
self.context['auto_analyze'].append({
'col1': "Root用戶遠程訪問",
"col2": "高風險",
"col3": "root用戶允許遠程登錄,存在安全風險",
"col4": "高",
"col5": "DBA"
})
except Exception as e:
print(f"\n? 風險分析失敗: {e}")
# 完成進度條
self.print_progress_bar(total_steps, total_steps, prefix='巡檢進度:', suffix='完成')
return self.context
# 數據庫查詢內容輸出保存
class saveDoc(object):
def __init__(self, context, ofile, ifile):
self.context = context
self.ofile = ofile
self.ifile = ifile
# 內容依據模板文件寫入輸出文件
def contextsave(self):
try:
# 確保所有必需的鍵都存在
required_keys = ['health_summary', 'auto_analyze', 'myversion', 'co_name', 'port', 'ip']
for key in required_keys:
if key not in self.context:
if key == 'health_summary':
self.context[key] = [{'health_summary': '運行良好'}]
elif key == 'auto_analyze':
self.context[key] = []
elif key == 'myversion':
self.context[key] = [{'version': 'Unknown'}]
else:
self.context[key] = [{'placeholder': '數據缺失'}]
tpl = DocxTemplate(self.ifile)
tpl.render(self.context)
tpl.save(self.ofile)
return True
except Exception as e:
print(f"? 生成Word文檔失敗: {e}")
return False
def print_banner():
print("=" * 60)
print("MySQL 數據庫巡檢工具 (Word報告版) - 交互式輸入模式")
print("支持版本: MySQL 5.6 / 5.7 / 8.0+")
print("=" * 60)
def check_license():
"""檢查許可證有效性"""
validator = LicenseValidator()
is_valid, message, remaining_days = validator.validate_license()
if not is_valid:
print(f"? {message}")
print("請聯系管理員獲取有效許可證")
sys.exit(1)
else:
print(f"? {message}")
if "試用版" in message:
print("?? 試用版只能使用一次,過期后需聯系管理員獲取正式許可")
print()
def main():
start_time = time.time()
# 打印橫幅
print_banner()
# 檢查許可證
check_license()
infos = passArgu().get_argus()
batch_mode = infos.batch
# 使用資源路徑函數獲取模板文件
sql_template = get_resource_path("templates/sqltemplates.ini")
ifile = get_resource_path("templates/wordtemplates_v2.0.docx")
# 檢查模板文件是否存在
if not os.path.exists(sql_template):
print(f"? SQL模板文件不存在: {sql_template}")
print("請確保模板文件已正確打包")
sys.exit(1)
if not os.path.exists(ifile):
print(f"? Word模板文件不存在: {ifile}")
print("請確保模板文件已正確打包")
sys.exit(1)
# 讀取SQL模板配置
cfg = configparser.RawConfigParser()
try:
cfg.read(sql_template, encoding='utf-8')
# 檢查是否包含report節
if not cfg.has_section('report'):
print("? SQL模板文件缺少[report]節")
sys.exit(1)
except Exception as e:
print(f"? 讀取SQL模板配置失敗: {e}")
sys.exit(1)
# 配置報告輸出路徑
dir_path = os.path.dirname(cfg.get("report", "output"))
file_name = os.path.basename(cfg.get("report", "output"))
# 生成帶時間戳的報告文件名前綴
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
base_name = os.path.splitext(file_name)[0]
extension = os.path.splitext(file_name)[1]
# 處理批量模式
if batch_mode:
# 批量輸入數據庫信息
db_list = input_batch_db_info()
total_dbs = len(db_list)
current_db = 0
for db_info in db_list:
current_db += 1
label_name = db_info['name']
ip = db_info['ip']
port = db_info['port']
user = db_info['user']
password = db_info['password']
# 生成報告文件名
file_name = f"{base_name}_{label_name}_{timestamp}_{current_db}{extension}"
ofile = os.path.join(dir_path, file_name)
print(f"\n[{current_db}/{total_dbs}] 開始巡檢 {label_name} ({ip}:{port})...")
# 獲取數據庫版本
try:
conn_test = pymysql.connect(host=ip, port=int(port), user=user, password=password)
cursor = conn_test.cursor()
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()[0]
cursor.close()
conn_test.close()
print(f"?? 數據庫版本: MySQL {version}")
except Exception as e:
print(f"?? 數據庫版本: 未知 ({e})")
# 執行巡檢
data = getData(ip, port, user, password)
ret = data.checkdb(sql_template)
# 添加報告必要信息
ret.update({"co_name": [{'CO_NAME': label_name}]})
ret.update({"port": [{'PORT': port}]})
ret.update({"ip": [{'IP': ip}]})
# 生成報告
savedoc = saveDoc(context=ret, ofile=ofile, ifile=ifile)
success = savedoc.contextsave()
if success:
print(f"? 報告已生成: {os.path.basename(ofile)}")
else:
print(f"? {label_name} 報告生成失敗")
time.sleep(1)
end_time = time.time()
total_time = end_time - start_time
print(f"\n=== 批量巡檢完成 ===")
print(f"總耗時: {total_time:.2f}秒")
print(f"處理數據庫數量: {total_dbs} 個")
print(f"報告輸出目錄: {dir_path}")
print("=" * 60)
else:
# 單庫模式 - 交互式輸入
db_info = input_db_info()
label_name = db_info['name']
ip = db_info['ip']
port = db_info['port']
user = db_info['user']
password = db_info['password']
# 生成報告文件名
file_name = f"{base_name}_{label_name}_{timestamp}{extension}"
ofile = os.path.join(dir_path, file_name)
# 獲取數據庫版本
try:
conn_test = pymysql.connect(host=ip, port=int(port), user=user, password=password)
cursor = conn_test.cursor()
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()[0]
cursor.close()
conn_test.close()
print(f"?? 數據庫版本: MySQL {version}")
except Exception as e:
print(f"?? 數據庫版本: 未知 ({e})")
# 執行巡檢
data = getData(ip, port, user, password)
ret = data.checkdb(sql_template)
# 添加報告必要信息
ret.update({"co_name": [{'CO_NAME': label_name}]})
ret.update({"port": [{'PORT': port}]})
ret.update({"ip": [{'IP': ip}]})
# 生成報告
savedoc = saveDoc(context=ret, ofile=ofile, ifile=ifile)
success = savedoc.contextsave()
if not success:
print("? 生成報告失敗,但數據采集已完成")
return
# 統計問題數量
problem_count = len(ret.get("auto_analyze", []))
major_problems = []
# 從auto_analyze中提取主要問題
for item in ret.get("auto_analyze", []):
major_problems.append(f"{item['col1']}: {item['col3']}")
# 如果沒有發現問題,添加一條信息
if problem_count == 0:
major_problems.append("未發現重大問題")
end_time = time.time()
total_time = end_time - start_time
print(f"\n巡檢完成!")
print(f"? Word報告已生成: {os.path.basename(ofile)}")
print(f"?? 報告路徑: {ofile}")
print(f"\n總耗時: {total_time:.2f}秒")
print(f"發現問題: {problem_count} 個")
if major_problems:
print("\n主要問題:")
for i, problem in enumerate(major_problems, 1):
print(f" {i}. {problem}")
print("=" * 60)
print("巡檢完成! 請查看Word報告了解詳情")
print("=" * 60)
if __name__ == '__main__':
main()
浙公網安備 33010602011771號