Flink SQL的 4 種時間窗口語義
DML:窗口聚合
Flink SQL 中支持的 4 種窗口的運算。
- ? 滾動窗口(TUMBLE)
- ? 滑動窗口(HOP)
- ? Session 窗口(SESSION)
- ? 漸進式窗口(CUMULATE)
1.滾動窗口(TUMBLE)
- ? 滾動窗口定義:滾動窗口將每個元素指定給指定窗口大小的窗口。滾動窗口具有固定大小,且不重疊。例如,指定一個大小為 5 分鐘的滾動窗口。在這種情況下,Flink 將每隔 5 分鐘開啟一個新的窗口,其中每一條數都會劃分到唯一一個 5 分鐘的窗口中,如下圖所示。
tumble window
- ? 應用場景:常見的按照一分鐘對數據進行聚合,計算一分鐘內 PV,UV 數據。
- ? 實際案例:簡單且常見的分維度分鐘級別同時在線用戶數、總銷售額。
那么上面這個案例的 SQL 要咋寫呢?
關于滾動窗口,在 1.13 版本之前和 1.13 及之后版本有兩種 Flink SQL 實現方式,分別是:
- ? Group Window Aggregation(1.13 之前只有此類方案,此方案在 1.13 及之后版本已經標記為廢棄,不推薦小伙伴萌使用)
- ? Windowing TVF(1.13 及之后建議使用 Windowing TVF)
這里兩種方法都會介紹:
- ? Group Window Aggregation 方案(支持 Batch\Streaming 任務):
-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 數據處理邏輯
insert into sink_table
select
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 計算 uv 數
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute)
可以看到 Group Window Aggregation 滾動窗口的 SQL 語法就是把 tumble window 的聲明寫在了 group by 子句中,即 tumble(row_time, interval '1' minute),第一個參數為事件時間的時間戳;第二個參數為滾動窗口大小。
- ? Window TVF 方案(1.13 只支持 Streaming 任務):
-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 數據處理邏輯
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
count(distinct user_id) as uv
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滾動窗口的寫法就是把 tumble window 的聲明寫在了數據源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分參數。
第一個參數 TABLE source_table 聲明數據源表;第二個參數 DESCRIPTOR(row_time) 聲明數據源的時間戳;第三個參數 INTERVAL '60' SECOND 聲明滾動窗口大小為 1 min。
- ? SQL 語義:
由于離線沒有相同的時間窗口聚合概念,這里就直接說實時場景 SQL 語義,假設 Orders 為 kafka,target_table 也為 Kafka,這個 SQL 生成的實時任務,在執行時,會生成三個算子:
- ? 數據源算子(From Order):連接到 Kafka topic,數據源算子一直運行,實時的從 Order Kafka 中一條一條的讀取數據,然后一條一條發送給下游的 窗口聚合算子
- ? 窗口聚合算子(TUMBLE 算子):接收到上游算子發的一條一條的數據,然后將每一條數據按照時間戳劃分到對應的窗口中(根據事件時間、處理時間的不同語義進行劃分),上述案例為事件時間,事件時間中,滾動窗口算子接收到上游的 Watermark 大于窗口的結束時間時,則說明當前這一分鐘的滾動窗口已經結束了,將窗口計算完的結果發往下游算子(一條一條發給下游 數據匯算子)
- ? 數據匯算子(INSERT INTO target_table):接收到上游發的一條一條的數據,寫入到 target_table Kafka 中
這個實時任務也是 24 小時一直在運行的,所有的算子在同一時刻都是處于 running 狀態的。
注意: 事件時間中滾動窗口的窗口計算觸發是由 Watermark 推動的。
2.滑動窗口(HOP)
- ? 滑動窗口定義:滑動窗口也是將元素指定給固定長度的窗口。與滾動窗口功能一樣,也有窗口大小的概念。不一樣的地方在于,滑動窗口有另一個參數控制窗口計算的頻率(滑動窗口滑動的步長)。因此,如果滑動的步長小于窗口大小,則滑動窗口之間每個窗口是可以重疊。在這種情況下,一條數據就會分配到多個窗口當中。舉例,有 10 分鐘大小的窗口,滑動步長為 5 分鐘。這樣,每 5 分鐘會劃分一次窗口,這個窗口包含的數據是過去 10 分鐘內的數據,如下圖所示。
hop window
- ? 應用場景:比如計算同時在線的數據,要求結果的輸出頻率是 1 分鐘一次,每次計算的數據是過去 5 分鐘的數據(有的場景下用戶可能在線,但是可能會 2 分鐘不活躍,但是這也要算在同時在線數據中,所以取最近 5 分鐘的數據就能計算進去了)
- ? 實際案例:簡單且常見的分維度分鐘級別同時在線用戶數,1 分鐘輸出一次,計算最近 5 分鐘的數據
依然是 Group Window Aggregation、Windowing TVF 兩種方案:
- ? Group Window Aggregation 方案(支持 Batch\Streaming 任務):
-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數據處理邏輯
insert into sink_table
SELECT dim,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '1' minute, interval '5' minute) AS STRING)) * 1000 as window_start,
count(distinct user_id) as uv
FROM source_table
GROUP BY dim
, hop(row_time, interval '1' minute, interval '5' minute)
可以看到 Group Window Aggregation 滾動窗口的寫法就是把 hop window 的聲明寫在了 group by 子句中,即 hop(row_time, interval '1' minute, interval '5' minute)。其中:
第一個參數為事件時間的時間戳;第二個參數為滑動窗口的滑動步長;第三個參數為滑動窗口大小。
- ? Windowing TVF 方案(1.13 只支持 Streaming 任務):
-- 數據源表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數據處理邏輯
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(distinct user_id) as bucket_uv
FROM TABLE(HOP(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滾動窗口的寫法就是把 hop window 的聲明寫在了數據源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES)),包含四部分參數:
第一個參數 TABLE source_table 聲明數據源表;第二個參數 DESCRIPTOR(row_time) 聲明數據源的時間戳;第三個參數 INTERVAL '1' MINUTES 聲明滾動窗口滑動步長大小為 1 min。第四個參數 INTERVAL '5' MINUTES 聲明滾動窗口大小為 5 min。
- ? SQL 語義:
滑動窗口語義和滾動窗口類似,這里不再贅述。
3.Session 窗口(SESSION)
- ? Session 窗口定義:Session 時間窗口和滾動、滑動窗口不一樣,其沒有固定的持續時間,如果在定義的間隔期(Session Gap)內沒有新的數據出現,則 Session 就會窗口關閉。如下圖對比所示:
session window
- ? 實際案例:計算每個用戶在活躍期間(一個 Session)總共購買的商品數量,如果用戶 5 分鐘沒有活動則視為 Session 斷開
目前 1.13 版本中 Flink SQL 不支持 Session 窗口的 Window TVF,所以這里就只介紹 Group Window Aggregation 方案:
- ? Group Window Aggregation 方案(支持 Batch\Streaming 任務):
-- 數據源表,用戶購買行為記錄表
CREATE TABLE source_table (
-- 維度數據
dim STRING,
-- 用戶 id
user_id BIGINT,
-- 用戶
price BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數據匯表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT, -- 購買商品數量
window_start bigint
) WITH (
'connector' = 'print'
);
-- 數據處理邏輯
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' minute) AS STRING)) * 1000 as window_start,
count(1) as pv
FROM source_table
GROUP BY dim
, session(row_time, interval '5' minute)
注意: 上述 SQL 任務是在整個 Session 窗口結束之后才會把數據輸出。Session 窗口即支持 處理時間 也支持 事件時間。但是處理時間只支持在 Streaming 任務中運行,Batch 任務不支持。
可以看到 Group Window Aggregation 中 Session 窗口的寫法就是把 session window 的聲明寫在了 group by 子句中,即 session(row_time, interval '5' minute)。其中:
第一個參數為事件時間的時間戳;第二個參數為 Session gap 間隔。
? SQL 語義:
Session 窗口語義和滾動窗口類似,這里不再贅述。
4.漸進式窗口(CUMULATE)
- ? 漸進式窗口定義(1.13 只支持 Streaming 任務):漸進式窗口在其實就是 固定窗口間隔內提前觸發的的滾動窗口,其實就是 Tumble Window + early-fire 的一個事件時間的版本。例如,從每日零點到當前這一分鐘繪制累積 UV,其中 10:00 時的 UV 表示從 00:00 到 10:00 的 UV 總數。漸進式窗口可以認為是首先開一個最大窗口大小的滾動窗口,然后根據用戶設置的觸發的時間間隔將這個滾動窗口拆分為多個窗口,這些窗口具有相同的窗口起點和不同的窗口終點。如下圖所示:
cumulate window
- ? 應用場景:周期內累計 PV,UV 指標(如每天累計到當前這一分鐘的 PV,UV)。這類指標是一段周期內的累計狀態,對分析師來說更具統計分析價值,而且幾乎所有的復合指標都是基于此類指標的統計(不然離線為啥都要累計一天的數據,而不要一分鐘累計的數據呢)。
- ? 實際案例:每天的截止當前分鐘的累計 money(sum(money)),去重 id 數(count(distinct id))。每天代表漸進式窗口大小為 1 天,分鐘代表漸進式窗口移動步長為分鐘級別。舉例如下:
明細輸入數據:
|
time |
id |
money |
|
2021-11-01 00:01:00 |
A |
3 |
|
2021-11-01 00:01:00 |
B |
5 |
|
2021-11-01 00:01:00 |
A |
7 |
|
2021-11-01 00:02:00 |
C |
3 |
|
2021-11-01 00:03:00 |
C |
10 |
預期經過漸進式窗口計算的輸出數據:
|
time |
count distinct id |
sum money |
|
2021-11-01 00:01:00 |
2 |
15 |
|
2021-11-01 00:02:00 |
3 |
18 |
|
2021-11-01 00:03:00 |
3 |
28 |
轉化為折線圖長這樣:
當日累計
可以看到,其特點就在于,每一分鐘的輸出結果都是當天零點累計到當前的結果。
漸進式窗口目前只有 Windowing TVF 方案支持:
- ? Windowing TVF 方案(1.13 只支持 Streaming 任務):
-- 數據源表
CREATE TABLE source_table (
-- 用戶 id
user_id BIGINT,
-- 用戶
money BIGINT,
-- 事件時間戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 設置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 數據匯表
CREATE TABLE sink_table (
window_end bigint,
window_start bigint,
sum_money BIGINT,
count_distinct_id bigint
) WITH (
'connector' = 'print'
);
-- 數據處理邏輯
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_start,
sum(money) as sum_money,
count(distinct id) as count_distinct_id
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end
可以看到 Windowing TVF 滾動窗口的寫法就是把 cumulate window 的聲明寫在了數據源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分參數:
第一個參數 TABLE source_table 聲明數據源表;第二個參數 DESCRIPTOR(row_time) 聲明數據源的時間戳;第三個參數 INTERVAL '60' SECOND 聲明漸進式窗口觸發的漸進步長為 1 min。第四個參數 INTERVAL '1' DAY 聲明整個漸進式窗口的大小為 1 天,到了第二天新開一個窗口重新累計。
? SQL 語義:
漸進式窗口語義和滾動窗口類似,這里不再贅述。
5.Window TVF 支持 Grouping Sets、Rollup、Cube
具體應用場景:實際的案例場景中,經常會有多個維度進行組合(cube)計算指標的場景。如果把每個維度組合的代碼寫一遍,然后 union all 起來,這樣寫起來非常麻煩,而且會導致一個數據源讀取多遍。
這時,有離線 Hive SQL 使用經驗的小伙伴萌就會想到,如果有了 Grouping Sets,我們就可以直接用 Grouping Sets 將維度組合寫在一條 SQL 中,寫起來方便并且執行效率也高。當然,Flink 支持這個功能。
但是目前 Grouping Sets 只在 Window TVF 中支持,不支持 Group Window Aggregation。
來一個實際案例感受一下,計算每日零點累計到當前這一分鐘的分匯總、age、sex、age+sex 維度的用戶數。
-- 用戶訪問明細表
CREATE TABLE source_table (
age STRING,
sex STRING,
user_id BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.age.length' = '1',
'fields.sex.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000'
);
CREATE TABLE sink_table (
age STRING,
sex STRING,
uv BIGINT,
window_end bigint
) WITH (
'connector' = 'print'
);
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end,
-- grouping sets 寫法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
Flink SQL 中 Grouping Sets 的語法和 Hive SQL 的語法有一些不同,如果我們使用 Hive SQL 實現上述 SQL 的語義,其實現如下:
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM source_table
GROUP BY
age
, sex
-- hive sql grouping sets 寫法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
本文來自博客園,作者:業余磚家,轉載請注明原文鏈接:http://www.rzrgm.cn/yeyuzhuanjia/p/18908179

浙公網安備 33010602011771號