量化交易系統--數據庫搭建(從原理到實踐)
數據模塊架構:


數據庫用途規劃
| 數據庫 | 存儲內容 | 分片策略 | 推薦理由 |
|---|---|---|---|
| TDengine | Tick數據、分鐘線 | 超級表 + 子表 | 性能與靈活性的完美結合,為時序數據而生。 |
| PostgreSQL | 日線、復雜因子、基本面 | 單一大表 + 時間分區 | 適合復雜關聯查詢和分析,關系模型表現力強。 |
| Redis | 實時行情、狀態、緩存 | Key中含代碼 | 邏輯分片,實現毫秒級訪問,清晰直觀。 |
| MySQL | 元數據(代碼表、日歷) | 經典關系表 | 保證數據一致性和關系完整性。 |

一、TDengine (專為時序數據設計)
策略:超級表(Super Table) + 子表(Sub-Table)模式 —— 這是“所有股票一張表”和“每支股票一張表”優勢的結合體,是最優解。
TDengine的設計哲學是:為每個數據采集點(在您這就是每支股票)創建一張獨立的子表,但這些子表都遵循一個由超級表定義的統一模板。
優勢:您既可以像查詢“一張大表”那樣進行全局分析,又可以像查詢“單個小表”那樣高效獲取單支股票的數據。
結論:對于TDengine,您不需要自己糾結分片策略,使用其內置的超級表模式就是最佳實踐。
二、PostgreSQL + TimescaleDB (用于日線、基本面、復雜因子)
策略:單一大表 + 分區表(Partitioning) —— 即“所有股票放一張表”,但按時間或股票代碼進行分區。
PostgreSQL更適合用寬表模式存儲日線等低頻數據,并結合TimescaleDB或原生分區表來管理數據。
2. 獨立性安排
所有日線數據都在 quotes_daily 這一張表里,通過 symbol_id 和 trade_date 來定位數據。
優點:
查詢方便:輕松實現“查詢所有A股2023年市盈率低于10的股票”這樣的復雜篩選,如果分表,這種查詢將是噩夢。
維護簡單:備份、索引、Vacuum等操作只需要對一張表進行。
關系清晰:易于與基本面等其他表進行關聯查詢(JOIN)。
三、Redis (用于實時緩存和狀態)
策略:Key-Value設計,Key中包含股票代碼 —— 這是一種邏輯上的“分表”。
Redis是鍵值數據庫,它的“表”就是Key。我們通過精心設計Key的名稱來實現邏輯隔離。
-
“表”結構設計
最新行情(Hash):
Key: quote:{symbol} 如 quote:600000.SH
Value: { price: 10.50, open: 10.30, high: 10.60, volume: 12345678, ts: 1685437200000 }訂單簿(Sorted Set 或 Hash):
Key: orderbook:{symbol} 如 orderbook:600000.SH
Value: { bids: '[[10.49, 1000], [10.48, 500]]', asks: '[[10.51, 2000]]' } (可以用JSON字符串存儲)集合(Set):
Key: industry:banks (存儲所有銀行股的代碼)
Members: -
獨立性安排
每支股票的數據在邏輯上都是獨立的,通過Key中的{symbol}來區分。
優點:
極致性能:讀取quote:600000.SH就像直接訪問內存中的一個變量,速度極快。
清晰明了:Key的設計模式使得數據管理非常直觀。
四、MySQL/MariaDB (用于元數據)
策略:經典的關聯表設計 —— 全部是關系表,不分表。
這里存儲的是高度結構化的關系數據,完全遵循數據庫范式設計。
獨立性安排
所有同類數據存于一張表。例如,所有股票代碼都在 symbols 表里。
優點:
數據一致性:避免重復,易于管理。
強大的查詢能力:可以輕松執行復雜的關聯查詢
行動指南
在NAS的Docker上部署TDengine
TDengine文檔:https://docs.taosdata.com/2.6/
從 3.3.7.0 版本開始,TDengine TSDB 的鏡像名稱調整如下:
社區版的鏡像名稱從 tdengine/tdengine 重命名為 tdengine/tsdb
企業版的鏡像名稱從 tdengine/tdengine-ee 重命名為 tdengine/tsdb-ee
Docker布署:https://docs.taosdata.com/get-started/docker/
我實際的YAML文件:
version: '3.8'
services:
tdengine:
image: docker.1ms.run/tdengine/tdengine:latest
container_name: tdengine
restart: always # 容器故障或NAS重啟時自動恢復
environment:
- TAOS_FQDN=tdengine # 容器內部主機名
- TAOS_PASSWORD=password # 設置強密碼,但這個密碼好像沒卵用
- TAOS_PORT=6030 # 客戶端連接端口
ports:
- "6030:6030" # 客戶端連接端口
- "6041:6041" # REST API端口
- "6042:6042" # 其他服務端口
- "6043-6049:6043-6049" # 預留端口范圍
- "6041-6049:6041-6049/udp" # UDP協議端口
volumes:
# 數據持久化到NAS本地目錄,需提前創建
- /volume5/docker5/tdengine/data:/var/lib/taos
# 日志持久化,便于問題排查
- /volume5/docker5/tdengine/log:/var/log/taos
# 配置文件持久化(首次啟動后會自動生成配置)
- /volume5/docker5/tdengine/config:/etc/taos
healthcheck:
# 健康檢查,確保服務正常運行
test: ["CMD", "taos", "-s", "show databases;"]
interval: 30s
timeout: 10s
retries: 3
好了,具體的命令,參考:https://docs.taosdata.com/reference/tools/taos-cli/
taos> CREATE DATABASE IF NOT EXISTS stock
> KEEP 365 -- 數據保留365天
> CACHE 16 -- 緩存大小16MB
> UPDATE 1 -- 允許數據更新
> CHARSET UTF8; -- 支持中文(與其他參數同屬CREATE語句的一部分)
>
>
> show databases;
Create OK, 0 row(s) affected (0.004209s)
DB error: Incomplete SQL statement [0x80002601] (0.000281s)
taos> show databases;
name |
=================================
information_schema |
performance_schema |
stock |
log |
Query OK, 4 row(s) in set (0.006709s)
taos> exit
按照上述設計,首先創建 symbols 等元數據表。
編寫數據采集程序,按照“超級表”模式將Tick數據寫入TDengine,同時將最新數據寫入Redis。
日線數據繼續寫入PostgreSQL的超表。
在您的數據管理層(適配器+工廠模式)中,封裝好對不同數據庫的查詢邏輯,對上層應用提供統一接口。
連通性測試
john@Desktop-CLF:/$ telnet 192.168.123.104 6041
Trying 192.168.123.104...
Connected to 192.168.123.104.
Escape character is '^]'.
john@Desktop-CLF:/$ telnet 192.168.123.104 6030
Trying 192.168.123.104...
Connected to 192.168.123.104.
Escape character is '^]'.
從以上操作來看,telnet 命令的輸出已經表明連接成功了:
當你執行 telnet 192.168.123.104 6041 時,顯示 Connected to 192.168.123.104,說明 6041 端口已連通
執行 telnet 192.168.123.104 6030 時,同樣顯示 Connected to 192.168.123.104,說明 6030 端口也已連通
# 發送認證請求測試(返回 200 表示正常)6041 是 TDengine 的 REST 端口,可通過 curl 發送 HTTP 請求驗證:
john@Desktop-CLF:/$ curl -u root:taosdata "http://192.168.123.104:6041/rest/sql" -d "show databases;"
{"code":0,"column_meta":[["name","VARCHAR",64]],"data":[["information_schema"],["performance_schema"],["stock"],["log"],["db"]],"rows":5}john@Desktop-CLF:/$
從 curl 命令的輸出結果來看,你的 WSL 已經成功連接到 NAS 中的 TDengine 數據庫了!
響應中的 "code":0 表示請求成功
返回了數據庫列表(information_schema、stock、log 等),說明 TDengine 服務正常響應
NAS布署 PostgreSQL + TimescaleDB
安裝 TimescaleDB 擴展(PostgreSQL 的時序時序數據庫擴展)需要根據你的操作系統和 PostgreSQL 版本進行操作。以下是Docker安裝步驟:
使用官方 TimescaleDB Docker 鏡像(推薦)
Timescale 提供了預安裝 TimescaleDB 擴展的 PostgreSQL 鏡像,直接使用即可:
- 拉取官方鏡像
下載鏡像
docker pull docker.1ms.run/timescale/timescaledb-ha:pg17.6-ts2.22.0-oss
啟動前準備
確保宿主機端口 5433 未被占用(可通過 netstat -tulpn | grep 5433 檢查)。
執行命令查看 5433 端口被哪個進程 / 容器占用:
bash
# 查看 5433 端口占用情況
sudo netstat -tulpn | grep 5433
# 或用 docker 專門查看容器端口占用
docker ps --filter "publish=5433"
#如果換了 5434 后仍提示端口占用,可繼續嘗試 5435、5436 等,直到找到空閑端口。也可換這個命令:
sudo netstat -tulpn | grep 543
如果輸出結果顯示有容器(比如你之前的 db 服務)占用 5433,就必須給新容器換一個未占用的端口。
重建數據目錄權限(避免權限錯誤):
bash
sudo rm -rf /volume5/docker5/timescale/pgdata # 清除可能損壞的舊數據
sudo mkdir -p /volume5/docker5/timescale/pgdata
sudo chmod 700 /volume5/docker5/timescale/pgdata # PostgreSQL 要求的嚴格權限
點擊查看YAML文件代碼
version: '3.8'
services:
timescaledb:
image: docker.1ms.run/timescale/timescaledb-ha:pg17.6-ts2.22.0-oss
container_name: timescaledb-oss
restart: unless-stopped
ports:
- "5438:5432" # 已確認的未占用端口,避開沖突
volumes:
# 官網指定 PGDATA 路徑,宿主機目錄按實際權限調整
- /volume5/docker5/timescale/pgdata:/home/postgres/pgdata/data
- /volume5/docker5/timescale/conf:/etc/postgresql/17/main/conf.d
#######################################
# 核心修復:使用容器內默認 postgres 用戶(UID 999),而非宿主機 UID
# 官網強調:TimescaleDB 鏡像內 postgres 用戶 UID 固定為 999,GID 固定為 999
#######################################
user: "999:999" # 容器內 postgres 用戶的默認 UID/GID,無需修改
environment:
# 官網必填初始化配置
POSTGRES_DB: mystocks
POSTGRES_USER: postgres
POSTGRES_PASSWORD: password #自己設置
PGDATA: /home/postgres/pgdata/data # 嚴格對齊官網 PGDATA 路徑
# 官網單節點與擴展配置
PATRONI_MODE: "standalone"
PATRONI_SCOPE: "timescale-single-node"
TIMESCALEDB_TELEMETRY: "off"
POSTGRES_MULTIPLE_DATABASES: "mystocks"
TIMESCALEDB_EXTENSION_CREATE_IF_NOT_EXISTS: "true"
# 官網性能與時區配置
TZ: "Asia/Shanghai"
PATRONI_POSTGRESQL_PARAMETERS: |
shared_buffers = 512MB
work_mem = 32MB
max_connections = 100
log_directory = '/home/postgres/pgdata/data/logs'
# 官網標準健康檢查
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB} -h 127.0.0.1 -p 5432"]
interval: 10s
timeout: 5s
retries: 5
start_period: 60s # 適配 PostgreSQL 17 初始化耗時
networks:
default:
driver: bridge
驗證安裝
進入容器并連接 PostgreSQL:
bash
# 進入容器
docker exec -it timescaledb psql -U postgres -d your_db
# 在 PostgreSQL 終端中驗證
CREATE EXTENSION IF NOT EXISTS timescaledb;
SELECT timescaledb_version(); # 輸出版本號即成功

請分析一下這兩個版本的差異:基于 PostgreSQL 17 的最新版本是 timescale/timescaledb-ha:pg17,輕量級的 TimescaleDB 非 HA Docker 鏡像使用 Alpine?;?PostgreSQL 17 的最新版本是 timescale/timescaledb:latest-pg17。
timescale/timescaledb-ha:pg17 和 timescale/timescaledb:latest-pg17 都是基于 PostgreSQL 17 的 TimescaleDB 鏡像,但針對不同場景設計,核心差異體現在高可用性(HA)支持、底層系統和功能定位上。以下是具體分析:
-
高可用性(HA)支持
timescale/timescaledb-ha:pg17
專為高可用集群設計,內置 HA 所需的組件和配置(如 Patroni、etcd 等)。
支持自動故障轉移、主從復制、集群狀態管理,適合生產環境中對可用性要求高的場景(如金融、工業監控等)。
可直接用于搭建多節點集群,無需手動配置 replication 或故障轉移邏輯。
timescale/timescaledb:latest-pg17
是單節點輕量鏡像,不包含 HA 組件,僅集成了 TimescaleDB 擴展的基礎功能。
適合開發、測試環境,或對可用性要求不高的單節點生產場景(如小型應用、個人項目)。 -
底層系統與鏡像大小
timescale/timescaledb-ha:pg17
基于Debian/Ubuntu系統構建(非 Alpine),包含更多系統工具和依賴(如集群管理工具、監控組件等)。
鏡像體積較大(通常數百 MB),因為需要容納 HA 相關的額外軟件。
timescale/timescaledb:latest-pg17
基于Alpine Linux構建(輕量級 Linux 發行版),僅包含運行 TimescaleDB 必需的組件。
鏡像體積更?。ㄍǔ资?MB),啟動速度更快,適合資源受限的環境或追求輕量化部署的場景。 -
功能與工具集成
timescale/timescaledb-ha:pg17
集成了 HA 集群管理工具(如 Patroni 用于自動切換,etcd 用于分布式配置存儲)。
包含監控和運維相關工具(如 Prometheus exporters、日志收集組件),方便集群狀態監控。
預設了針對集群優化的 PostgreSQL 配置(如連接池、復制參數等)。
timescale/timescaledb:latest-pg17
僅包含 PostgreSQL 17 核心和 TimescaleDB 擴展,無額外集群工具。
配置更精簡,適合自定義調整參數,或作為基礎鏡像二次構建。 -
適用場景
鏡像 適用場景 優勢 劣勢
timescale/timescaledb-ha:pg17 生產環境、多節點集群、高可用性需求 內置 HA 功能、適合大規模部署、運維工具齊全 體積大、資源消耗高、配置復雜
timescale/timescaledb:latest-pg17 開發 / 測試、單節點部署、輕量應用 體積小、啟動快、配置簡單 無 HA 支持、需手動搭建集群
總結建議
若需要在生產環境中保證服務不中斷(如避免單點故障),選擇 timescale/timescaledb-ha:pg17,并配合官方文檔搭建集群。
若用于本地開發、測試,或部署單節點服務,timescale/timescaledb:latest-pg17 更輕便高效。
兩者均支持 TimescaleDB 的核心功能(如超表、時序數據優化等),差異主要體現在集群可用性和部署復雜度上。
timescale/timescaledb-ha 鏡像的這些版本差異主要體現在 包含的組件功能 和 是否包含商業特性 上。以下是具體解析:
版本命名規則
先理解版本號的構成:pg<PostgreSQL版本>-ts<TimescaleDB版本>-[all]-[oss]
pg17.6:基于 PostgreSQL 17.6
ts2.22.0:集成 TimescaleDB 2.22.0 擴展
后綴 all/oss 表示功能范圍差異
各版本區別
版本 核心差異 適用場景
pg17.6-ts2.22.0 基礎高可用版本,包含 TimescaleDB 社區版核心功能 + HA 組件(Patroni、etcd 等),但不含額外工具鏈。 僅需基礎 HA 集群功能,不需要監控、備份等附加工具的場景。
pg17.6-ts2.22.0-all 在基礎版之上,額外集成了 完整工具鏈,包括:
- 監控工具(Prometheus exporter、Grafana 配置)
- 備份工具(pgBackRest)
- 日志管理(Vector)
- 集群管理輔助腳本。 生產環境中需要一站式 HA 解決方案,包含監控、備份等運維功能的場景。
pg17.6-ts2.22.0-oss 僅包含 開源(Open Source)組件,不含任何 Timescale 商業特性(如 Timescale Cloud 集成、企業級支持工具等),HA 功能基于純開源軟件實現。 對開源合規性要求嚴格,不希望引入任何商業閉源組件的場景。
pg17.6-ts2.22.0-all-oss 集成完整工具鏈(同 all 版本),但 僅包含開源組件,不含商業特性。 需要完整運維工具鏈,同時要求純開源合規的生產環境。
關鍵區別點總結
all 后綴:是否包含完整運維工具鏈(監控、備份、日志等)。
帶 all:工具齊全,開箱即用,適合生產環境。
不帶 all:僅基礎 HA 功能,工具需自行部署。
oss 后綴:是否限制為純開源組件。
帶 oss:僅包含開源許可的功能,無商業特性(如 Timescale 企業級插件)。
不帶 oss:可能包含 Timescale 商業特性(需注意許可條款)。
選擇建議
生產環境優先選 all 版本(如 pg17.6-ts2.22.0-all),工具鏈齊全,減少運維成本。
若有開源合規要求(如禁止使用非開源組件),選 all-oss 版本。
僅做測試或自定義工具鏈時,可選基礎版(不帶 all)。
注意:商業特性的使用可能受許可限制,生產環境使用前建議查閱 Timescale 官方許可說明。
如果你不需要搭建多節點 HA 集群,也不開啟 TimescaleDB 遙測和執行定期備份時序數據,那么可以考慮選擇非 HA 版本的 TimescaleDB 鏡像,這樣可以避免一些不必要的組件和功能,從而減小鏡像體積。
在 TimescaleDB 的鏡像版本中,帶有 “-all” 后綴的版本通常包含了所有的組件和依賴項,包括 HA 相關的組件,所以體積會比較大。而 “-oss” 后綴的版本是開源社區版,不包含一些商業功能和 HA 組件,可能更適合你的需求。
NAS布署Redis數據庫
services:
redis:
image: docker.1ms.run/redis:latest
restart: unless-stopped
volumes:
- /volume5/docker5/Redis:/data
ports:
- "6379:6379"
networks:
- paperless-shared-network # 加入paperless網絡, 各位可以忽略我這一句
- mystocks-network # 加入mystocks網絡
command: redis-server --appendonly yes
networks:
# 關鍵修改:Redis負責創建該網絡(不再標記為external)
paperless-shared-network:
driver: bridge # 由Redis初始化網絡,確保始終存在
mystocks-network:
driver: bridge
Grafana實現TDEngine完整的監控功能(尚未嘗試)
TDengine 能夠與開源數據可視化系統 Grafana 快速集成搭建數據監測報警系統,整個過程無需任何代碼開發,TDengine 中數據表的內容可以在儀表盤(DashBoard)上進行可視化展現。
??完整的監控功能需要安裝并運行 taoskeeper 服務。taoskeeper 負責接收監控指標數據并創建 log 庫。
TDEngine安裝配置參考鏈接:https://blog.csdn.net/weixin_44462773/article/details/130999428
grafana創建面板鏈接:https://docs.taosdata.com/third-party/grafana/
https://docs.taosdata.com/third-party/visual/grafana/#安裝-grafana-plugin-并配置數據源
https://docs.taosdata.com/operation/monitor/#tdinsight---使用監控數據庫--grafana-對-tdengine-進行監控的解決方案?login=from_csdn
在WSL中安裝DBeaverCE
https://dbeaver.io/download/
所說還有另外一種方案:https://docs.taosdata.com/reference/explorer/
我選擇安裝WSL版本,如下:
#安裝dbeaver-ce
sudo snap install dbeaver-ce
(base) root@Desktop-CLF:~# sudo snap list | grep dbeaver-ce
dbeaver-ce 25.2.0.202508311659 400 latest/stable dbeaver-corp -
#運行
dbeaver-ce
安裝完成后,可參考官方DBeaver連接的指南(但實際上沒參考它):https://docs.taosdata.com/third-party/tool/dbeaver/
配置 TDengine 連接,填入主機地址、端口號、用戶名和密碼。如果 TDengine 部署在本機,可以只填用戶名和密碼,默認用戶名為 root,默認密碼為 taosdata。
注意:這里沒有連SSH

建表程序(init_db_monitor.py)
import sqlalchemy
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
import logging
import argparse
import os
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def load_env_config(env_file='env'):
"""從環境變量文件加載配置"""
config = {}
try:
# 檢查文件是否存在
if not os.path.exists(env_file):
raise FileNotFoundError(f"環境變量文件 '{env_file}' 不存在")
# 讀取文件內容
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
# 跳過注釋和空行
if not line or line.startswith('#'):
continue
# 解析鍵值對
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
# 驗證必要的配置項
required_keys = ['MYSQL_HOST', 'MYSQL_USER', 'MYSQL_PASSWORD', 'MYSQL_PORT']
for key in required_keys:
if key not in config:
raise ValueError(f"環境變量文件缺少必要配置: {key}")
return {
'user': config['MYSQL_USER'],
'password': config['MYSQL_PASSWORD'],
'host': config['MYSQL_HOST'],
'port': int(config['MYSQL_PORT']),
'database': 'mysql', # 初始連接使用的數據庫
'charset': 'utf8mb4',
'collation': 'utf8mb4_unicode_ci'
}
except Exception as e:
logger.error(f"加載配置失敗: {str(e)}")
raise
def get_sql_commands(drop_existing=False, charset='utf8mb4', collation='utf8mb4_unicode_ci'):
"""生成SQL命令,支持刪除已有表選項"""
drop_commands = ""
if drop_existing:
drop_commands = """
DROP TABLE IF EXISTS table_validation_log;
DROP TABLE IF EXISTS table_operation_log;
DROP TABLE IF EXISTS column_definition_log;
DROP TABLE IF EXISTS table_creation_log;
"""
return f"""
CREATE DATABASE IF NOT EXISTS db_monitor
CHARACTER SET {charset}
COLLATE {collation};
USE db_monitor;
{drop_commands}
CREATE TABLE table_creation_log (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主鍵',
table_name VARCHAR(255) NOT NULL COMMENT '表名',
database_type ENUM('TDengine', 'PostgreSQL', 'Redis', 'MySQL', 'MariaDB') NOT NULL COMMENT '數據庫類型',
database_name VARCHAR(255) NOT NULL COMMENT '數據庫名稱',
creation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
modification_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改時間',
status ENUM('success', 'failed') NOT NULL COMMENT '創建狀態',
table_parameters JSON NOT NULL COMMENT '表參數配置(JSON格式)',
ddl_command TEXT NOT NULL COMMENT '執行的DDL命令',
error_message TEXT COMMENT '錯誤信息(如有)',
INDEX idx_database_type (database_type),
INDEX idx_creation_time (creation_time)
) ENGINE=InnoDB DEFAULT CHARSET={charset} COMMENT='表創建日志表';
CREATE TABLE column_definition_log (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主鍵',
table_log_id INT NOT NULL COMMENT '關聯的表創建日志ID',
column_name VARCHAR(255) NOT NULL COMMENT '列名',
data_type VARCHAR(100) NOT NULL COMMENT '數據類型',
col_length INT COMMENT '列長度',
col_precision INT COMMENT '精度',
col_scale INT COMMENT '小數位數',
is_nullable BOOLEAN DEFAULT TRUE COMMENT '是否允許為空',
is_primary_key BOOLEAN DEFAULT FALSE COMMENT '是否為主鍵',
default_value VARCHAR(255) COMMENT '默認值',
comment TEXT COMMENT '列備注',
FOREIGN KEY (table_log_id) REFERENCES table_creation_log(id) ON DELETE CASCADE,
INDEX idx_table_log_id (table_log_id)
) ENGINE=InnoDB DEFAULT CHARSET={charset} COMMENT='列定義日志表';
-- 新增表操作日志表
CREATE TABLE table_operation_log (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主鍵',
table_name VARCHAR(255) NOT NULL COMMENT '表名',
database_type ENUM('TDengine', 'PostgreSQL', 'Redis', 'MySQL', 'MariaDB') NOT NULL COMMENT '數據庫類型',
database_name VARCHAR(255) NOT NULL COMMENT '數據庫名稱',
operation_type ENUM('CREATE', 'ALTER', 'DROP', 'VALIDATE') NOT NULL COMMENT '操作類型',
operation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '操作時間',
operation_status ENUM('success', 'failed') NOT NULL COMMENT '操作狀態',
operation_details JSON NOT NULL COMMENT '操作詳情(JSON格式)',
ddl_command TEXT COMMENT '執行的DDL命令',
error_message TEXT COMMENT '錯誤信息(如有)',
INDEX idx_operation_time (operation_time),
INDEX idx_operation_type (operation_type)
) ENGINE=InnoDB DEFAULT CHARSET={charset} COMMENT='表操作日志表';
-- 新增表結構驗證日志表
CREATE TABLE table_validation_log (
id INT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主鍵',
table_name VARCHAR(255) NOT NULL COMMENT '表名',
database_type ENUM('TDengine', 'PostgreSQL', 'Redis', 'MySQL', 'MariaDB') NOT NULL COMMENT '數據庫類型',
database_name VARCHAR(255) NOT NULL COMMENT '數據庫名稱',
validation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '驗證時間',
validation_status ENUM('pass', 'fail') NOT NULL COMMENT '驗證狀態',
validation_details JSON NOT NULL COMMENT '驗證詳情(JSON格式)',
issues_found TEXT COMMENT '發現的問題',
INDEX idx_validation_time (validation_time)
) ENGINE=InnoDB DEFAULT CHARSET={charset} COMMENT='表結構驗證日志表';
"""
def create_database_and_tables(drop_existing=False):
try:
# 從env文件加載配置
db_config = load_env_config()
# 創建數據庫連接字符串
connection_str = (
f"mysql+pymysql://{db_config['user']}:{db_config['password']}@"
f"{db_config['host']}:{db_config['port']}/{db_config['database']}?"
f"charset={db_config['charset']}"
)
# 建立數據庫連接
engine = sqlalchemy.create_engine(connection_str)
with engine.connect() as connection:
# 確保自動提交模式開啟
connection = connection.execution_options(autocommit=True)
# 獲取SQL命令
sql_commands = get_sql_commands(
drop_existing=drop_existing,
charset=db_config['charset'],
collation=db_config['collation']
).split(';')
# 執行SQL命令
for cmd in sql_commands:
cmd = cmd.strip()
if cmd: # 跳過空命令
logger.info(f"執行SQL命令: {cmd[:80]}...") # 只顯示前80個字符
connection.execute(text(cmd))
logger.info("所有數據庫和表創建成功!")
return True
except SQLAlchemyError as e:
logger.error(f"執行SQL時發生錯誤: {str(e)}")
return False
except Exception as e:
logger.error(f"發生意外錯誤: {str(e)}")
return False
if __name__ == "__main__":
# 解析命令行參數
parser = argparse.ArgumentParser(description='創建監控數據庫和表結構')
parser.add_argument('--drop-existing', action='store_true',
help='刪除已存在的表(如果存在)')
args = parser.parse_args()
create_database_and_tables(drop_existing=args.drop_existing)
建立建表接口
CREATE DATABASE IF NOT EXISTS db_monitor;
USE db_monitor;
CREATE TABLE table_creation_log (
id INT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(255) NOT NULL,
database_type ENUM('TDengine', 'PostgreSQL', 'Redis', 'MySQL', 'MariaDB') NOT NULL,
database_name VARCHAR(255) NOT NULL,
creation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
modification_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
status ENUM('success', 'failed') NOT NULL,
table_parameters JSON NOT NULL,
ddl_command TEXT NOT NULL,
error_message TEXT,
INDEX idx_database_type (database_type),
INDEX idx_creation_time (creation_time)
);
CREATE TABLE column_definition_log (
id INT AUTO_INCREMENT PRIMARY KEY,
table_log_id INT NOT NULL,
column_name VARCHAR(255) NOT NULL,
data_type VARCHAR(100) NOT NULL,
col_length INT, -- 避免使用關鍵字 length
col_precision INT, -- 避免使用關鍵字 precision
col_scale INT, -- 避免使用關鍵字 scale
is_nullable BOOLEAN DEFAULT TRUE,
is_primary_key BOOLEAN DEFAULT FALSE,
default_value VARCHAR(255),
comment TEXT,
FOREIGN KEY (table_log_id) REFERENCES table_creation_log(id) ON DELETE CASCADE
);
-- 新增表操作日志表
CREATE TABLE table_operation_log (
id INT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(255) NOT NULL,
database_type ENUM('TDengine', 'PostgreSQL', 'Redis', 'MySQL', 'MariaDB') NOT NULL,
database_name VARCHAR(255) NOT NULL,
operation_type ENUM('CREATE', 'ALTER', 'DROP', 'VALIDATE') NOT NULL,
operation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
operation_status ENUM('success', 'failed') NOT NULL,
operation_details JSON NOT NULL,
ddl_command TEXT,
error_message TEXT,
INDEX idx_operation_time (operation_time),
INDEX idx_operation_type (operation_type)
);
-- 新增表結構驗證日志表
CREATE TABLE table_validation_log (
id INT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(255) NOT NULL,
database_type ENUM('TDengine', 'PostgreSQL', 'Redis', 'MySQL', 'MariaDB') NOT NULL,
database_name VARCHAR(255) NOT NULL,
validation_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
validation_status ENUM('pass', 'fail') NOT NULL,
validation_details JSON NOT NULL,
issues_found TEXT,
INDEX idx_validation_time (validation_time)
);
增強的python監控數據庫實現代碼:
點擊查看代碼
import json
import logging
import os
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple
import sqlalchemy as sa
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, JSON, Boolean, Enum as SQLEnum
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
import psycopg2
import taos
import redis
import pymysql
from dotenv import load_dotenv
import yaml
# 加載環境變量
load_dotenv()
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("DatabaseTableManager")
# 從環境變量獲取監控數據庫連接
MONITOR_DB_URL = os.getenv("MONITOR_DB_URL", "mysql+pymysql://user:password@localhost/db_monitor")
Base = declarative_base()
class DatabaseType(Enum):
TDENGINE = "TDengine"
POSTGRESQL = "PostgreSQL"
REDIS = "Redis"
MYSQL = "MySQL"
MARIADB = "MariaDB"
# ORM模型
class TableCreationLog(Base):
__tablename__ = 'table_creation_log'
id = Column(Integer, primary_key=True)
table_name = Column(String(255), nullable=False)
database_type = Column(String(20), nullable=False)
database_name = Column(String(255), nullable=False)
creation_time = Column(DateTime, default=datetime.utcnow)
modification_time = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
status = Column(String(10), nullable=False)
table_parameters = Column(JSON, nullable=False)
ddl_command = Column(Text, nullable=False)
error_message = Column(Text)
# 關系
columns = relationship("ColumnDefinitionLog", backref="table_log", cascade="all, delete-orphan")
class ColumnDefinitionLog(Base):
__tablename__ = 'column_definition_log'
id = Column(Integer, primary_key=True)
table_log_id = Column(Integer, sa.ForeignKey('table_creation_log.id'))
column_name = Column(String(255), nullable=False)
data_type = Column(String(100), nullable=False)
col_length = Column(Integer) # 避免使用關鍵字 length
col_precision = Column(Integer) # 避免使用關鍵字 precision
col_scale = Column(Integer) # 避免使用關鍵字 scale
is_nullable = Column(Boolean, default=True)
is_primary_key = Column(Boolean, default=False)
default_value = Column(String(255))
comment = Column(Text)
class TableOperationLog(Base):
__tablename__ = 'table_operation_log'
id = Column(Integer, primary_key=True)
table_name = Column(String(255), nullable=False)
database_type = Column(String(20), nullable=False)
database_name = Column(String(255), nullable=False)
operation_type = Column(SQLEnum('CREATE', 'ALTER', 'DROP', 'VALIDATE'), nullable=False)
operation_time = Column(DateTime, default=datetime.utcnow)
operation_status = Column(String(10), nullable=False)
operation_details = Column(JSON, nullable=False)
ddl_command = Column(Text)
error_message = Column(Text)
class TableValidationLog(Base):
__tablename__ = 'table_validation_log'
id = Column(Integer, primary_key=True)
table_name = Column(String(255), nullable=False)
database_type = Column(String(20), nullable=False)
database_name = Column(String(255), nullable=False)
validation_time = Column(DateTime, default=datetime.utcnow)
validation_status = Column(String(10), nullable=False)
validation_details = Column(JSON, nullable=False)
issues_found = Column(Text)
class DatabaseTableManager:
def __init__(self):
# 初始化監控數據庫連接
self.monitor_engine = create_engine(MONITOR_DB_URL)
Base.metadata.create_all(self.monitor_engine)
Session = sessionmaker(bind=self.monitor_engine)
self.monitor_session = Session()
# 從環境變量加載各數據庫連接配置
self.db_configs = {
DatabaseType.TDENGINE: {
'host': os.getenv('TDENGINE_HOST', 'localhost'),
'user': os.getenv('TDENGINE_USER', 'root'),
'password': os.getenv('TDENGINE_PASSWORD', 'taosdata'),
'port': int(os.getenv('TDENGINE_PORT', '6030'))
},
DatabaseType.POSTGRESQL: {
'host': os.getenv('POSTGRESQL_HOST', 'localhost'),
'user': os.getenv('POSTGRESQL_USER', 'postgres'),
'password': os.getenv('POSTGRESQL_PASSWORD', ''),
'port': int(os.getenv('POSTGRESQL_PORT', '5432'))
},
DatabaseType.REDIS: {
'host': os.getenv('REDIS_HOST', 'localhost'),
'port': int(os.getenv('REDIS_PORT', '6379')),
'password': os.getenv('REDIS_PASSWORD', None),
'db': int(os.getenv('REDIS_DB', '0'))
},
DatabaseType.MYSQL: {
'host': os.getenv('MYSQL_HOST', 'localhost'),
'user': os.getenv('MYSQL_USER', 'root'),
'password': os.getenv('MYSQL_PASSWORD', ''),
'port': int(os.getenv('MYSQL_PORT', '3306'))
},
DatabaseType.MARIADB: {
'host': os.getenv('MARIADB_HOST', 'localhost'),
'user': os.getenv('MARIADB_USER', 'root'),
'password': os.getenv('MARIADB_PASSWORD', ''),
'port': int(os.getenv('MARIADB_PORT', '3306'))
}
}
# 各數據庫連接池
self.db_connections = {}
def get_connection(self, db_type: DatabaseType, db_name: str, **kwargs):
"""獲取數據庫連接"""
conn_key = f"{db_type.value}_{db_name}"
if conn_key in self.db_connections:
return self.db_connections[conn_key]
# 獲取默認配置并更新用戶提供的參數
config = self.db_configs[db_type].copy()
config.update(kwargs)
try:
if db_type == DatabaseType.TDENGINE:
# TDengine連接
conn = taos.connect(
host=config['host'],
user=config['user'],
password=config['password'],
port=config['port'],
database=db_name
)
elif db_type == DatabaseType.POSTGRESQL:
# PostgreSQL連接
conn = psycopg2.connect(
host=config['host'],
user=config['user'],
password=config['password'],
port=config['port'],
database=db_name
)
elif db_type == DatabaseType.REDIS:
# Redis連接
conn = redis.Redis(
host=config['host'],
port=config['port'],
db=config['db'],
password=config['password'],
decode_responses=True
)
elif db_type in [DatabaseType.MYSQL, DatabaseType.MARIADB]:
# MySQL/MariaDB連接
conn = pymysql.connect(
host=config['host'],
user=config['user'],
password=config['password'],
port=config['port'],
database=db_name,
charset='utf8mb4'
)
else:
raise ValueError(f"Unsupported database type: {db_type}")
self.db_connections[conn_key] = conn
return conn
except Exception as e:
logger.error(f"Failed to connect to {db_type.value} database {db_name}: {str(e)}")
raise
def _log_operation(self, table_name: str, db_type: DatabaseType, db_name: str,
operation_type: str, operation_details: Dict, ddl_command: str = "",
status: str = "success", error_message: str = ""):
"""記錄操作日志到監控數據庫"""
log_entry = TableOperationLog(
table_name=table_name,
database_type=db_type.value,
database_name=db_name,
operation_type=operation_type,
operation_status=status,
operation_details=operation_details,
ddl_command=ddl_command,
error_message=error_message
)
self.monitor_session.add(log_entry)
self.monitor_session.commit()
return log_entry.id
def create_table(self, db_type: DatabaseType, db_name: str, table_name: str,
columns: List[Dict], **kwargs) -> bool:
"""在指定數據庫中創建表"""
# 記錄開始信息到監控表
operation_id = self._log_operation(
table_name, db_type, db_name, "CREATE",
{"columns": columns, "kwargs": kwargs},
status="processing"
)
try:
# 獲取數據庫連接
conn = self.get_connection(db_type, db_name, **kwargs)
# 生成DDL語句
if db_type == DatabaseType.TDENGINE:
ddl = self._generate_tdengine_ddl(table_name, columns, kwargs.get('tags', []))
cursor = conn.cursor()
cursor.execute(ddl)
elif db_type == DatabaseType.POSTGRESQL:
ddl = self._generate_postgresql_ddl(table_name, columns, **kwargs)
cursor = conn.cursor()
cursor.execute(ddl)
conn.commit()
elif db_type in [DatabaseType.MYSQL, DatabaseType.MARIADB]:
ddl = self._generate_mysql_ddl(table_name, columns, **kwargs)
cursor = conn.cursor()
cursor.execute(ddl)
conn.commit()
elif db_type == DatabaseType.REDIS:
# Redis沒有表結構,這里可以創建一些初始結構
ddl = f"REDIS INIT for {table_name}"
self._initialize_redis_structure(conn, table_name, columns)
else:
raise ValueError(f"Unsupported database type: {db_type}")
# 記錄表創建日志
log_entry = TableCreationLog(
table_name=table_name,
database_type=db_type.value,
database_name=db_name,
status='success',
table_parameters=json.dumps({"columns": columns, "kwargs": kwargs}),
ddl_command=ddl
)
self.monitor_session.add(log_entry)
self.monitor_session.flush() # 獲取ID但不提交
# 記錄列定義
for col_def in columns:
col_log = ColumnDefinitionLog(
table_log_id=log_entry.id,
column_name=col_def['name'],
data_type=col_def['type'],
col_length=col_def.get('length'),
col_precision=col_def.get('precision'),
col_scale=col_def.get('scale'),
is_nullable=col_def.get('nullable', True),
is_primary_key=col_def.get('primary_key', False),
default_value=col_def.get('default'),
comment=col_def.get('comment')
)
self.monitor_session.add(col_log)
# 提交所有更改
self.monitor_session.commit()
# 更新操作日志
self._log_operation(
table_name, db_type, db_name, "CREATE",
{"columns": columns, "kwargs": kwargs}, ddl, "success"
)
# 驗證表結構
self.validate_table_structure(db_type, db_name, table_name, columns)
return True
except Exception as e:
# 記錄錯誤信息
error_msg = str(e)
logger.error(f"Failed to create table {table_name}: {error_msg}")
# 更新操作日志
self._log_operation(
table_name, db_type, db_name, "CREATE",
{"columns": columns, "kwargs": kwargs}, "", "failed", error_msg
)
self.monitor_session.rollback()
return False
def alter_table(self, db_type: DatabaseType, db_name: str, table_name: str,
alterations: List[Dict], **kwargs) -> bool:
"""修改表結構"""
# 記錄開始信息
operation_id = self._log_operation(
table_name, db_type, db_name, "ALTER",
{"alterations": alterations, "kwargs": kwargs},
status="processing"
)
try:
conn = self.get_connection(db_type, db_name, **kwargs)
cursor = conn.cursor()
# 生成ALTER語句
ddl = self._generate_alter_ddl(db_type, table_name, alterations)
# 執行ALTER語句
cursor.execute(ddl)
if db_type != DatabaseType.REDIS: # Redis不需要commit
conn.commit()
# 更新操作日志
self._log_operation(
table_name, db_type, db_name, "ALTER",
{"alterations": alterations, "kwargs": kwargs}, ddl, "success"
)
# 更新表創建日志中的修改時間
table_log = self.monitor_session.query(TableCreationLog).filter_by(
table_name=table_name,
database_type=db_type.value,
database_name=db_name
).first()
if table_log:
table_log.modification_time = datetime.utcnow()
self.monitor_session.commit()
return True
except Exception as e:
error_msg = str(e)
logger.error(f"Failed to alter table {table_name}: {error_msg}")
self._log_operation(
table_name, db_type, db_name, "ALTER",
{"alterations": alterations, "kwargs": kwargs}, "", "failed", error_msg
)
return False
def drop_table(self, db_type: DatabaseType, db_name: str, table_name: str, **kwargs) -> bool:
"""刪除表"""
# 記錄開始信息
operation_id = self._log_operation(
table_name, db_type, db_name, "DROP",
{"kwargs": kwargs},
status="processing"
)
try:
conn = self.get_connection(db_type, db_name, **kwargs)
cursor = conn.cursor()
# 生成DROP語句
if db_type == DatabaseType.TDENGINE:
ddl = f"DROP TABLE IF EXISTS {table_name}"
elif db_type == DatabaseType.POSTGRESQL:
ddl = f"DROP TABLE IF EXISTS {table_name} CASCADE"
elif db_type in [DatabaseType.MYSQL, DatabaseType.MARIADB]:
ddl = f"DROP TABLE IF EXISTS {table_name}"
elif db_type == DatabaseType.REDIS:
# Redis沒有表的概念,刪除相關的所有鍵
ddl = f"Redis keys deletion for pattern: {table_name}:*"
keys = conn.keys(f"{table_name}:*")
if keys:
conn.delete(*keys)
else:
raise ValueError(f"Unsupported database type: {db_type}")
# 執行DROP語句(Redis除外)
if db_type != DatabaseType.REDIS:
cursor.execute(ddl)
conn.commit()
# 更新操作日志
self._log_operation(
table_name, db_type, db_name, "DROP",
{"kwargs": kwargs}, ddl, "success"
)
# 從監控表中刪除相關記錄
table_log = self.monitor_session.query(TableCreationLog).filter_by(
table_name=table_name,
database_type=db_type.value,
database_name=db_name
).first()
if table_log:
# 刪除相關的列定義記錄(由于外鍵約束,會級聯刪除)
self.monitor_session.delete(table_log)
self.monitor_session.commit()
return True
except Exception as e:
error_msg = str(e)
logger.error(f"Failed to drop table {table_name}: {error_msg}")
self._log_operation(
table_name, db_type, db_name, "DROP",
{"kwargs": kwargs}, "", "failed", error_msg
)
return False
def validate_table_structure(self, db_type: DatabaseType, db_name: str,
table_name: str, expected_columns: List[Dict], **kwargs) -> Dict:
"""驗證表結構是否符合預期"""
validation_details = {
"expected_columns": expected_columns,
"actual_columns": [],
"matches": False,
"issues": []
}
try:
# 獲取實際表結構
actual_structure = self.get_table_info(db_type, db_name, table_name, **kwargs)
if not actual_structure:
validation_details["issues"].append("Table does not exist or cannot be accessed")
validation_status = "fail"
else:
validation_details["actual_columns"] = actual_structure.get("columns", [])
# 驗證列匹配
expected_cols = {col["name"]: col for col in expected_columns}
actual_cols = {col["name"]: col for col in actual_structure.get("columns", [])}
# 檢查缺失的列
for col_name in expected_cols:
if col_name not in actual_cols:
validation_details["issues"].append(f"Missing column: {col_name}")
# 檢查多余的列
for col_name in actual_cols:
if col_name not in expected_cols:
validation_details["issues"].append(f"Extra column: {col_name}")
# 檢查列屬性
for col_name in expected_cols:
if col_name in actual_cols:
expected = expected_cols[col_name]
actual = actual_cols[col_name]
# 檢查數據類型
if expected.get("type") and actual.get("type"):
if expected["type"].lower() != actual["type"].lower():
validation_details["issues"].append(
f"Column {col_name} type mismatch: expected {expected['type']}, got {actual['type']}"
)
# 檢查是否允許為空
if "nullable" in expected and "nullable" in actual:
if expected["nullable"] != actual["nullable"]:
validation_details["issues"].append(
f"Column {col_name} nullable mismatch: expected {expected['nullable']}, got {actual['nullable']}"
)
# 確定驗證狀態
validation_details["matches"] = len(validation_details["issues"]) == 0
validation_status = "pass" if validation_details["matches"] else "fail"
except Exception as e:
error_msg = str(e)
logger.error(f"Failed to validate table {table_name}: {error_msg}")
validation_details["issues"].append(f"Validation error: {error_msg}")
validation_status = "fail"
# 記錄驗證結果
validation_log = TableValidationLog(
table_name=table_name,
database_type=db_type.value,
database_name=db_name,
validation_status=validation_status,
validation_details=validation_details,
issues_found="; ".join(validation_details["issues"]) if validation_details["issues"] else None
)
self.monitor_session.add(validation_log)
self.monitor_session.commit()
return validation_details
def batch_create_tables(self, config_file: str):
"""通過配置文件批量創建表"""
try:
with open(config_file, 'r') as f:
config = yaml.safe_load(f)
results = {}
for table_config in config.get('tables', []):
db_type = DatabaseType[table_config['database_type'].upper()]
db_name = table_config['database_name']
table_name = table_config['table_name']
columns = table_config['columns']
kwargs = table_config.get('kwargs', {})
result = self.create_table(db_type, db_name, table_name, columns, **kwargs)
results[table_name] = result
return results
except Exception as e:
logger.error(f"Failed to batch create tables: {str(e)}")
return {"error": str(e)}
def _generate_alter_ddl(self, db_type: DatabaseType, table_name: str, alterations: List[Dict]) -> str:
"""生成ALTER TABLE語句"""
ddl_parts = []
for alteration in alterations:
operation = alteration.get('operation') # ADD, DROP, MODIFY, RENAME
if operation == 'ADD':
col_def = self._generate_column_definition(alteration)
ddl_parts.append(f"ADD COLUMN {col_def}")
elif operation == 'DROP':
ddl_parts.append(f"DROP COLUMN {alteration['column_name']}")
elif operation == 'MODIFY':
col_def = self._generate_column_definition(alteration)
ddl_parts.append(f"MODIFY COLUMN {col_def}")
elif operation == 'RENAME':
ddl_parts.append(f"RENAME COLUMN {alteration['old_name']} TO {alteration['new_name']}")
return f"ALTER TABLE {table_name} {', '.join(ddl_parts)}"
def _generate_column_definition(self, col_def: Dict) -> str:
"""生成列定義字符串"""
definition = f"{col_def['name']} {col_def['type']}"
# 處理長度/精度
if col_def.get('length') and col_def['type'].lower() in ['varchar', 'char']:
definition += f"({col_def['length']})"
elif col_def.get('precision') and col_def['type'].lower() in ['numeric', 'decimal']:
if col_def.get('scale'):
definition += f"({col_def['precision']}, {col_def['scale']})"
else:
definition += f"({col_def['precision']})"
# 處理約束
if not col_def.get('nullable', True):
definition += " NOT NULL"
if col_def.get('default') is not None:
definition += f" DEFAULT {col_def['default']}"
if col_def.get('comment'):
definition += f" COMMENT '{col_def['comment']}'"
return definition
# 其他方法 (_generate_tdengine_ddl, _generate_postgresql_ddl, _generate_mysql_ddl,
# _initialize_redis_structure, get_table_info, close_all_connections) 保持不變
# 但需要更新以使用新的列名 (col_length, col_precision, col_scale)
# 使用示例
if __name__ == "__main__":
manager = DatabaseTableManager()
# 批量創建表示例
results = manager.batch_create_tables("tables_config.yaml")
print("Batch create results:", results)
# 修改表示例
alterations = [
{
"operation": "ADD",
"name": "new_column",
"type": "VARCHAR",
"length": 100,
"nullable": True,
"comment": "New column added by alter operation"
}
]
success = manager.alter_table(
DatabaseType.MYSQL,
'test_db',
'test_table',
alterations,
host='localhost',
user='root',
password='password'
)
print(f"Alter table {'成功' if success else '失敗'}")
# 刪除表示例
success = manager.drop_table(
DatabaseType.MYSQL,
'test_db',
'test_table',
host='localhost',
user='root',
password='password'
)
print(f"Drop table {'成功' if success else '失敗'}")
manager.close_all_connections()
python連接TDengine
參考:https://docs.taosdata.com/2.6/develop/connect/
先下載這個:https://www.taosdata.com/assets-download/3.0/TDengine-client-3.3.6.13-Windows-x64.exe
# 安裝最新版本
pip3 install taospy
# 安裝指定版本
pip3 install taospy==2.6.2
# 從 GitHub 安裝
pip3 install git+https://github.com/taosdata/taos-connector-python.git
#對于 REST 連接,只需驗證是否能成功導入 taosrest 模塊??稍?Python 交互式 Shell 中輸入:
import taosrest

參考
TDengine文檔:https://docs.taosdata.com/reference/connector/java/#properties
浙公網安備 33010602011771號