dolphinscheduler簡單任務定義及復雜的跨節點傳參
dolphinscheduler簡單任務定義及跨節點傳參
轉載請注明出處 http://www.rzrgm.cn/funnyzpc/p/16395094.html
寫在前面
dolphinscheduler是一款非常不錯的調度工具,本文我就簡稱ds啦,可單機可集群可容器,可調度sql、存儲過程、http、大數據,也可使用shell、python、java、flink等語言及工具,功能強大類型豐富,適合各類調度型任務,社區及項目也十分活躍,現在github中已有8.2k的star??
所以,本篇博文開始會逐步講一些ds相關的東西,也期待各位同行能接觸到此并能實際解決一些生產上的問題~??
一.準備工作
閱讀本博文前建議您先閱讀下官方的文檔[https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(https://dolphinscheduler.apache.org/zh-cn/docs/latest/user_doc/guide/parameter/context.html)(雖然也會碰到一些坑??)
這里,先準備下sql表資源,以下為`postgresql`的`sql`腳本:
(表結構)
CREATE TABLE dolphinscheduler.tmp (
id int4 NOT NULL,
"name" varchar(50) NULL,
"label" varchar(50) NULL,
update_time timestamp NULL,
score int4 NULL,
CONSTRAINT tmp_pkey PRIMARY KEY (id)
);
(表數據)
INSERT INTO tmp (id,"name","label",update_time,score) VALUES
(3,'二狗子','','2022-07-06 21:49:26.872',NULL),
(2,'馬云云','',NULL,NULL),
(1,'李思','','2022-07-05 19:54:31.880',85);
因為個人使用的postgresql的數據庫,如果您是mysql或者其他數據的用戶,請自行更改以上表和數據并添加到庫中即可
表及數據入庫,請將tmp所屬的庫配置到 ds后臺->數據源中心->創建數據源 ,以下是我的配置,記住,這里面的所有數據庫配置均遵守所屬數據庫類型的jdbc的driver的配置參數,配置完成也會在ds的數據庫生成一條jdbc的連接地址,這點要明白~

二.簡單的項目創建及說明
因為`ds`的任務是配置在項目下面,所以第一步得新建一個項目,這樣:`ds后臺`->`項目管理`->`創建項目`,這是我創建的,請看:

準備完項目之后,鼠標點進去,并進入到 工作流定義菜單 頁面,如下圖:

先簡單到解釋下ds的一點兒基本結構,首先,ds一般部署在linux服務器下,創建任務的用戶需要在admin賬戶下創建,重要的是創建的每個工作賬戶需要與操作系統用戶一一對應,比如你創建了一個 test 的ds賬戶,那ds所在的服務器也必須有一個test的賬戶才可行,這是ds的規則,我沒法解釋為什么。
每個用戶下(除了admin外)所能創建的調度任務均在各自創建的項目下,每個項目又分為多個任務(工作流定義),一個任務下又可分為多個任務節點,下圖為任務定義:

ok,如果已經準備好以上步驟,下面開始定義一個簡單的調度任務,繼續哈~
三.簡單的參數傳遞
先看表:

我們先做個簡單的,比如圖中,如果二狗子的本名叫:李思,需要我們取id=1的name放到id=3的label中,并且更新update_time
-
1.這里第一步 在工作流定義列表,點擊
創建工作流就進入一個具體的任務(工作流)的定義,同時我們使用的是sql任務,所以就需要從左側拖動一個sql任務到畫布中(右側空白處):
![]()
因為拖動sql任務到畫布會自動彈出節點定義,上圖為當前節點的一個定義,重點是:數據源、sql類型、sql語句,如官方所說,如果將name傳遞到下游,則需要在自定義參數重定義這個name為out方向類型為varchar。 -
2.因為傳遞到參數需要寫入到表,這里我們再定義一個節點,這個節點負責接收上游傳遞到
name,執行update時使用這個name,以下是我的定義:
![]()
看到沒,這里不僅僅要注意sql類型(sql類型與sql語句是一一對應的,類型不能錯) ,還有就是前置任務一定要選中(上面定義的)node1節點。
另外,需要注意的是當前任務是上下游傳參,所以在node2中是直接使用node1中定義的name這個參數哈 -
3.定義完成當前任務就需要保存:點右上角保存,填寫并保存后點關閉以退出定義:
![]()
-
4.因為定義的任務需要上線了才可執行,所以,在工作流定義列表先點該任務的
黃色按鈕(任務上線),然后才是點綠色按鈕(執行任務):
![]()
-
5.任務執行成功與否,具體得看任務實例,這是執行
node2節點的日志:
![]()
順帶再看看數據庫表是否真實成功:
![]()
完美??
四.復雜的跨節點傳參
首先看表:

思考一個問題:可以看到李思的score是85,根據score應該被評為 B(>=90的為A)并寫入到label字段,該怎么辦呢,如果這個分數是90分又該怎么辦呢,如果根本沒有score(分值) 這個任務是不是就不需要更新李思的label(評分)呢?
對于上面問題可以有一些偏門的解決方法,比如在sql中塞一個異常值,這樣看似不錯,不過作為調度工具建議還是在condition節點或者switch節點處理是最好的,不過就目前我用的2.0.5版本的ds對于這兩類任務節點是沒法接收參數的,這是一個遺憾;遂~個人覺得較好的方式是在寫入節點之前增加一個判斷節點,將錯誤拋出(沒有score的)最好~,對于此,我使用了一個shell的中間節點。
下面是我定義的三個節點:
-
node1節點定義:
![]()
-
node2節點定義:
![]()
(腳本內容)
#!/bin/bash
echo "=====>input param start<====="
echo "id=${id}"
echo "score=${score}"
echo "=====>input param end<====="
id=${id}
echo '${setValue(id2='$id')}'
if [ "${score}" -ge "90" ];then
echo '${setValue(label2=level A)}'
echo "level A"
elif [ "${score}" -ge "80" ];then
echo '${setValue(label2=level B)}'
echo "level B"
elif [ "${score}" -ge "60" ];then
echo '${setValue(label2=level C)}'
echo "level C"
elif [ "${score}" -ge "0" ];then
echo '${setValue(label2=F!)}'
echo "F!"
else
echo "NO score ,please check!"
exit 1
fi
-
node3節點定義:
![]()
-
看一眼結果??:
![]()
五.中間的坑
對于復雜節點傳參數也碰到一些坑,這些坑大概有這些:
- 1.對于
shell腳本不熟悉的,判斷節點其實還是有一些難度的,這是很重要的一點 - 2.
node2(判斷節點)不能有重復的參數,不管局部的還是node1(上一級)傳遞過來的,均不能重復 - 3.因為在
node2(判斷節點)需要將id以及label繼續往下傳(tonode3),這時候就需要給id以及label定義一個映射的out變量(id2、label2) - 4.
node2中重新設置參數麻煩,需要在shell中重新定義變量(id2、label2),同時需要在shell任務內使用拼接的方式賦值(如:echo '${setValue(id2='$id')}') - 5.
sql類型以及不同節點下不同參數時常搞錯,不是任何節點都可以接收上級節點參數,以及局部變量與傳遞變量以及全局變量優先級區別及可能造成沖突 - 6.ds
列表傳參(2.0是不可以的)很雞肋,對于列表傳參又不能在下一級節點做循環賦值,這點對于ds是有改進的空間的 - 7.等等...
對于
ds還有很多可擴展的地方(因為實際需要),所以我就做了一些二次開發??,后面會聊...大家期待喲??











浙公網安備 33010602011771號