AI Monitoring Platform Product Analysis: A Hands-On Guide to Evidently AI
Big data and AI are sweeping the IT industry, and new Gen AI products have appeared almost monthly in recent months; the AI era has clearly arrived. Enterprises typically respond by building an AI platform that covers development, training, and deployment.
AI models, however, usually target a specific business domain, have a significant business impact, and take a long time to move from development to production. Monitoring model performance is therefore a critical part of the lifecycle; without a monitoring platform, few teams would dare to put a model into production. That is the core purpose of an AI monitoring platform:
monitor model quality so that the model can be retrained and adjusted in time.
This article covers the overall AI platform architecture, the AI platform capability chain, and hands-on practice with Evidently AI.
AI platform architecture:

Overall the platform has three layers:
1. Infrastructure layer: CPU, GPU, storage, and other base infrastructure capabilities
2. Technical platform layer: data processing, model development, model serving, management, and monitoring modules
3. AI application layer: the various AI models themselves
AI platform capability chain:

In terms of capabilities, an AI platform breaks down into:
1. Business understanding, supported by a visual modeling platform
2. Data processing, supported by a data processing platform and a data labeling platform
3. Model development, supported by an IDE/notebook environment
4. Model evaluation and release
5. Model operation, supported by a model serving platform and a monitoring platform
Model monitoring surfaces quality deviations and issues so the model can be adjusted in time. This closes the loop of model development and ensures the model keeps improving and stays aligned with business needs.
Model monitoring and a model monitoring tool: Evidently AI
Model monitoring is generally considered at three levels:

1. Operations: request counts, request latency, and system metrics such as CPU/memory/IO
2. Model performance: metrics used to detect concept drift, such as RMSE, AUC-ROC, and the KS statistic
3. Model stability metrics: the PSI (Population Stability Index) and CSI (Characteristic Stability Index)
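To make the stability metrics above concrete: PSI compares the binned distribution of a feature between a reference window and a current window, and values above roughly 0.2 are often treated as a significant shift. A minimal sketch in plain Python (the bin count and the 0.2 threshold are common conventions, not a fixed standard):

```python
import math

def psi(reference, current, n_bins=10):
    """Population Stability Index between two samples of a numeric feature."""
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # guard against a constant reference
    def bin_fractions(values):
        counts = [0] * n_bins
        for v in values:
            idx = min(max(int((v - lo) / width), 0), n_bins - 1)
            counts[idx] += 1
        # floor fractions so the log term below stays finite for empty bins
        return [max(c / len(values), 1e-6) for c in counts]
    ref_frac = bin_fractions(reference)
    cur_frac = bin_fractions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_frac, cur_frac))

print(psi([1, 2, 3, 4, 5] * 20, [1, 2, 3, 4, 5] * 20))  # identical windows: 0.0
```

A shifted current window (say, mostly high values) drives the statistic well above the 0.2 warning level.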
More concretely, we can look at the following aspects:

1. Data quality and integrity
-- Validating that input data matches our expectations is essential. Checks may cover range compliance, data distributions, feature statistics, correlations, or any behavior we consider "normal" for the dataset.
-- Confirm that we are feeding the model data it can actually handle.
2. Data and target drift
-- Data drift occurs when the model receives data it never saw during training.
-- Concept drift occurs when real-world patterns change and the model no longer applies (for example, a global pandemic that changes the behavior of all customers, or a new influencing factor appearing).
-- The goal is to get an early signal that the concept or the data has changed, so the model can be updated in time.
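One common way to get such an early signal for a numeric column is the two-sample Kolmogorov-Smirnov test, one of the statistical tests Evidently applies for numerical features. A minimal sketch of the KS statistic itself (deciding the significance threshold is left out):

```python
import bisect

def ks_statistic(reference, current):
    """Two-sample Kolmogorov-Smirnov statistic: the maximum distance
    between the empirical CDFs of the two samples."""
    ref = sorted(reference)
    cur = sorted(current)

    def ecdf(sample, x):
        # fraction of sample values <= x
        return bisect.bisect_right(sample, x) / len(sample)

    points = set(ref) | set(cur)
    return max(abs(ecdf(ref, x) - ecdf(cur, x)) for x in points)

print(ks_statistic([1, 2, 3, 4], [1, 2, 3, 4]))      # identical samples: 0.0
print(ks_statistic([1, 2, 3, 4], [11, 12, 13, 14]))  # disjoint samples: 1.0
```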
3. Model performance
-- Compare the model's predictions against actual values, using KPIs such as Precision/Recall for classification and RMSE for regression.
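The KPIs just mentioned are simple to compute once predictions and ground truth are paired up; a plain-Python sketch:

```python
import math

def rmse(targets, predictions):
    """Root mean squared error for a regression model."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(targets, predictions)) / len(targets))

def precision_recall(targets, predictions):
    """Precision and recall for binary labels (1 = positive class)."""
    tp = sum(1 for t, p in zip(targets, predictions) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(targets, predictions) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(targets, predictions) if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall

print(rmse([100, 200], [110, 190]))                  # 10.0
print(precision_recall([1, 1, 0, 0], [1, 0, 1, 0]))  # (0.5, 0.5)
```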
Using the three criteria above, we surveyed the AI monitoring platforms and solutions available on the market and compared them:

Overall, Evidently is an open-source product that meets our requirements, so we focused our research on it first.
--------------------------------------------------------------------------------
On the official website, the product introduces itself as:
The open-source ML observability platform
Evaluate, test, and monitor ML models from validation to production.
From tabular data to NLP and LLM. Built for data scientists and ML engineers.
I will skip the research details here and go straight to our solution. We currently deploy models and expose them as APIs for callers, and Evidently offers several ways to integrate and use it:
1. Import it as a Python package to visualize model performance metrics during development and generate HTML reports.

2. For batch models, combine it with a scheduler such as Airflow to generate reports periodically, and display them on a dashboard.

Dashboard:

3. For real-time scenarios, use Grafana + Prometheus + Evidently for live monitoring.

All three approaches come from the official reference guides; practical examples for different cases can be found at:
https://docs.evidentlyai.com/integrations/evidently-integrations
https://github.com/evidentlyai/evidently/tree/main/examples/integrations
Overall, the first approach is the foundation; the second is feasible to implement today; the third's dashboards are not yet comprehensive and are harder to implement. We chose the second approach for validation.
--------------------------------------------------------------------------------
We developed a used-car valuation regression model locally and used it for this validation.
1. Clone the Evidently repository to the machine:
git clone git@github.com:evidentlyai/evidently.git
2. We mainly use two of its example projects:

airflow_drift_detection uses Airflow to create pipelines that trigger model quality report generation; streamlit_dashboard displays the reports.
3. Installing airflow_drift_detection
Modify dockerfiles/Dockerfile:
FROM puckel/docker-airflow:1.10.9
RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
RUN pip install evidently==0.2.0
#RUN useradd -d /home/ubuntu -ms /bin/bash -g root -G sudo -p ubuntu ubuntu
#RUN mkdir /opt/myvolume && chown ubuntu /opt/myvolume
#WORKDIR /home/ubuntu
#VOLUME /opt/myvolume
Modify docker-compose.yml, mainly the volume mappings, so that reports are written directly into the corresponding directory of streamlit_dashboard:
version: '3.7'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
  webserver:
    build: ./dockerfiles
    user: "airflow:airflow"
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/usr/local/airflow/dags
      - ../streamlit_dashboard/projects:/usr/local/airflow/reports
      #- ./evidently_reports:/usr/local/airflow/reports
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
volumes:
  evidently_reports:
From the airflow_drift_detection root directory run: docker compose up --build -d
If reports fail to be generated, change the permissions of the report directory to 777: chmod 777 ../streamlit_dashboard/projects
Access URL: ********:8080/admin/
4. Installing streamlit_dashboard
Switch to root:
sudo su
Create and activate a virtual environment:
cd /home/uradataplatform/
python3 -m venv .venv
source .venv/bin/activate
Enter the project directory:
cd /home/uradataplatform/sc/streamlit_dashboard
Install the dependencies:
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
Start the app:
cd /home/uradataplatform/sc/streamlit_dashboard/streamlit-app
streamlit run app.py &
Access URL: ********:8051
5. Pipeline development:
Evidently currently ships with 7 preset reports: Data Quality, Data Drift, Target Drift, Classification performance, Regression performance, Text Overview, and NoTargetPerformance.
Here we show three of them: Data Drift, Target Drift, and Regression performance.
1> Data Drift report
try:
    import os
    from datetime import datetime, timedelta

    import psycopg2  # third-party library for accessing PostgreSQL from Python
    import pandas as pd
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from evidently.metric_preset import DataDriftPreset
    from evidently.pipeline.column_mapping import ColumnMapping
    from evidently.report import Report
except Exception as e:
    print("Error {} ".format(e))

dir_path = "reports"
file_path = "used_car_valuation_data_drift_report.html"
project_name = "used_car_valuation"

# build a "yesterday_today" window name for the report directory
now = datetime.now()
format_today = now.strftime("%Y-%m-%d")
yesterday = now - timedelta(days=1)
format_yesterday = yesterday.strftime("%Y-%m-%d")
timstamp_area = format_yesterday + "_" + format_today
full_path = dir_path + '/' + project_name + '/reports/' + timstamp_area

def load_data_execute(**context):
    # connect to the given database
    conn = psycopg2.connect(database="radarSmartcustoms", user="radarSmartcustoms",
                            password='', host="129.184.13.155", port='5433')
    cursor = conn.cursor()

    # fetch the column names of the result table
    sql1_text = """select string_agg(column_name, ',')
                   from information_schema.columns
                   where table_schema = 'public' and table_name = 'valuation_model_res'"""
    cursor.execute(sql1_text)
    data1 = cursor.fetchall()  # list of tuples

    # fetch the current data (most recent 40 rows)
    sql2_text = "select * from public.valuation_model_res order by id desc limit 40"
    #sql2_text = "select vmr.* from public.valuation_model_res vmr, public.sad_item_basic_info sibi where vmr.uuid = sibi.uuid and sibi.inspect_date = '" + format_today + "'"
    cursor.execute(sql2_text)
    data2 = cursor.fetchall()

    # turn the column-name tuple into a list of column names
    columns_name = list(data1[0])[0].split(',')
    df1 = pd.DataFrame(list(data2), columns=columns_name)
    del df1['id']
    del df1['uuid']
    del df1['item_no']
    del df1['cost_insurance_freight']
    del df1['free_on_board']
    # feature data drift does not need the prediction/target columns
    del df1['predict_price']
    del df1['declared_price']
    df1['threshold'] = df1['threshold'].astype(float)
    df1['ratio'] = df1['ratio'].astype(float)

    # reference data
    sql3_text = "select * from public.valuation_model_reference"
    cursor.execute(sql3_text)
    data3 = cursor.fetchall()
    df2 = pd.DataFrame(list(data3), columns=columns_name)
    del df2['id']
    del df2['uuid']
    del df2['item_no']
    del df2['cost_insurance_freight']
    del df2['free_on_board']
    del df2['predict_price']
    del df2['declared_price']
    df2['threshold'] = df2['threshold'].astype(float)
    df2['ratio'] = df2['ratio'].astype(float)

    cursor.close()
    conn.close()  # close the connection as soon as it is no longer needed

    data_columns = ColumnMapping()
    data_columns.numerical_features = ["mileage", "threshold", "ratio", "flag"]
    data_columns.categorical_features = ["maker", "country", "drive", "body_type", "model", "fuel"]

    context["ti"].xcom_push(key="data_frame", value=df1)
    context["ti"].xcom_push(key="data_frame_reference", value=df2)
    context["ti"].xcom_push(key="data_columns", value=data_columns)

def drift_analysis_execute(**context):
    data = context.get("ti").xcom_pull(key="data_frame")
    data_reference = context.get("ti").xcom_pull(key="data_frame_reference")
    data_columns = context.get("ti").xcom_pull(key="data_columns")

    data_drift_report = Report(metrics=[DataDriftPreset()])
    data_drift_report.run(reference_data=data_reference[:40], current_data=data[:40],
                          column_mapping=data_columns)
    try:
        if os.path.exists(full_path):
            print('Current folder exists')
        else:
            print('Current folder not exists')
            os.makedirs(full_path)
            print("Creation of the directory {} succeed".format(full_path))
    except OSError:
        print("Creation of the directory {} failed".format(full_path))
    data_drift_report.save_html(os.path.join(full_path, file_path))

with DAG(
    dag_id="used_car_valuation_data_drift_report",
    schedule_interval="@daily",
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2023, 10, 19),
    },
    catchup=False,
) as dag:
    load_data_task = PythonOperator(
        task_id="load_data_execute",
        python_callable=load_data_execute,
        provide_context=True,
        op_kwargs={"parameter_variable": "parameter_value"},  # not used now, may be used to specify data
    )
    drift_analysis_task = PythonOperator(
        task_id="drift_analysis_execute",
        python_callable=drift_analysis_execute,
        provide_context=True,
    )

    load_data_task >> drift_analysis_task
Sample report:

2> Target Drift
try:
    import os
    from datetime import datetime, timedelta

    import psycopg2  # third-party library for accessing PostgreSQL from Python
    import pandas as pd
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from evidently.metric_preset import TargetDriftPreset
    from evidently.pipeline.column_mapping import ColumnMapping
    from evidently.report import Report
except Exception as e:
    print("Error {} ".format(e))

dir_path = "reports"
file_path = "used_car_valuation_target_drift_report.html"
project_name = "used_car_valuation"

# build a "yesterday_today" window name for the report directory
now = datetime.now()
format_today = now.strftime("%Y-%m-%d")
yesterday = now - timedelta(days=1)
format_yesterday = yesterday.strftime("%Y-%m-%d")
timstamp_area = format_yesterday + "_" + format_today
full_path = dir_path + '/' + project_name + '/reports/' + timstamp_area

def load_data_execute(**context):
    # connect to the given database
    conn = psycopg2.connect(database="radarSmartcustoms", user="radarSmartcustoms",
                            password='', host="129.184.13.155", port='5433')
    cursor = conn.cursor()

    # fetch the column names of the result table
    sql1_text = """select string_agg(column_name, ',')
                   from information_schema.columns
                   where table_schema = 'public' and table_name = 'valuation_model_res'"""
    cursor.execute(sql1_text)
    data1 = cursor.fetchall()  # list of tuples

    # fetch the current data (most recent 40 rows)
    sql2_text = "select * from public.valuation_model_res order by id desc limit 40"
    #sql2_text = "select vmr.* from public.valuation_model_res vmr, public.sad_item_basic_info sibi where vmr.uuid = sibi.uuid and sibi.inspect_date = '" + format_today + "'"
    cursor.execute(sql2_text)
    data2 = cursor.fetchall()

    # turn the column-name tuple into a list of column names
    columns_name = list(data1[0])[0].split(',')
    df1 = pd.DataFrame(list(data2), columns=columns_name)
    del df1['id']
    del df1['uuid']
    del df1['item_no']
    del df1['cost_insurance_freight']
    del df1['free_on_board']
    df1.rename(columns={"predict_price": "prediction"}, inplace=True)
    df1.rename(columns={"declared_price": "target"}, inplace=True)
    df1['threshold'] = df1['threshold'].astype(float)
    df1['ratio'] = df1['ratio'].astype(float)
    df1['diffrence'] = df1['diffrence'].astype(float)  # 'diffrence' is the column name as stored in the table
    df1['prediction'] = df1['prediction'].astype(float)
    df1['target'] = df1['target'].astype(float)

    # reference data
    sql3_text = "select * from public.valuation_model_reference"
    cursor.execute(sql3_text)
    data3 = cursor.fetchall()
    df2 = pd.DataFrame(list(data3), columns=columns_name)
    del df2['id']
    del df2['uuid']
    del df2['item_no']
    del df2['cost_insurance_freight']
    del df2['free_on_board']
    df2.rename(columns={"predict_price": "prediction"}, inplace=True)
    df2.rename(columns={"declared_price": "target"}, inplace=True)
    df2['threshold'] = df2['threshold'].astype(float)
    df2['ratio'] = df2['ratio'].astype(float)
    df2['diffrence'] = df2['diffrence'].astype(float)
    df2['prediction'] = df2['prediction'].astype(float)
    df2['target'] = df2['target'].astype(float)

    cursor.close()
    conn.close()  # close the connection as soon as it is no longer needed

    data_columns = ColumnMapping()
    data_columns.numerical_features = ["mileage", "target", "prediction", "threshold", "ratio", "diffrence", "flag"]
    data_columns.categorical_features = ["maker", "country", "drive", "body_type", "model", "fuel"]

    context["ti"].xcom_push(key="data_frame", value=df1)
    context["ti"].xcom_push(key="data_frame_reference", value=df2)
    context["ti"].xcom_push(key="data_columns", value=data_columns)

def drift_analysis_execute(**context):
    data = context.get("ti").xcom_pull(key="data_frame")
    data_reference = context.get("ti").xcom_pull(key="data_frame_reference")
    data_columns = context.get("ti").xcom_pull(key="data_columns")

    target_drift_report = Report(metrics=[TargetDriftPreset()])
    target_drift_report.run(reference_data=data_reference[:40], current_data=data[:40],
                            column_mapping=data_columns)
    try:
        if os.path.exists(full_path):
            print('Current folder exists')
        else:
            print('Current folder not exists')
            os.makedirs(full_path)
            print("Creation of the directory {} succeed".format(full_path))
    except OSError:
        print("Creation of the directory {} failed".format(full_path))
    target_drift_report.save_html(os.path.join(full_path, file_path))

with DAG(
    dag_id="used_car_valuation_target_drift_report",
    schedule_interval="@daily",
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2023, 10, 19),
    },
    catchup=False,
) as dag:
    load_data_task = PythonOperator(
        task_id="load_data_execute",
        python_callable=load_data_execute,
        provide_context=True,
        op_kwargs={"parameter_variable": "parameter_value"},  # not used now, may be used to specify data
    )
    drift_analysis_task = PythonOperator(
        task_id="drift_analysis_execute",
        python_callable=drift_analysis_execute,
        provide_context=True,
    )

    load_data_task >> drift_analysis_task
Sample report:

3> Regression performance
try:
    import os
    from datetime import datetime, timedelta

    import psycopg2  # third-party library for accessing PostgreSQL from Python
    import pandas as pd
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from evidently.metric_preset import RegressionPreset
    from evidently.pipeline.column_mapping import ColumnMapping
    from evidently.report import Report
except Exception as e:
    print("Error {} ".format(e))

dir_path = "reports"
file_path = "used_car_valuation_performance_report.html"
project_name = "used_car_valuation"

# build a "yesterday_today" window name for the report directory
now = datetime.now()
format_today = now.strftime("%Y-%m-%d")
yesterday = now - timedelta(days=1)
format_yesterday = yesterday.strftime("%Y-%m-%d")
timstamp_area = format_yesterday + "_" + format_today
full_path = dir_path + '/' + project_name + '/reports/' + timstamp_area

def load_data_execute(**context):
    # connect to the given database
    conn = psycopg2.connect(database="radarSmartcustoms", user="radarSmartcustoms",
                            password='', host="129.184.13.155", port='5433')
    cursor = conn.cursor()

    # fetch the column names of the result table
    sql1_text = """select string_agg(column_name, ',')
                   from information_schema.columns
                   where table_schema = 'public' and table_name = 'valuation_model_res'"""
    cursor.execute(sql1_text)
    data1 = cursor.fetchall()  # list of tuples

    # fetch the current data (most recent 40 rows)
    sql2_text = "select * from public.valuation_model_res order by id desc limit 40"
    #sql2_text = "select vmr.* from public.valuation_model_res vmr, public.sad_item_basic_info sibi where vmr.uuid = sibi.uuid and sibi.inspect_date = '" + format_today + "'"
    cursor.execute(sql2_text)
    data2 = cursor.fetchall()

    # turn the column-name tuple into a list of column names
    columns_name = list(data1[0])[0].split(',')
    df1 = pd.DataFrame(list(data2), columns=columns_name)
    del df1['id']
    del df1['uuid']
    del df1['item_no']
    del df1['cost_insurance_freight']
    del df1['free_on_board']
    df1.rename(columns={"predict_price": "prediction"}, inplace=True)
    df1.rename(columns={"declared_price": "target"}, inplace=True)
    df1['threshold'] = df1['threshold'].astype(float)
    df1['ratio'] = df1['ratio'].astype(float)
    df1['diffrence'] = df1['diffrence'].astype(float)  # 'diffrence' is the column name as stored in the table
    df1['prediction'] = df1['prediction'].astype(float)
    df1['target'] = df1['target'].astype(float)

    # reference data
    sql3_text = "select * from public.valuation_model_reference"
    cursor.execute(sql3_text)
    data3 = cursor.fetchall()
    df2 = pd.DataFrame(list(data3), columns=columns_name)
    del df2['id']
    del df2['uuid']
    del df2['item_no']
    del df2['cost_insurance_freight']
    del df2['free_on_board']
    df2.rename(columns={"predict_price": "prediction"}, inplace=True)
    df2.rename(columns={"declared_price": "target"}, inplace=True)
    df2['threshold'] = df2['threshold'].astype(float)
    df2['ratio'] = df2['ratio'].astype(float)
    df2['diffrence'] = df2['diffrence'].astype(float)
    df2['prediction'] = df2['prediction'].astype(float)
    df2['target'] = df2['target'].astype(float)

    cursor.close()
    conn.close()  # close the connection as soon as it is no longer needed

    data_columns = ColumnMapping()
    data_columns.numerical_features = ["mileage", "target", "prediction", "threshold", "ratio", "diffrence", "flag"]
    data_columns.categorical_features = ["maker", "country", "drive", "body_type", "model", "fuel"]

    context["ti"].xcom_push(key="data_frame", value=df1)
    context["ti"].xcom_push(key="data_frame_reference", value=df2)
    context["ti"].xcom_push(key="data_columns", value=data_columns)

def drift_analysis_execute(**context):
    data = context.get("ti").xcom_pull(key="data_frame")
    data_reference = context.get("ti").xcom_pull(key="data_frame_reference")
    data_columns = context.get("ti").xcom_pull(key="data_columns")

    performance_report = Report(metrics=[RegressionPreset()])
    performance_report.run(reference_data=data_reference[:40], current_data=data[:40],
                           column_mapping=data_columns)
    try:
        if os.path.exists(full_path):
            print('Current folder exists')
        else:
            print('Current folder not exists')
            os.makedirs(full_path)
            print("Creation of the directory {} succeed".format(full_path))
    except OSError:
        print("Creation of the directory {} failed".format(full_path))
    performance_report.save_html(os.path.join(full_path, file_path))

with DAG(
    dag_id="used_car_valuation_performance_report",
    schedule_interval="@daily",
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2023, 10, 19),
    },
    catchup=False,
) as dag:
    load_data_task = PythonOperator(
        task_id="load_data_execute",
        python_callable=load_data_execute,
        provide_context=True,
        op_kwargs={"parameter_variable": "parameter_value"},  # not used now, may be used to specify data
    )
    drift_analysis_task = PythonOperator(
        task_id="drift_analysis_execute",
        python_callable=drift_analysis_execute,
        provide_context=True,
    )

    load_data_task >> drift_analysis_task
Sample report:

For a detailed interpretation of the metrics in these three reports, see the official documentation: https://docs.evidentlyai.com/presets/data-drift
A final note: this research on AI monitoring platforms and the Evidently AI practice above are based on learning from publicly available materials, and are shared here for reference.

