Python script: writing Prometheus data into Elasticsearch
Periodically fetch Prometheus monitoring metrics and write them into Elasticsearch.
import requests
import json
import schedule
import time
from elasticsearch import Elasticsearch
from datetime import datetime
# Prometheus query endpoint
prometheus_url = "http://100.64.0.40:9090/api/v1/query"

# Elasticsearch configuration
es_host = "http://100.64.0.5:9200"
es_index_prefix = "gpu_metrics"  # index name prefix

# Create the Elasticsearch client
es = Elasticsearch([es_host])

# Metrics to query
queries = [
    'DCGM_FI_DEV_POWER_USAGE',  # GPU power usage
    'DCGM_FI_DEV_SM_CLOCK',     # GPU SM clock
    'DCGM_FI_DEV_GPU_UTIL',     # GPU utilization
    'DCGM_FI_DEV_FB_USED'       # GPU framebuffer memory used
]

# Query Prometheus for a single metric
def query_prometheus(query):
    params = {'query': query}
    try:
        response = requests.get(prometheus_url, params=params)
        response.raise_for_status()  # raises an exception for non-200 responses
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Failed to query Prometheus for {query}: {e}")
        return None

# Process the query result and write it to Elasticsearch
def write_to_es(query, data):
    if data and 'data' in data and 'result' in data['data']:
        results = data['data']['result']
        for result in results:
            metric = result.get("metric", {})
            gpu_device = metric.get("device", "Unknown")
            gpu_uuid = metric.get("UUID", "Unknown")
            gpu_name = metric.get("modelName", "Unknown")
            pci_bus_id = metric.get("pci_bus_id", "Unknown")
            timestamp = result.get("value", [None])[0]
            value = result.get("value", [None, None])[1]

            # Skip entries with no valid value
            if value is None:
                continue

            # Build a lowercase index name
            index_name = f"{es_index_prefix}_{query.lower()}_{gpu_device.lower()}"

            # Build the Elasticsearch document
            doc = {
                "timestamp": datetime.utcfromtimestamp(float(timestamp)),
                "gpu_device": gpu_device,
                "gpu_uuid": gpu_uuid,
                "gpu_name": gpu_name,
                "pci_bus_id": pci_bus_id,
                "query": query,
                "value": value
            }

            # Write to Elasticsearch
            try:
                es.index(index=index_name, body=doc)
                print(f"Data for GPU {gpu_device} written to Elasticsearch in index {index_name}.")
            except Exception as e:
                print(f"Error writing to Elasticsearch: {e}")
    else:
        print(f"No valid data found for {query}.")

# Scheduled job: runs once per hour
def job():
    print("Starting the data collection and writing to Elasticsearch...")
    for query in queries:
        data = query_prometheus(query)
        if data is not None:
            write_to_es(query, data)
        else:
            print(f"Skipping {query} due to failed data retrieval.")
    print("Data collection and writing completed.")

# Schedule the job function to run every hour
schedule.every(1).hour.do(job)

# Start and keep running
if __name__ == "__main__":
    while True:
        schedule.run_pending()  # check for and run any pending jobs
        time.sleep(1)           # poll once per second
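To spot-check that documents are actually landing in Elasticsearch, you can search one of the generated indices after the job has run. The snippet below is a minimal sketch: it assumes the same es_host as above, and the index name used here (suffix "nvidia0") is only an example, since the real suffix depends on the device label reported by DCGM.

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://100.64.0.5:9200"])

# Assumed index name: gpu_metrics_<metric>_<device>; "nvidia0" is a placeholder device label
index_name = "gpu_metrics_dcgm_fi_dev_gpu_util_nvidia0"

# Fetch the five most recent documents, newest first
resp = es.search(
    index=index_name,
    body={
        "size": 5,
        "sort": [{"timestamp": {"order": "desc"}}]
    }
)

for hit in resp["hits"]["hits"]:
    src = hit["_source"]
    print(src["timestamp"], src["gpu_device"], src["query"], src["value"])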