[Flink/序列化/泛型] Flink DataStream 的類型系統(`TypeInformation`)
0 序 :緣起——最近遇到的類型系統轉換問題
- 最近,在將一個業務上較為重要的、原來由 Flink Sql 編寫的 Flink Job 重寫,改為基于 Flink Api 實現。
但改造過程中遇到不少 Flink 類型系統轉換方面的問題,索性總結一二。
InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
Tuple arity: 元組元數
public class DeviceRemoteConfigVersionMapFunction extends RichMapFunction< Tuple3<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode>, Tuple4<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode, DeviceRemoteConfigVersion>> implements ResultTypeQueryable {
@Override
public TypeInformation getProducedType() {
return Types.TUPLE(
TypeInformation.of( CdcPrimaryKeys.class ), //如缺少1個 class,比如此行,則報錯:InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
TypeInformation.of( DeviceAlarmRecord.class ),
TypeInformation.of( DimFaultCode.class ),
TypeInformation.of( DeviceRemoteConfigVersion.class )
);
}
//...
}
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1436) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:151) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:581) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at com.xxx.bd.dataintegration.flinkapi.entry.xxx.DwdDeviceAlarmRecordRi.main(DwdDeviceAlarmRecordRi.java:106) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_422]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_422]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_422]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_422]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:381) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:232) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:343) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
... 12 more
public class DimFaultCodeMapFunction extends RichMapFunction< Tuple2<CdcPrimaryKeys, DeviceAlarmRecord>, Tuple3<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode>> implements ResultTypeQueryable {
@Override
public TypeInformation getProducedType() {
return Types.TUPLE(
TypeInformation.of( CdcPrimaryKeys.class ),//如果不填這個,將報錯: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.apache.flink.api.java.tuple.Tuple3
TypeInformation.of( DeviceAlarmRecord.class ),
TypeInformation.of( DimFaultCode.class )
);
}
//...
}
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
@Override
public TypeInformation getProducedType() {
return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
//return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[錯誤示范] TypeHint<T> 的 T : 不支持泛型,將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
//return TypeInformation.of( Tuple2.class );//[錯誤示范] 將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
//return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
}
//...
}
TypeHint<T>的 T : 不支持泛型
public class MysqlCdcDeserializationSchema implements DebeziumDeserializationSchema<CdcRecord> {
/**
* 獲取返回類型
* @reference-doc
* [1] {@link com.ververica.cdc.connectors.mysql.source.MySqlSource # deserializer(DebeziumDeserializationSchema<T> deserializer) 要求 }
* @return
*/
@Override
public TypeInformation<CdcRecord> getProducedType() {
//return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
//return Types.GENERIC(CdcRecord.class);
return Types.POJO( CdcRecord.class );
//return TypeInformation.of( CdcRecord.class );
//return BasicTypeInfo.of( CdcRecord.class );
//return BasicTypeInfo.STRING_TYPE_INFO;
}
//...
}
1 概述:Flink DataStream 的類型系統
Flink 類型系統的由來
FlinkDataStream應用程序所處理的【事件】以數據對象的形式存在。
- 函數調用時會傳入數據對象,同時也可以輸出數據對象。
因此,
Flink在內部需要能夠處理這些不同數據結構的對象。
- 當通過【網絡傳輸】或者讀寫【狀態后端】、【檢查點】以及【保存點】時,需要對它們進行【序列化】和【反序列化】。
為了能夠更高效地做到這一點,Flink 框架需要詳細了解應用程序處理的數據類型。
-
Flink使用【類型信息】(org.apache.flink.api.common.typeinfo.TypeInformation)的概念來表示數據類型,并為每種數據類型生成特定的序列化器、反序列化器及比較器。 -
此外,Flink 還有一個類型提取系統,可以分析函數的輸入和返回類型來自動獲取類型信息,進而獲得序列化器和反序列化器。
但是,在某些情況下,例如使用了
Lambda函數或者泛型類型,必須顯式提供【類型信息】才能使應用程序正常工作或者提高其性能。
- 在本文中,我們會討論:
- Flink 支持的數據類型
- 如何為數據類型創建類型信息
- 如何在 Flink 的類型系統無法自動推斷函數的返回類型時提供提示
最后簡單說明一下顯示指定類型信息的兩個場景。
Flink 類型系統的常見應用場景: 注冊子類型、注冊自定義序列化器、添加類型提示、手動創建 TypeInformation
-
注冊子類型:如果函數簽名只描述了超類型,但是它們實際上在執行期間使用了超類型的子類型,那么讓 Flink 了解這些子類型會大大提高性能。可以在 StreamExecutionEnvironment 或 ExecutionEnvironment 中調用 .registertype (clazz) 注冊子類型信息。
-
注冊自定義序列化:對于不適用于自己的序列化框架的數據類型,Flink 會使用 Kryo 來進行序列化,并不是所有的類型都與 Kryo 無縫連接,具體注冊方法在下文介紹。
-
添加類型提示:有時,當 Flink 用盡各種手段都無法推測出泛型信息時,用戶需要傳入一個類型提示 TypeHint,這個通常只在 Java API 中需要。
-
手動創建一個 TypeInformation:在某些 API 調用中,這可能是必需的,因為 Java 的泛型類型擦除導致 Flink 無法推斷數據類型。
例如:
//樣例: OutDto 為數據類型不定的泛型
public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
//...
@Override
public TypeInformation getProducedType() {
return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
//return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[錯誤示范] TypeHint<T> 的 T : 不支持泛型,將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
//return TypeInformation.of( Tuple2.class );//[錯誤示范] 將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
//return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
}
}
其實在大多數情況下,用戶不必擔心序列化框架和注冊類型,因為 Flink 已經提供了大量的序列化操作,不需要去定義自己的一些序列化器,但是在一些特殊場景下,需要去做一些相應的處理。
Flink DataStream 類型系統的核心API
-
org.apache.flink.api.common.typeinfo.TypeInformation -
org.apache.flink.api.java.typeutils.ResultTypeQueryable -
org.apache.flink.api.common.typeinfo.Types
public class Types

org.apache.flink.api.common.typeinfo.TypeHint
public abstract class TypeHint<T>
org.apache.flink.api.common.typeinfo.BasicTypeInfo
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T>
org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T>
//eg: org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T>
org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo
public class LocalTimeTypeInfo<T extends Temporal> extends TypeInformation<T> implements AtomicType<T>
org.apache.flink.api.common.typeinfo.NothingTypeInfo
public class NothingTypeInfo extends TypeInformation<Nothing>
- ...
2 數據類型
- Flink 支持 Java 和 Scala 所有常見的數據類型,也不需要像 Hadoop 一樣去實現一個特定的接口(
org.apache.hadoop.io.Writable),能夠自動識別數據類型。
使用最多的可以分為如下幾類,如下圖所示:

從圖中可以看到 Flink 類型可以分為基本類型、數組類型、復合類型、輔助類型以及泛型。
2.1 基本類型
- Flink 能夠支持所有 Java 和 Scala 原生基本類型(包裝類型)以及 Void、String、Date、BigDecimal、BigInteger 等類型。
例如: 通過從給定的元素集中創建 DataStream 數據集:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 創建 Integer 類型的數據集
DataStream<Integer> integerElements = env.fromElements(1, 2, 3);
// 創建 String 類型的數據集
DataStream<String> stringElements = env.fromElements("1", "2", "3");
2.2 數組類型
- 數組類型包含兩種類型:
- 基本類型數組:基本類型的 Java 數組,支持 boolean、byte、short、int、long、float 等
- 對象數組:Object 類型的 Java 數組,支持 String 以及其他對象
例如: 通過從給定的元素集中創建 DataStream 數據集:
int[] a = {1, 2};
int[] b = {3, 4};
DataStream<int[]> arrayElements = env.fromElements(a, b);
2.3 復合數據類型
2.3.1 Java Tuples 類型
- Flink 在 Java 接口中定義了元組類(
Tuple)供用戶使用。
元組是由固定數量的強類型字段組成的復合數據類型。
如下代碼所示,創建 Tuple 數據類型數據集:
DataStream<Tuple2> tupleElements = env.fromElements(new Tuple2(1, "a"), new Tuple2(2, "b"));
Flink 提供了 Java 元組的高效實現,最多包含 25 個字段,每個字段長度都對應一個單獨的實現,即 Tuple0 到 Tuple25。如果字段數量超過上限,可以通過繼承 Tuple 類的方式進行拓展。
2.3.2 Scala Case Class 與 Tuple 類型
- Flink 支持任意的 Scala Case Class 以及 Scala tuples 類型,支持的字段數量上限為 22,支持通過字段名稱和位置索引獲取指標,不支持存儲空值。
例如:代碼實例所示,定義 WordCount Case Class 數據類型,然后通過 fromElements 方法創建 input 數據集,調用 keyBy() 方法對數據集根據 word 字段重新分區。
// 定義WordCount Case Class數據結構
case class WordCount(word: String, count: Int)
// 通過fromElements方法創建數據集
val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
val keyStream1 = input.keyBy("word") // 根據word字段為分區字段,
val keyStream2 = input.keyBy(0) //也可以通過指定position分區
- 通過使用 Scala Tuple 創建 DataStream 數據集,其他的使用方式和 Case Class 相似。
需要注意的是,如果根據名稱獲取字段,可以使用 Tuple 中的默認字段名稱:
// 通過 scala Tuple 創建具有兩個元素的數據集
val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),("c", 2))
// 使用默認字段名稱獲取字段,其中 _1 表示 tuple 的第一個字段
tupleStream.keyBy("_1")
2.3.3 ROW 類型
Row是一種固定長度、可識別空值的復合類型,以確定的字段順序存儲多個值。
- 每個字段的類型都可以不一樣,并且每個字段都可以為空。
- 由于無法自動推斷行字段的類型,因此——在生成 Row 時都需要提供類型信息。
如下代碼所示,創建 Row 數據類型數據集:
DataStream<Row> rowElements = env.fromElements(Row.of(0, "a", 3.14));
2.3.4 POJO 類型
- Flink 會分析那些不屬于任何一類的數據類型,嘗試將它們作為 POJO 類型進行處理。如果一個類型滿足如下條件,Flink 就會將它們作為 POJO 數據類型:
- POJOs 類必須是一個公有類,Public 修飾且獨立定義,不能是內部類;
- POJOs 類中必須包含一個 Public 修飾的無參構造器;
- POJOs 類中所有的字段必須是 Public 或者具有 Public 修飾的 getter 和 setter 方法;
- POJOs 類中的字段類型必須是 Flink 支持的。
例如,如下 Java 類就會被 Flink 識別為
POJO:
// (1) 必須是 Public 修飾且必須獨立定義,不能是內部類
public class Person {
// (4) 字段類型必須是 Flink 支持的
private String name;
private int age;
// (2) 必須包含一個 Public 修飾的無參構造器
public Person() {
}
public Person(String name, int age) {
this.name = name;
this.age = age;
}
// (3) 所有的字段必須是 Public 或者具有 Public 修飾的 getter 和 setter 方法
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
- 定義好 POJOs Class 后,就可以在 Flink 環境中使用了,如下代碼所示,使用 fromElements 接口構建 Person 類的數據集:
env.fromElements(new Person("Lucy", 18), new Person("Tom", 12))
2.4 輔助類型
- 在 Flink 中也支持一些比較特殊的數據數據類型,例如 Scala 中的 List、Map、Either、Option、Try 數據類型,以及 Java 中 Either 數據類型,還有 Hadoop 的 Writable 數據類型。
如下代碼所示,創建 List 類型數據集:
DataStream<ArrayList<Integer>> listElements = env.fromElements(
Lists.newArrayList(1, 2), Lists.newArrayList(3, 4)
);
這種數據類型使用場景不是特別廣泛,主要原因是數據中的操作相對不像 POJOs 類那樣方便和透明,用戶無法根據字段位置或者名稱獲取字段信息,同時要借助 Types Hint 幫助 Flink 推斷數據類型信息。
2.5 泛型類型
- 那些無法特別處理的類型會被當做泛型類型處理并交給
Kryo序列化框架進行序列化。
如果可能的話,盡可能的避免使用 Kryo。
Kryo作為一個通用的序列化框架,通常效率不高。
3 TypeInformation
- 那這么多的數據類型,在 Flink 內部又是如何表示的呢?
在 Flink 中每一個具體的類型都對應了一個具體的
TypeInformation實現類。
例如,BasicTypeInformation 中的 IntegerTypeInformation 對應了 Integer 數據類型。
- 數據類型的描述信息都是由
TypeInformation定義。
比較常用的 TypeInformation 有 BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo 類等,如下圖所示:

TypeInformation為系統提供生成序列化器和比較器提供必要的信息。
- 當應用程序提交執行時,Flink 的類型系統會嘗試為處理的每種數據類型自動推斷 TypeInformation。
- 類型提取器會分析函數的泛型類型以及返回類型,來獲取相應的
TypeInformation對象。
-
但是,有時類型提取器會失靈,或者你可能想定義自己的類型并告訴 Flink 如何有效地處理它們。在這種情況下,你需要為特定數據類型生成 TypeInformation。
-
除了對類型的描述之外,TypeInformation 還提供了序列化的支撐。每一個 TypeInformation 都會為對應的具體數據類型提供一個專屬的序列化器。
TypeInformation會提供一個createSerialize()方法,通過這個方法就可以得到該類型進行數據序列化操作與反序列化操作的序列化器TypeSerializer:
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}
- 對于大多數的數據類型, Flink 可以自動生成對應的序列化器,能非常高效地對數據集進行序列化和反序列化。
比如,BasicTypeInfo、WritableTypeIno 等
- 但針對
GenericTypeInfo類型,Flink 會使用Kyro進行序列化和反序列化。
其中,Tuple、Pojo 和 CaseClass 類型是復合類型,它們可能嵌套一個或者多個數據類型。
在這種情況下,它們的序列化器同樣是復合的。它們會將內嵌類型的序列化委托給對應類型的序列化器。
4 顯式指定 TypeInformation
-
大多數情況下,Flink 可以自動推斷類型生成正確的 TypeInformation,并選擇合適的序列化器和比較器。
-
Flink 的類型提取器利用反射機制來分析函數簽名以及子類信息,生成函數的正確輸出類型。
-
但是有時無法提取必要的信息,例如定義函數時如果使用到了泛型,JVM 就會出現類型擦除的問題,使得 Flink 并不能很容易地獲取到數據集中的數據類型信息。這時候可能會拋出如下類似的異常:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ReturnsExample.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1236)
at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
...
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
...
- 此外,在某些情況下,Flink 選擇的 TypeInformation 可能無法生成【最有效的序列化器】和【反序列化器】。
因此,你可能需要為你使用的數據類型顯式地提供 TypeInformation。
- 我們首先看一下如何創建 TypeInformation,然后再看一下如何為函數指定 TypeInformation。
4.1 創建 TypeInformation
4.1.1 of 方法
- 對于非泛型的類型,可以使用 TypeInformation 的
of(Class typeClass)函數直接傳入 Class 就可以創建 TypeInformation:
// 示例1 非泛型類型 直接傳入 Class 對象
DataStream<WordCount> result1 = env.fromElements("a b a")
.flatMap((String value, Collector<WordCount> out) -> {
for(String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
})
.returns(TypeInformation.of(WordCount.class));
result1.print("R1");
上述方法僅適用于非泛型類型。
- 如果是泛型類型,可以借助
TypeHint為泛型類型創建 TypeInformation:
// 示例2 泛型類型(如: <String, Integer>) 需要借助 TypeHint
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
result2.print("R2");
4.1.2 TypeHint
- 對于泛型類型,上面是通過
TypeInformation.of+TypeHint來創建 TypeInformation,也可以單獨使用TypeHint來創建 TypeInformation:
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo());
result2.print("R2");
- TypeHint 的原理是在內部創建匿名子類,捕獲泛型信息并會將其保留到運行時。運行時
TypeExtractor可以獲取保存的實際類型。
4.1.3 預定義的快捷方式
- 例如 BasicTypeInfo 類定義了一系列常用類型的快捷方式,對于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本類型的類型聲明,可以直接使用:

- 當然,如果覺得 BasicTypeInfo 還是太長,Flink 還提供了完全等價的 Types 類(org.apache.flink.api.common.typeinfo.Types):

- Types 為常見數據類型提供 TypeInformation,使用起來非常方便,如下示例:
// 示例1 Types.TUPLE
DataStream<Tuple2<String, Integer>> result1 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
result1.print("R1");
// 示例2 Types.POJO
DataStream<WordCount> result2 = env.fromElements("a b a")
.flatMap((String value, Collector<WordCount> out) -> {
for(String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
})
.returns(Types.POJO(WordCount.class));
result2.print("R2");
4.2 顯式提供類型信息
- 當 Flink 無法自動推斷函數的生成類型是什么的時候,就需要我們顯示提供類型信息提示。從上面示例中我們知道可以通過 returns 顯示提供類型信息,除此之外還可以實現
ResultTypeQueryable接口顯示提供。
org.apache.flink.api.java.typeutils.ResultTypeQueryable
//樣例:
public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
//...
@Override
public TypeInformation getProducedType() {
return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
//return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[錯誤示范] TypeHint<T> 的 T : 不支持泛型,將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
//return TypeInformation.of( Tuple2.class );//[錯誤示范] 將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
//return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
}
}
4.2.1 returns
第一種方法是使用 returns 為算子添加返回類型的類型信息提示。對于非泛型類型,可以直接傳入 Class 即可;對于泛型類型需要借助 TypeHint 提供類型信息提示,如下所示:
// 示例1 非泛型類型 直接傳入 Class
DataStream<WordCount> result1 = env.fromElements("a b a")
.flatMap((String value, Collector<WordCount> out) -> {
for(String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
})
.returns(WordCount.class);
result1.print("R1");
// 示例2 泛型類型 優先推薦借助 TypeHint
DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {});
result2.print("R2");
// 示例3 TypeInformation.of + TypeHint
DataStream<Tuple2<String, Integer>> result3 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
result3.print("R3");
// 示例4 Types 快捷方式
DataStream<Tuple2<String, Integer>> result4 = env.fromElements("a", "b", "a")
.map(value -> Tuple2.of(value, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT));
result4.print("R4");
4.2.2 ResultTypeQueryable
- 第二種方法是通過實現 ResultTypeQueryable 接口來擴展函數以顯式提供返回類型的 TypeInformation。
如下示例是一個顯式提供返回類型的 MapFunction:
public static class ResultTypeMapFunction implements MapFunction<String, Stu>, ResultTypeQueryable {
@Override
public Stu map(String value) throws Exception {
String[] params = value.split(",");
String name = params[0];
int age = Integer.parseInt(params[1]);
return new Stu(name, age);
}
@Override
public TypeInformation getProducedType() {
return Types.POJO(Stu.class);
}
}
5 使用場景
5.1 Table 轉 DataStream
Table轉 DataStream 的時候,Table 并清楚 DataStream 的數據結構。
因此,需要給當前轉換出來的 DataStream 顯性的指定數據類型:
// 轉化為 Pojo 類型
DataStream<WordCount> stream1 = tEnv.toAppendStream(table, Types.POJO(WordCount.class));
// 轉換為 Row 類型
DataStream<Row> stream2 = tEnv.toAppendStream(table, Types.ROW(Types.STRING, Types.LONG));
5.2 Lambda 表達式與泛型
- 由于 Java 泛型會出現類型擦除問題,因此 Flink 通過 Java 反射機制盡可能重構類型信息,例如使用函數簽名以及子類的信息等。
對于函數的返回類型取決于輸入類型的情況時,會包含一些簡單的類型推斷。
但如果無法重構所有的泛型類型信息時,需要借助于類型提示來告訴系統函數中傳入的參數類型信息和輸出參數信息。
如下所示使用 returns 語句指定生成的類型:
env.fromElements(1, 2, 3)
.map(i -> Tuple2.of(i, i*i))
// 如果不指定 returns 返回的 TypeInformation 會拋出異常
.returns(Types.TUPLE(Types.INT, Types.INT))
.print();
Y 推薦文獻
X 參考文獻
- Flink DataStream 類型系統 TypeInformation - 騰訊云 2022.4.23 【推薦】
- flink 遇到 Tuple2 泛型導致的問題:could not be determined automatically, due to type erasure. - jianshu
遇到【泛型】的類型擦除問題
本文鏈接: http://www.rzrgm.cn/johnnyzen
關于博文:評論和私信會在第一時間回復,或直接私信我。
版權聲明:本博客所有文章除特別聲明外,均采用 BY-NC-SA 許可協議。轉載請注明出處!
日常交流:大數據與軟件開發-QQ交流群: 774386015 【入群二維碼】參見左下角。您的支持、鼓勵是博主技術寫作的重要動力!

浙公網安備 33010602011771號