Python-批量統(tǒng)計(jì)MySQL中表的數(shù)據(jù)量
背景
在數(shù)據(jù)中臺(tái)中,有時(shí)為了核對(duì)數(shù)據(jù),需要每天批量統(tǒng)計(jì)MySQL數(shù)據(jù)庫(kù)中表的數(shù)據(jù)量,但是DMS中沒(méi)有周期調(diào)度功能。
MySQL創(chuàng)建表
--統(tǒng)計(jì)的表清單 CREATE TABLE `dws_table_list` ( `table_name` varchar(255) DEFAULT NULL, `flag` varchar(255) DEFAULT NULL ); --每天的數(shù)據(jù)量 CREATE TABLE `dws_table_count` ( `table_name` varchar(255) DEFAULT NULL, `table_count` bigint DEFAULT NULL, `ds` varchar(255) DEFAULT NULL );
dws_table_list表中插入數(shù)據(jù)

說(shuō)明:test_table1、test_table2、test_table3這些表,需要具有相同的日期字段(比如:ds)。
Python代碼
import pymysql import sys from datetime import datetime, timedelta if __name__ == '__main__': if len(sys.argv) == 2: input_str = sys.argv[1] else: print("參數(shù)個(gè)數(shù)不對(duì)!") exit() # MySQL中表不存在的列表 table_not_exists_list = [] # 沒(méi)有業(yè)務(wù)數(shù)據(jù)日期的列表 dataDate_no_data_list = [] # 業(yè)務(wù)數(shù)據(jù)日期 data_date = datetime.strptime(input_str, "%Y%m%d") data_date_str = data_date.strftime("%Y%m%d") print(f"業(yè)務(wù)數(shù)據(jù)日期: {data_date_str}") # 日期減去特定的時(shí)間間隔 data_date_1days_ago_str = (data_date - timedelta(days=1)).strftime('%Y%m%d') data_date_2days_ago_str = (data_date - timedelta(days=2)).strftime('%Y%m%d') data_date_3days_ago_str = (data_date - timedelta(days=3)).strftime('%Y%m%d') data_date_4days_ago_str = (data_date - timedelta(days=4)).strftime('%Y%m%d') data_date_5days_ago_str = (data_date - timedelta(days=5)).strftime('%Y%m%d') data_date_6days_ago_str = (data_date - timedelta(days=6)).strftime('%Y%m%d') data_date_7days_ago_str = (data_date - timedelta(days=7)).strftime('%Y%m%d') # 連接MySQL數(shù)據(jù)庫(kù) config = { 'user': 'root', 'password': 'Root@1234', 'host': 'localhost', 'database': 'test' } conn = pymysql.connect(**config) # 創(chuàng)建一個(gè)游標(biāo)對(duì)象 cursor = conn.cursor() # 執(zhí)行一個(gè)查詢 query = "SELECT table_name from dws_table_list ORDER BY table_name ;" cursor.execute(query) # 打印表清單中的所有表的數(shù)量 print("dws_table_list表中的表個(gè)數(shù): ",cursor.rowcount) print("**"*100) # 可取出指針結(jié)果集中的所有行,返回的結(jié)果集一個(gè)元組。 result = cursor.fetchall() # 刪除業(yè)務(wù)日期當(dāng)天的數(shù)據(jù),防止重跑時(shí)數(shù)據(jù)重復(fù) cursor.execute("DELETE FROM dws_table_count WHERE ds = %s ;", data_date_str) # 遍歷指針結(jié)果集 for rows in result: table_name = rows[0] query_count = f"SELECT '{table_name}' table_name,COUNT(0),ds table_count FROM {table_name} WHERE ds = '{data_date_str}' GROUP BY ds ;" print(query_count) try: cursor.execute(query_count) result2 = cursor.fetchall() if cursor.rowcount > 0: # 插入數(shù)據(jù) insert_sql = "INSERT INTO dws_table_count VALUES (%s,%s,%s)" for rows2 in result2: cursor.execute(insert_sql, rows2) else: print("\033[93m" + f"Table {table_name} does not have data for {data_date_str} " + "\033[0m") dataDate_no_data_list.append(table_name) except pymysql.MySQLError as e: if "exist" in str(e): print("\033[31m" + "MySQL error: " + str(e) + "\033[0m") table_not_exists_list.append(table_name) else: print("\033[31m" + "MySQL error: " + str(e) + "\033[0m") exit() # 刪除7天之前的數(shù)據(jù) cursor.execute("DELETE FROM dws_table_count WHERE ds = %s ;",data_date_7days_ago_str) # 根據(jù)表中的數(shù)據(jù)展示需要的結(jié)果 final_query = (f"""SELECT table_name, '{data_date_str}' ds, SUM(IF(ds = '{data_date_str}',table_count,0)) data_{data_date_str}, SUM(IF(ds = '{data_date_1days_ago_str}',table_count,0)) data_{data_date_1days_ago_str}, SUM(IF(ds = '{data_date_2days_ago_str}',table_count,0)) data_{data_date_2days_ago_str}, SUM(IF(ds = '{data_date_3days_ago_str}',table_count,0)) data_{data_date_3days_ago_str}, SUM(IF(ds = '{data_date_4days_ago_str}',table_count,0)) data_{data_date_4days_ago_str}, SUM(IF(ds = '{data_date_5days_ago_str}',table_count,0)) data_{data_date_5days_ago_str}, SUM(IF(ds = '{data_date_6days_ago_str}',table_count,0)) data_{data_date_6days_ago_str} FROM dws_table_count WHERE ds BETWEEN '{data_date_6days_ago_str}' AND '{data_date_str}' GROUP BY table_name ; """) cursor.execute(final_query) # 可取出指針結(jié)果集中的所有行,返回的結(jié)果集一個(gè)元組。 result3 = cursor.fetchall() print("**"*100) print("\033[92m" + "MySQL表數(shù)據(jù)量統(tǒng)計(jì):" + "\033[0m") print("--"*60) # 獲取列名 columns = [column[0] for column in cursor.description] # 打印列名 print('xh',end='|') for col in columns: print(col,end='|') print() # 打印結(jié)果 count = 1 for rows3 in result3: print(count,end='|') for j in range(len(rows3)): print(rows3[j],end='|') count = count + 1 print() print("**" * 100) print("\033[92m" + "MySQL中不存在的表:" + "\033[0m") print("--"*60) for index,table in enumerate(table_not_exists_list,start=1): print(index,table,sep='|') print("**" * 100) print(f"\033[92m" + f"業(yè)務(wù)日期{data_date_str}沒(méi)有數(shù)據(jù)的表: " + "\033[0m") print("--"*60) for index,table in enumerate(dataDate_no_data_list,start=1): print(index,table,sep='|') # 關(guān)閉游標(biāo)和連接 cursor.close() conn.commit() conn.close()
執(zhí)行結(jié)果

本文來(lái)自博客園,作者:業(yè)余磚家,轉(zhuǎn)載請(qǐng)注明原文鏈接:http://www.rzrgm.cn/yeyuzhuanjia/p/18375375

浙公網(wǎng)安備 33010602011771號(hào)