[源碼分析] 從實例和源碼入手看 Flink 之廣播 Broadcast
[源碼分析] 從實例和源碼入手看 Flink 之廣播 Broadcast
0x00 摘要
本文將通過源碼分析和實例講解,帶領大家熟悉Flink的廣播變量機制。
0x01 業務需求
1. 場景需求
對黑名單中的IP進行檢測過濾。IP黑名單的內容會隨時增減,因此是可以隨時動態配置的。
該黑名單假設存在mysql中,Flink作業啟動時候會把這個黑名單從mysql載入,作為一個變量由Flink算子使用。
2. 問題
我們不想重啟作業以便重新獲取這個變量。所以就需要一個能夠動態修改算子里變量的方法。
3. 解決方案
使用廣播的方式去解決。去做配置的動態更新。
廣播和普通的流數據不同的是:廣播流的1條流數據能夠被算子的所有分區所處理,而數據流的1條流數據只能夠被算子的某一分區處理。因此廣播流的特點也決定適合做配置的動態更新。
0x02 概述
廣播這部分有三個難點:使用步驟;如何自定義函數;如何存取狀態。下面就先為大家概述下。
1. broadcast的使用步驟
- 建立MapStateDescriptor
- 通過DataStream.broadcast方法返回廣播數據流BroadcastStream
- 通過DataStream.connect方法,把業務數據流和BroadcastStream進行連接,返回BroadcastConnectedStream
- 通過BroadcastConnectedStream.process方法分別進行processElement及processBroadcastElement處理
2. 用戶自定義處理函數
- BroadcastConnectedStream.process接收兩種類型的function:KeyedBroadcastProcessFunction 和 BroadcastProcessFunction
- 兩種類型的function都定義了processElement、processBroadcastElement抽象方法,只是KeyedBroadcastProcessFunction多定義了一個onTimer方法,默認是空操作,允許子類重寫
- processElement處理業務數據流
- processBroadcastElement處理廣播數據流
3. Broadcast State
- Broadcast State始終表示為MapState,即map format。這是Flink提供的最通用的狀態原語。是托管狀態的一種,托管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。
- 用戶必須創建一個
MapStateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱, 狀態所持有值的類型,并且可能包含用戶指定的函數 - checkpoint的時候也會checkpoint broadcast state
- Broadcast State只在內存有,沒有RocksDB state backend
- Flink 會將state廣播到每個task,注意該state并不會跨task傳播,對其修改僅僅是作用在其所在的task
- downstream tasks接收到broadcast event的順序可能不一樣,所以依賴其到達順序來處理element的時候要小心
0x03. 示例代碼
1. 示例代碼
我們直接從Flink源碼入手可以找到理想的示例。 以下代碼直接摘錄 Flink 源碼 StatefulJobWBroadcastStateMigrationITCase,我會在里面加上注釋說明。
@Test
def testRestoreSavepointWithBroadcast(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 以下兩個變量是為了確定廣播流發出的數據類型,廣播流可以同時發出多種類型的數據
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
env.setStateBackend(new MemoryStateBackend)
env.enableCheckpointing(500)
env.setParallelism(4)
env.setMaxParallelism(4)
// 數據流,這里數據流和廣播流的Source都是同一種CheckpointedSource。數據流這里做了一系列算子操作,比如flatMap
val stream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
.flatMap(new StatefulFlatMapper)
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
// 廣播流
val broadcastStream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
.broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
// 把數據流和廣播流結合起來
stream
.connect(broadcastStream)
.process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
.addSink(new AccumulatorCountingSink)
}
}
// 用戶自定義的處理函數
class TestBroadcastProcessFunction
extends KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)] {
// 重點說明,這里的 firstBroadcastStateDesc,secondBroadcastStateDesc 其實和之前廣播流的那兩個MapStateDescriptor無關。
// 這里兩個MapStateDescriptor是為了存取BroadcastState,這樣在 processBroadcastElement和processElement之間就可以傳遞變量了。我們完全可以定義新的MapStateDescriptor,只要processBroadcastElement和processElement之間認可就行。
// 這里參數 "broadcast-state-1" 是name, flink就是用這個 name 來從Flink運行時系統中存取MapStateDescriptor
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
override def processElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
out: Collector[(Long, Long)]): Unit = {
// 這里Flink源碼中是直接把接受到的業務變量直接再次轉發出去
out.collect(value)
}
override def processBroadcastElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
out: Collector[(Long, Long)]): Unit = {
// 這里是把最新傳來的廣播變量存儲起來,processElement中可以取出再次使用. 具體是通過firstBroadcastStateDesc 的 name 來獲取 BroadcastState
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)
}
}
// 廣播流和數據流的Source
private class CheckpointedSource(val numElements: Int)
extends SourceFunction[(Long, Long)] with CheckpointedFunction {
private var isRunning = true
private var state: ListState[CustomCaseClass] = _
// 就是簡單的定期發送
override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
ctx.emitWatermark(new Watermark(0))
ctx.getCheckpointLock synchronized {
var i = 0
while (i < numElements) {
ctx.collect(i, i)
i += 1
}
}
// don't emit a final watermark so that we don't trigger the registered event-time
// timers
while (isRunning) Thread.sleep(20)
}
}
2. 技術難點
MapStateDescriptor
首先要說明一些概念:
- Flink中包含兩種基礎的狀態:Keyed State和Operator State。
- Keyed State和Operator State又可以 以兩種形式存在:原始狀態和托管狀態。
- 托管狀態是由Flink框架管理的狀態,如ValueState, ListState, MapState等。
- raw state即原始狀態,由用戶自行管理狀態具體的數據結構,框架在做checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。
- MapState是托管狀態的一種:即狀態值為一個map。用戶通過
put或putAll方法添加元素。
回到我們的例子,廣播變量就是OperatorState的一部分,是以托管狀態的MapState形式保存的。具體getBroadcastState函數就是DefaultOperatorStateBackend中的實現
所以我們需要用MapStateDescriptor描述broadcast state,這里MapStateDescriptor的使用比較靈活,因為是key,value類似使用,所以個人覺得value直接使用類,這樣更方便,尤其是對于從其他語言轉到scala的同學。
processBroadcastElement
// 因為主要起到控制作用,所以這個函數的處理相對簡單
override def processBroadcastElement(): Unit = {
// 這里可以把最新傳來的廣播變量存儲起來,processElement中可以取出再次使用,比如
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
}
processElement
// 這個函數需要和processBroadcastElement配合起來使用
override def processElement(): Unit = {
// 可以取出processBroadcastElement之前存儲的廣播變量,然后用此來處理業務變量,比如
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
var actualSecondState = Map[String, String]()
for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) {
val v = secondExpectedBroadcastState.get(entry.getKey).get
actualSecondState += (entry.getKey -> entry.getValue)
}
// 甚至這里只要和processBroadcastElement一起關聯好,可以存儲任意類型的變量。不必須要和廣播變量的類型一致。重點是聲明新的對應的MapStateDescriptor
// MapStateDescriptor繼承了StateDescriptor,其中state為MapState類型,value為Map類型
}
結合起來使用
因為某些限制,所以下面只能從網上找一個例子給大家講講。
// 模式始終存儲在MapState中,并將null作為鍵。broadcast state始終表示為MapState,這是Flink提供的最通用的狀態原語。
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
// 能看到的是,在處理廣播變量時候,存儲廣播變量到BroadcastState
public void processBroadcastElement(Pattern pattern, Context ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
// 能看到的是,在處理業務變量時候,從BroadcastState取出廣播變量,存取時候實際都是用"patterns"這個name字符串來作為key。
public void processElement(Action action, ReadOnlyContext ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx.getBroadcastState(this.patternDesc)
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
0x04. Flink 源碼解析
1. 廣播的邏輯流程
* The life cycle of the Broadcast:
* {@code
* -- 初始化邏輯 -> 用一個BroadcastConnectedStream把數據流和廣播流結合起來進行拓撲轉換
* |
* +----> businessStream = DataStream.filter.map....
* | // 處理業務邏輯的數據流,businessStream 是普通DataStream
* +----> broadcastStream = DataStream.broadcast(broadcastStateDesc)
* | // 處理配置邏輯的廣播數據流,broadcastStream是BroadcastStream類型
* +----> businessStream.connect(broadcastStream)
* | .process(new processFunction(broadcastStateDesc))
* | // 把業務流,廣播流 結合起來,生成一個BroadcastConnectedStream,然后進行 process
* +----------> process @ BroadcastConnectedStream
* | TwoInputStreamOperator<IN1, IN2, OUT> operator =
* | new CoBroadcastWithNonKeyedOperator<>(clean(function),
* | broadcastStateDescriptors);
* | return transform(outTypeInfo, operator);
* | // 生成一個類型是TwoInputStreamOperator 的 operator,進行 transform
* +----------------> transform @ BroadcastConnectedStream
* | transform = new TwoInputTransformation<>(
* | inputStream1.getTransformation(), // 業務流
* | inputStream2.getTransformation(), // 廣播流
* | ifunctionName, // 用戶的UDF
* | operator, // 算子 CoBroadcastWithNonKeyedOperator
* | outTypeInfo); // 輸出類型
* | returnStream = new SingleOutputStreamOperator(transform);
* | getExecutionEnvironment().addOperator(transform)
* | // 將業務流,廣播流與拓撲聯合起來形成一個轉換,加到 Env 中,這就完成了拓撲轉換
* | // 最后返回結果是一個SingleOutputStreamOperator。
* }
* 數據結構:
* -- BroadcastStream.
* 就是簡單封裝一個DataStream,然后記錄這個廣播流對應的StateDescriptors
public class BroadcastStream<T> {
private final StreamExecutionEnvironment environment;
private final DataStream<T> inputStream;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}
* 數據結構:
* -- BroadcastConnectedStream.
* 把業務流,廣播流 結合起來,然后會生成算子和拓撲
public class BroadcastConnectedStream<IN1, IN2> {
private final StreamExecutionEnvironment environment;
private final DataStream<IN1> inputStream1;
private final BroadcastStream<IN2> inputStream2;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}
* 真實計算:
* -- CoBroadcastWithNonKeyedOperator -> 真正對BroadcastProcessFunction的執行,是在這里完成的
public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
private transient TimestampedCollector<OUT> collector;
private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
private transient ReadWriteContextImpl rwContext;
private transient ReadOnlyContextImpl rContext;
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
rContext.setElement(element);
// 當上游有最新業務數據來的時候,調用用戶自定義的processElement
// 在這可以把之前存儲的廣播配置信息取出,然后對業務數據流進行處理
userFunction.processElement(element.getValue(), rContext, collector);
rContext.setElement(null);
}
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
rwContext.setElement(element);
// 當上游有數據來的時候,調用用戶自定義的processBroadcastElement
// 在這可以把最新傳送的廣播配置信息存起來
userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
rwContext.setElement(null);
}
}
2. DataStream的關鍵函數
// 就是connect,broadcast,分別生成對應的數據流
public class DataStream<T> {
protected final StreamExecutionEnvironment environment;
protected final Transformation<T> transformation;
@PublicEvolving
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
return new BroadcastConnectedStream<>(
environment,
this,
Preconditions.checkNotNull(broadcastStream),
broadcastStream.getBroadcastStateDescriptor());
}
@PublicEvolving
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}
}
3. 關鍵數據結構MapStateDescriptor
主要是用來聲明各種元數據信息。后續可以看出來,系統是通過MapStateDescriptor的name,即第一個參數來存儲 / 獲取MapStateDescriptor對應的State。
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keySerializer The type serializer for the keys in the state.
* @param valueSerializer The type serializer for the values in the state.
*/
public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
}
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyTypeInfo The type information for the keys in the state.
* @param valueTypeInfo The type information for the values in the state.
*/
public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
}
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyClass The class of the type of keys in the state.
* @param valueClass The class of the type of values in the state.
*/
public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
super(name, new MapTypeInfo<>(keyClass, valueClass), null);
}
}
4. 狀態存取
在processBroadcastElement和processElement之間傳遞的狀態,是通過MapStateDescriptor的name為key,來存儲在Flink中。即類似調用如下ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)。所以我們接下來需要介紹下Flink的State概念。
State vs checkpoint
首先區分一下兩個概念,state一般指一個具體的task/operator的狀態。而checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了所有task/operator的狀態。Flink通過定期地做checkpoint來實現容錯和恢復。
Flink中包含兩種基礎的狀態:Keyed State和Operator State。
Keyed State
顧名思義,就是基于KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。
Operator State
與Keyed State不同,Operator State跟一個特定operator的一個并發實例綁定,整個operator只對應一個state。相比較而言,在一個operator上,可能會有很多個key,從而對應多個keyed state。
舉例來說,Flink中的Kafka Connector,就使用了operator state。它會在每個connector實例中,保存該實例中消費topic的所有(partition, offset)映射。
原始狀態和Flink托管狀態 (Raw and Managed State)
這又是另外一個維度。
Keyed State 和 Operator State 分別有兩種存在形式:managed and raw, 即原始狀態和托管狀態。
托管狀態是由 Flink框架運行時 管理的狀態,比如內部的 hash table 或者 RocksDB。 比如 “ValueState”, “ListState” 等。Flink runtime 會對這些狀態進行編碼并寫入 checkpoint。
比如managed keyed state 接口提供不同類型狀態的訪問接口,這些狀態都作用于當前輸入數據的 key 下。換句話說,這些狀態僅可在 KeyedStream 上使用,可以通過 stream.keyBy(...) 得到 KeyedStream。而我們可以通過實現 CheckpointedFunction 或 ListCheckpointed 接口來使用 managed operator state。
Raw state即原始狀態,由用戶自行管理狀態具體的數據結構,保存在算子自己的數據結構中。checkpoint 的時候,Flink 并不知曉具體的內容,僅僅寫入一串字節序列到 checkpoint。
通常在DataStream上的狀態推薦使用托管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。
回到我們的例子,廣播變量就是OperatorState的一部分,是以托管狀態的MapState形式保存的。具體getBroadcastState函數就是DefaultOperatorStateBackend中的實現。
StateDescriptor
你必須創建一個 StateDescriptor,才能得到對應的狀態句柄。 這保存了狀態名稱(你可以創建多個狀態,并且它們必須具有唯一的名稱以便可以引用它們), 狀態所持有值的類型,并且可能包含用戶指定的函數,例如ReduceFunction。 根據不同的狀態類型,可以創建ValueStateDescriptor,ListStateDescriptor, ReducingStateDescriptor,FoldingStateDescriptor 或 MapStateDescriptor。
狀態通過 RuntimeContext 進行訪問,因此只能在 rich functions 中使用。
OperatorStateBackEnd
OperatorStateBackEnd 主要管理OperatorState. 目前只有一種實現: DefaultOperatorStateBackend。
DefaultOperatorStateBackend
DefaultOperatorStateBackend狀態是以Map方式來存儲的。其構造出一個 PartitionableListState (屬于ListState)。OperatorState都保存在內存中。
public class DefaultOperatorStateBackend implements OperatorStateBackend {
/**
* Map for all registered operator states. Maps state name -> state
*/
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
/**
* Map for all registered operator broadcast states. Maps state name -> state
*/
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
/**
* Cache of already accessed states.
*
* <p>In contrast to {@link #registeredOperatorStates} which may be repopulated
* with restored state, this map is always empty at the beginning.
*
* <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
*/
private final Map<String, PartitionableListState<?>> accessedStatesByName;
private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName; // 這里用來緩存廣播變量
// 這里就是前文中所說的,存取廣播變量的API
public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
String name = Preconditions.checkNotNull(stateDescriptor.getName());
// 如果之前有,就取出來
BackendWritableBroadcastState<K, V> previous =
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(
name);
if (previous != null) {
return previous;
}
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
BackendWritableBroadcastState<K, V> broadcastState =
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
broadcastStateValueSerializer));
registeredBroadcastStates.put(name, broadcastState);
} else {
// has restored state; check compatibility of new state access
RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
// check whether new serializers are incompatible
TypeSerializerSchemaCompatibility<K> keyCompatibility =
restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
TypeSerializerSchemaCompatibility<V> valueCompatibility =
restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
}
accessedBroadcastStatesByName.put(name, broadcastState); // 如果之前沒有,就存入
return broadcastState;
}
}
0x05. 參考
Flink原理與實現:詳解Flink中的狀態管理 https://yq.aliyun.com/articles/225623
Flink使用廣播實現配置動態更新 https://www.jianshu.com/p/c8c99f613f10
Flink Broadcast State實用指南 https://blog.csdn.net/u010942041/article/details/93901918
聊聊flink的Broadcast State https://www.jianshu.com/p/d6576ae67eae
Working with State https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/stream/state/state.html
浙公網安備 33010602011771號