Python導出PostgreSQL數據庫的表結構(使用SSH隧道)
1、安裝依賴
pip install sshtunnel psycopg2 openpyxl pandas
2、實現代碼:
import pandas as pd import psycopg2 from openpyxl import load_workbook from openpyxl.styles import Border, Side, PatternFill, Font, Alignment from openpyxl.utils import get_column_letter from sshtunnel import SSHTunnelForwarder def apply_formatting(excel_path): """應用格式設置""" wb = load_workbook(excel_path) # 定義樣式 blue_fill = PatternFill( start_color="0070C0", end_color="0070C0", fill_type="solid" ) thin_border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) for sheet in wb.worksheets: # 設置標題行樣式 for cell in sheet[1]: cell.fill = blue_fill cell.font = Font(color="FFFFFF", bold=True) # 設置邊框 for row in sheet.iter_rows(min_row=1): for cell in row: cell.border = thin_border # 自適應列寬 for column in sheet.columns: max_length = max( len(str(cell.value)) for cell in column ) adjusted_width = (max_length + 2) * 1.2 sheet.column_dimensions[ get_column_letter(column[0].column) ].width = adjusted_width # 定義居中對齊樣式(水平居中 + 垂直居中) alignment = Alignment( horizontal='center', # 水平居中:'left', 'center', 'right', 'justify' vertical='center' # 垂直居中:'top', 'center', 'bottom' ) # 對指定的列設置居中 columns_to_center = ['A', 'H'] # 將A列和H列設置居中 for col in columns_to_center: for cell in sheet[col]: cell.alignment = alignment wb.save(excel_path) def add_hyperlinks(excel_path): """添加雙向超鏈接""" wb = load_workbook(excel_path) ws_summary = wb["首頁總覽"] # 總覽表到分表 for row in ws_summary.iter_rows(min_row=2): table_name = row[1].value sheet_name = table_name[:30] hyperlink = f"#'{sheet_name}'!A1" row[1].hyperlink = hyperlink row[1].style = "Hyperlink" # 分表到總覽表 for sheet_name in wb.sheetnames[1:]: ws = wb[sheet_name] last_row = ws.max_row + 1 ws.cell(row=last_row, column=1, value="返回\"首頁總覽\"") ws.cell(row=last_row, column=1).hyperlink = "#'首頁總覽'!A1" ws.cell(row=last_row, column=1).style = "Hyperlink" wb.save(excel_path) def sql_to_dataframe(cursor): """將 pyodbc cursor 結果轉換為 DataFrame""" # 獲取列名 columns = [column[0] for column in cursor.description] # 獲取數據并創建DataFrame data = cursor.fetchall() return pd.DataFrame.from_records(data, columns=columns) def get_tables_info(cursor, db_user): """獲取所有表信息""" query = f""" SELECT c.relname AS table_name, obj_description(c.oid, 'pg_class') AS table_comment FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relkind = 'r' AND n.nspname = 'public' AND NOT EXISTS ( SELECT 1 FROM pg_inherits i WHERE i.inhrelid = c.oid ) ORDER BY table_name """ # 執行查詢 cursor.execute(query) return sql_to_dataframe(cursor) def get_columns_info(cursor, table_name): """獲取指定表結構詳情""" query = f""" SELECT c.relname AS table_name, a.attname AS column_name, col_description(a.attrelid, a.attnum) AS column_comment, CASE WHEN t.typcategory = 'A' THEN REPLACE(t.typname, '_', '') || '[]' ELSE t.typname END AS column_type, CASE WHEN t.typname IN ('varchar','bpchar') THEN a.atttypmod -4 WHEN t.typname = 'numeric' THEN (a.atttypmod -4) >> 16 & 65535 ELSE NULL END AS type_length, CASE WHEN t.typname = 'numeric' THEN (a.atttypmod -4) & 65535 ELSE NULL END AS type_scale, CASE WHEN a.attnotnull THEN '[√]' ELSE '[]' END AS isnotnull FROM pg_class c JOIN pg_attribute a ON a.attrelid = c.oid JOIN pg_type t ON a.atttypid = t.oid WHERE c.relkind = 'r' AND a.attnum > 0 AND NOT a.attisdropped AND c.relname = '{table_name}' AND c.relnamespace = 'public'::regnamespace ORDER BY c.relname, a.attnum """ # 執行查詢 cursor.execute(query) return sql_to_dataframe(cursor) if __name__ == '__main__': # ================== 配置信息 ================== # SSH 隧道配置 ssh_host = "100.100.10.10" # SSH跳板機IP ssh_port = 22 # SSH端口(默認22) ssh_user = "root" # SSH用戶名 ssh_password = "Root@1234" # SSH密碼 # 數據庫參數 db_host = "100.100.20.20" # 數據庫服務器實際IP(SSH隧道目標) db_port = 5432 # 數據庫默認端口 db_name = "mydb" # 數據庫 db_user = "postgres" # 數據庫用戶 db_password = "postgres" # 數據庫用戶的密碼 # 本地綁定端口(隨機可用端口) local_bind_port = 9090 # ================== 建立SSH隧道 ================== try: with SSHTunnelForwarder( (ssh_host, ssh_port), ssh_username=ssh_user, ssh_password=ssh_password, remote_bind_address=(db_host, db_port), local_bind_address=("0.0.0.0", local_bind_port), set_keepalive=15 # 保持連接活躍 ) as tunnel: print(f"SSH隧道建立成功,本地端口:{tunnel.local_bind_port}") # ================== 連接數據庫 ================== conn = psycopg2.connect( host='localhost', # 關鍵!必須使用localhost port=tunnel.local_bind_port, # 映射的本地端口 database=db_name, user=db_user, password=db_password ) cursor = conn.cursor() print("數據庫連接成功!") # 獲取所有表的信息 df_summary = get_tables_info(cursor, db_user.upper()) df_summary.insert(0, '序號', range(1, len(df_summary) + 1)) # 寫入的Excel名稱 output_file = f"數據庫{db_name}表結構.xlsx" # 創建Excel寫入對象 with pd.ExcelWriter(output_file, engine='openpyxl') as writer: # 寫入總覽頁 df_summary.to_excel( writer, sheet_name='首頁總覽', index=False, header=['序號', '表名', '表注釋'] ) # 遍歷寫入各表結構 for _, row in df_summary.iterrows(): table_name = row['table_name'] print("正在導出表", table_name, "的表結構") df_columns = get_columns_info(cursor, table_name) df_columns.insert(0, '序號', range(1, len(df_columns) + 1)) df_columns.to_excel( writer, sheet_name=table_name[:30], # Excel表名最長31字符 index=False, header=["序號", "表名", "字段名稱", "字段注釋", "字段類型", "長度", "小數點", "非空"] ) # 應用格式和超鏈接 print("正在設置超鏈接和邊框......") add_hyperlinks(output_file) # 設置超鏈接 apply_formatting(output_file) # 設置邊框 print(f"表結構已成功導出至 {output_file}") except Exception as e: print(f"連接失敗: {str(e)}") finally: if 'conn' in locals(): conn.close()
本文來自博客園,作者:業余磚家,轉載請注明原文鏈接:http://www.rzrgm.cn/yeyuzhuanjia/p/18888468

浙公網安備 33010602011771號