<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [Flink/序列化/泛型] Flink DataStream 的類型系統(`TypeInformation`)

      0 序 :緣起——最近遇到的類型系統轉換問題

      • 最近,在將一個業務上較為重要的、原來由 Flink Sql 編寫的 Flink Job 重寫,改為基于 Flink Api 實現。

      但改造過程中遇到不少 Flink 類型系統轉換方面的問題,索性總結一二。

      • InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.

      Tuple arity : 元組元數

      public class DeviceRemoteConfigVersionMapFunction extends RichMapFunction< Tuple3<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode>, Tuple4<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode, DeviceRemoteConfigVersion>> implements ResultTypeQueryable {
          @Override
          public TypeInformation getProducedType() {
              return Types.TUPLE(
                  TypeInformation.of( CdcPrimaryKeys.class ), //如缺少1個 class,比如此行,則報錯:InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
                  TypeInformation.of( DeviceAlarmRecord.class ),
                  TypeInformation.of( DimFaultCode.class ),
                  TypeInformation.of( DeviceRemoteConfigVersion.class )
              );
          }
      	
      	//...
      }
      
      Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Tuple arity '3' expected but was '4'.
      	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1436) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:571) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:151) ~[flink-core-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:581) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at com.xxx.bd.dataintegration.flinkapi.entry.xxx.DwdDeviceAlarmRecordRi.main(DwdDeviceAlarmRecordRi.java:106) ~[?:?]
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_422]
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_422]
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_422]
      	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_422]
      	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:381) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:232) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:343) ~[flink-dist-1.15.0-h0.cbu.dli.330.20241021.r34.jar:1.15.0-h0.cbu.dli.330.20241021.r34]
      	... 12 more
      
      public class DimFaultCodeMapFunction extends RichMapFunction< Tuple2<CdcPrimaryKeys, DeviceAlarmRecord>, Tuple3<CdcPrimaryKeys, DeviceAlarmRecord, DimFaultCode>> implements ResultTypeQueryable {
          @Override
          public TypeInformation getProducedType() {
              return Types.TUPLE(
                  TypeInformation.of( CdcPrimaryKeys.class ),//如果不填這個,將報錯: java.lang.ClassCastException: org.apache.flink.api.java.tuple.Tuple2 cannot be cast to org.apache.flink.api.java.tuple.Tuple3
                  TypeInformation.of( DeviceAlarmRecord.class ),
                  TypeInformation.of( DimFaultCode.class )
              );
          }
      	
      	//...
      }
      
      • Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
      public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
          @Override
          public TypeInformation getProducedType() {
              return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
              //return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[錯誤示范] TypeHint<T> 的 T : 不支持泛型,將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
              //return TypeInformation.of( Tuple2.class );//[錯誤示范] 將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
              //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
          }
      	
      	//...
      }
      
      • TypeHint<T> 的 T : 不支持泛型
      public class MysqlCdcDeserializationSchema implements DebeziumDeserializationSchema<CdcRecord> {
          /**
           * 獲取返回類型
           * @reference-doc
           *  [1] {@link com.ververica.cdc.connectors.mysql.source.MySqlSource # deserializer(DebeziumDeserializationSchema<T> deserializer) 要求 }
           * @return
           */
          @Override
          public TypeInformation<CdcRecord> getProducedType() {
              //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
              //return Types.GENERIC(CdcRecord.class);
              return Types.POJO( CdcRecord.class );
              //return TypeInformation.of( CdcRecord.class );
              //return BasicTypeInfo.of( CdcRecord.class );
      
              //return BasicTypeInfo.STRING_TYPE_INFO;
          }
      	
      	//...
      }
      

      1 概述:Flink DataStream 的類型系統

      • Flink DataStream 應用程序所處理的【事件】以數據對象的形式存在。
      • 函數調用時會傳入數據對象,同時也可以輸出數據對象

      因此,Flink內部需要能夠處理這些不同數據結構的對象

      • 當通過【網絡傳輸】或者讀寫【狀態后端】、【檢查點】以及【保存點】時,需要對它們進行【序列化】【反序列化】

      為了能夠更高效地做到這一點,Flink 框架需要詳細了解應用程序處理的數據類型

      • Flink 使用【類型信息】(org.apache.flink.api.common.typeinfo.TypeInformation)的概念來表示數據類型,并為每種數據類型生成特定的序列化器、反序列化器及比較器

      • 此外,Flink 還有一個類型提取系統,可以分析函數的輸入和返回類型來自動獲取類型信息,進而獲得序列化器反序列化器

      但是,在某些情況下,例如使用了 Lambda 函數或者泛型類型,必須顯式提供【類型信息】才能使應用程序正常工作或者提高其性能。

      • 在本文中,我們會討論:
      • Flink 支持的數據類型
      • 如何為數據類型創建類型信息
      • 如何在 Flink 的類型系統無法自動推斷函數的返回類型時提供提示

      最后簡單說明一下顯示指定類型信息的兩個場景。

      • 注冊子類型:如果函數簽名只描述了超類型,但是它們實際上在執行期間使用了超類型的子類型,那么讓 Flink 了解這些子類型會大大提高性能。可以在 StreamExecutionEnvironment 或 ExecutionEnvironment 中調用 .registertype (clazz) 注冊子類型信息。

      • 注冊自定義序列化:對于不適用于自己的序列化框架的數據類型,Flink 會使用 Kryo 來進行序列化,并不是所有的類型都與 Kryo 無縫連接,具體注冊方法在下文介紹。

      • 添加類型提示:有時,當 Flink 用盡各種手段都無法推測出泛型信息時,用戶需要傳入一個類型提示 TypeHint,這個通常只在 Java API 中需要。

      • 手動創建一個 TypeInformation:在某些 API 調用中,這可能是必需的,因為 Java 的泛型類型擦除導致 Flink 無法推斷數據類型。

      例如:

      //樣例: OutDto 為數據類型不定的泛型
      public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
      	//...
      
          @Override
          public TypeInformation getProducedType() {
              return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
              //return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[錯誤示范] TypeHint<T> 的 T : 不支持泛型,將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
              //return TypeInformation.of( Tuple2.class );//[錯誤示范] 將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
              //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
          }
      }
      

      其實在大多數情況下,用戶不必擔心序列化框架和注冊類型,因為 Flink 已經提供了大量的序列化操作,不需要去定義自己的一些序列化器,但是在一些特殊場景下,需要去做一些相應的處理。

      • org.apache.flink.api.common.typeinfo.TypeInformation

      • org.apache.flink.api.java.typeutils.ResultTypeQueryable

      • org.apache.flink.api.common.typeinfo.Types

      public class Types
      

      image

      • org.apache.flink.api.common.typeinfo.TypeHint
      public abstract class TypeHint<T>
      
      • org.apache.flink.api.common.typeinfo.BasicTypeInfo
      public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T>
      
      • org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
      public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T>
      
      //eg: org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO
      
      • org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
      public class SqlTimeTypeInfo<T> extends TypeInformation<T> implements AtomicType<T>
      
      • org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo
      public class LocalTimeTypeInfo<T extends Temporal> extends TypeInformation<T> implements AtomicType<T>
      
      • org.apache.flink.api.common.typeinfo.NothingTypeInfo
      public class NothingTypeInfo extends TypeInformation<Nothing>
      
      • ...

      2 數據類型

      • Flink 支持 Java 和 Scala 所有常見的數據類型,也不需要像 Hadoop 一樣去實現一個特定的接口(org.apache.hadoop.io.Writable),能夠自動識別數據類型

      使用最多的可以分為如下幾類,如下圖所示:

      image

      從圖中可以看到 Flink 類型可以分為基本類型、數組類型、復合類型、輔助類型以及泛型。

      2.1 基本類型

      • Flink 能夠支持所有 Java 和 Scala 原生基本類型(包裝類型)以及 Void、String、Date、BigDecimal、BigInteger 等類型。

      例如: 通過從給定的元素集中創建 DataStream 數據集:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // 創建 Integer 類型的數據集
      DataStream<Integer> integerElements = env.fromElements(1, 2, 3);
      // 創建 String 類型的數據集
      DataStream<String> stringElements = env.fromElements("1", "2", "3");
      

      2.2 數組類型

      • 數組類型包含兩種類型:
      • 基本類型數組:基本類型的 Java 數組,支持 boolean、byte、short、int、long、float 等
      • 對象數組:Object 類型的 Java 數組,支持 String 以及其他對象

      例如: 通過從給定的元素集中創建 DataStream 數據集:

      int[] a = {1, 2};
      int[] b = {3, 4};
      DataStream<int[]> arrayElements = env.fromElements(a, b);
      

      2.3 復合數據類型

      2.3.1 Java Tuples 類型

      • Flink 在 Java 接口中定義了元組類Tuple)供用戶使用。

      元組是由固定數量的強類型字段組成的復合數據類型。
      如下代碼所示,創建 Tuple 數據類型數據集:

      DataStream<Tuple2> tupleElements = env.fromElements(new Tuple2(1, "a"), new Tuple2(2, "b"));
      

      Flink 提供了 Java 元組的高效實現,最多包含 25 個字段,每個字段長度都對應一個單獨的實現,即 Tuple0 到 Tuple25。如果字段數量超過上限,可以通過繼承 Tuple 類的方式進行拓展。

      2.3.2 Scala Case Class 與 Tuple 類型

      • Flink 支持任意的 Scala Case Class 以及 Scala tuples 類型,支持的字段數量上限為 22,支持通過字段名稱和位置索引獲取指標,不支持存儲空值。

      例如:代碼實例所示,定義 WordCount Case Class 數據類型,然后通過 fromElements 方法創建 input 數據集,調用 keyBy() 方法對數據集根據 word 字段重新分區。

      // 定義WordCount Case Class數據結構
      case class WordCount(word: String, count: Int)
      // 通過fromElements方法創建數據集
      val input = env.fromElements(WordCount("hello", 1), WordCount("world", 2))
      val keyStream1 = input.keyBy("word") // 根據word字段為分區字段,
      val keyStream2 = input.keyBy(0) //也可以通過指定position分區
      
      • 通過使用 Scala Tuple 創建 DataStream 數據集,其他的使用方式和 Case Class 相似。

      需要注意的是,如果根據名稱獲取字段,可以使用 Tuple 中的默認字段名稱

      // 通過 scala Tuple 創建具有兩個元素的數據集
      val tupleStream: DataStream[Tuple2[String, Int]] = env.fromElements(("a", 1),("c", 2))
      // 使用默認字段名稱獲取字段,其中 _1 表示 tuple 的第一個字段
      tupleStream.keyBy("_1")
      

      2.3.3 ROW 類型

      • Row 是一種固定長度、可識別空值的復合類型,以確定的字段順序存儲多個值。
      • 每個字段的類型都可以不一樣,并且每個字段都可以為空
      • 由于無法自動推斷行字段的類型,因此——在生成 Row 時都需要提供類型信息

      如下代碼所示,創建 Row 數據類型數據集:

      DataStream<Row> rowElements = env.fromElements(Row.of(0, "a", 3.14));
      

      2.3.4 POJO 類型

      • Flink 會分析那些不屬于任何一類的數據類型,嘗試將它們作為 POJO 類型進行處理。如果一個類型滿足如下條件,Flink 就會將它們作為 POJO 數據類型:
      • POJOs 類必須是一個公有類,Public 修飾且獨立定義,不能是內部類
      • POJOs 類中必須包含一個 Public 修飾的無參構造器;
      • POJOs 類中所有的字段必須是 Public 或者具有 Public 修飾的 getter 和 setter 方法;
      • POJOs 類中的字段類型必須是 Flink 支持的。

      例如,如下 Java 類就會被 Flink 識別為 POJO

      // (1) 必須是 Public 修飾且必須獨立定義,不能是內部類
      public class Person {
          // (4) 字段類型必須是 Flink 支持的
          private String name;
          private int age;
          // (2) 必須包含一個 Public 修飾的無參構造器
          public Person() {
      	
          }
          
      	public Person(String name, int age) {
              this.name = name;
              this.age = age;
          }
          
      	// (3) 所有的字段必須是 Public 或者具有 Public 修飾的 getter 和 setter 方法
          public String getName() {
              return name;
          }
          
      	public void setName(String name) {
              this.name = name;
          }
          
      	public int getAge() {
              return age;
          }
          
      	public void setAge(int age) {
              this.age = age;
          }
      }
      
      • 定義好 POJOs Class 后,就可以在 Flink 環境中使用了,如下代碼所示,使用 fromElements 接口構建 Person 類的數據集:
      env.fromElements(new Person("Lucy", 18), new Person("Tom", 12))
      

      2.4 輔助類型

      • 在 Flink 中也支持一些比較特殊的數據數據類型,例如 Scala 中的 List、Map、Either、Option、Try 數據類型,以及 Java 中 Either 數據類型,還有 Hadoop 的 Writable 數據類型。

      如下代碼所示,創建 List 類型數據集:

      DataStream<ArrayList<Integer>> listElements = env.fromElements(
          Lists.newArrayList(1, 2), Lists.newArrayList(3, 4)
      );
      

      這種數據類型使用場景不是特別廣泛,主要原因是數據中的操作相對不像 POJOs 類那樣方便和透明,用戶無法根據字段位置或者名稱獲取字段信息,同時要借助 Types Hint 幫助 Flink 推斷數據類型信息。

      2.5 泛型類型

      • 那些無法特別處理的類型會被當做泛型類型處理并交給 Kryo 序列化框架進行序列化。

      如果可能的話,盡可能的避免使用 Kryo
      Kryo 作為一個通用的序列化框架,通常效率不高

      3 TypeInformation

      • 那這么多的數據類型,在 Flink 內部又是如何表示的呢?

      在 Flink 中每一個具體的類型都對應了一個具體的 TypeInformation 實現類。
      例如,BasicTypeInformation 中的 IntegerTypeInformation 對應了 Integer 數據類型。

      • 數據類型的描述信息都是由 TypeInformation 定義。

      比較常用的 TypeInformation 有 BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo 類等,如下圖所示:

      image

      • TypeInformation 為系統提供生成序列化器和比較器提供必要的信息。
      • 當應用程序提交執行時,Flink 的類型系統會嘗試為處理的每種數據類型自動推斷 TypeInformation。
      • 類型提取器會分析函數的泛型類型以及返回類型,來獲取相應的 TypeInformation 對象。
      • 但是,有時類型提取器會失靈,或者你可能想定義自己的類型并告訴 Flink 如何有效地處理它們。在這種情況下,你需要為特定數據類型生成 TypeInformation

      • 除了對類型的描述之外,TypeInformation 還提供了序列化的支撐。每一個 TypeInformation 都會為對應的具體數據類型提供一個專屬的序列化器

      TypeInformation 會提供一個 createSerialize() 方法,通過這個方法就可以得到該類型進行數據序列化操作與反序列化操作的序列化器 TypeSerializer

      public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
          return this.serializer;
      }
      
      • 對于大多數的數據類型, Flink 可以自動生成對應的序列化器,能非常高效地對數據集進行序列化和反序列化。

      比如,BasicTypeInfo、WritableTypeIno 等

      • 但針對 GenericTypeInfo 類型,Flink 會使用 Kyro 進行序列化和反序列化。

      其中,Tuple、Pojo 和 CaseClass 類型是復合類型,它們可能嵌套一個或者多個數據類型。
      在這種情況下,它們的序列化器同樣是復合的。它們會將內嵌類型的序列化委托給對應類型的序列化器。

      4 顯式指定 TypeInformation

      • 大多數情況下,Flink 可以自動推斷類型生成正確的 TypeInformation,并選擇合適的序列化器比較器

      • Flink 的類型提取器利用反射機制來分析函數簽名以及子類信息,生成函數的正確輸出類型。

      • 但是有時無法提取必要的信息,例如定義函數時如果使用到了泛型,JVM 就會出現類型擦除的問題,使得 Flink 并不能很容易地獲取到數據集中的數據類型信息。這時候可能會拋出如下類似的異常:

      Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(ReturnsExample.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
      	at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:479)
      	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1236)
      	at org.apache.flink.streaming.api.datastream.DataStream.print(DataStream.java:937)
      ...
      Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
      ...
      
      • 此外,在某些情況下,Flink 選擇的 TypeInformation 可能無法生成【最有效的序列化器】和【反序列化器】。

      因此,你可能需要為你使用的數據類型顯式地提供 TypeInformation。

      • 我們首先看一下如何創建 TypeInformation,然后再看一下如何為函數指定 TypeInformation。

      4.1 創建 TypeInformation

      4.1.1 of 方法

      • 對于非泛型的類型,可以使用 TypeInformation 的 of(Class typeClass) 函數直接傳入 Class 就可以創建 TypeInformation:
      // 示例1 非泛型類型 直接傳入 Class 對象
      DataStream<WordCount> result1 = env.fromElements("a b a")
              .flatMap((String value, Collector<WordCount> out) -> {
                  for(String word : value.split("\\s")) {
                      out.collect(new WordCount(word, 1));
                  }
              })
              .returns(TypeInformation.of(WordCount.class));
      result1.print("R1");
      

      上述方法僅適用于非泛型類型。

      • 如果是泛型類型,可以借助 TypeHint 為泛型類型創建 TypeInformation:
      // 示例2 泛型類型(如: <String, Integer>) 需要借助 TypeHint
      DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
              .map(value -> Tuple2.of(value, 1))
              .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
      result2.print("R2");
      

      4.1.2 TypeHint

      • 對于泛型類型,上面是通過 TypeInformation.of + TypeHint 來創建 TypeInformation,也可以單獨使用 TypeHint 來創建 TypeInformation:
      DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
              .map(value -> Tuple2.of(value, 1))
              .returns(new TypeHint<Tuple2<String, Integer>>() {}.getTypeInfo());
      result2.print("R2");
      
      • TypeHint 的原理是在內部創建匿名子類,捕獲泛型信息并會將其保留到運行時。運行時 TypeExtractor 可以獲取保存的實際類型。

      完整示例: https://github.com/sjf0115/data-example/blob/master/flink-example/src/main/java/com/flink/example/stream/base/typeInformation/hints/TypeHintExample.java

      4.1.3 預定義的快捷方式

      • 例如 BasicTypeInfo 類定義了一系列常用類型的快捷方式,對于 String、Boolean、Byte、Short、Integer、Long、Float、Double、Char 等基本類型的類型聲明,可以直接使用:

      image

      • 當然,如果覺得 BasicTypeInfo 還是太長,Flink 還提供了完全等價的 Types 類(org.apache.flink.api.common.typeinfo.Types):

      image

      • Types 為常見數據類型提供 TypeInformation,使用起來非常方便,如下示例:
      // 示例1 Types.TUPLE
      DataStream<Tuple2<String, Integer>> result1 = env.fromElements("a", "b", "a")
              .map(value -> Tuple2.of(value, 1))
              .returns(Types.TUPLE(Types.STRING, Types.INT));
      result1.print("R1");
      
      // 示例2 Types.POJO
      DataStream<WordCount> result2 = env.fromElements("a b a")
              .flatMap((String value, Collector<WordCount> out) -> {
                  for(String word : value.split("\\s")) {
                      out.collect(new WordCount(word, 1));
                  }
              })
              .returns(Types.POJO(WordCount.class));
      result2.print("R2");
      

      完整示例: https://github.com/sjf0115/data-example/blob/master/flink-example/src/main/java/com/flink/example/stream/base/typeInformation/hints/TypesExample.java

      4.2 顯式提供類型信息

      • 當 Flink 無法自動推斷函數的生成類型是什么的時候,就需要我們顯示提供類型信息提示。從上面示例中我們知道可以通過 returns 顯示提供類型信息,除此之外還可以實現 ResultTypeQueryable 接口顯示提供。
      • org.apache.flink.api.java.typeutils.ResultTypeQueryable
      //樣例:
      public class CdcRecordValueToDtoMapFunction<OutDto> extends RichMapFunction<CdcRecord, Tuple2<CdcPrimaryKeys, OutDto>> implements ResultTypeQueryable {
      	//...
          @Override
          public TypeInformation getProducedType() {
              return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, Object>>(){} );
              //return TypeInformation.of( new TypeHint<Tuple2<CdcPrimaryKeys, OutDto>>(){} );//[錯誤示范] TypeHint<T> 的 T : 不支持泛型,將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable.This is not supported, generic types must be fully specified for the TypeHint.
              //return TypeInformation.of( Tuple2.class );//[錯誤示范] 將報錯: Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot extract TypeInformation from Class alone, because generic parameters are missing. Please use TypeInformation.of(TypeHint) instead, or another equivalent method in the API that accepts a TypeHint instead of a Class. For example for a Tuple2<Long, String> pass a 'new TypeHint<Tuple2<Long, String>>(){}'.
              //return TypeInformation.of(new TypeHint< Object >() { });// TypeHint<T> 的 T : 不支持泛型
          }
      }
      

      4.2.1 returns

      第一種方法是使用 returns 為算子添加返回類型的類型信息提示。對于非泛型類型,可以直接傳入 Class 即可;對于泛型類型需要借助 TypeHint 提供類型信息提示,如下所示:

      // 示例1 非泛型類型 直接傳入 Class
      DataStream<WordCount> result1 = env.fromElements("a b a")
              .flatMap((String value, Collector<WordCount> out) -> {
                  for(String word : value.split("\\s")) {
                      out.collect(new WordCount(word, 1));
                  }
              })
              .returns(WordCount.class);
      result1.print("R1");
      
      // 示例2 泛型類型 優先推薦借助 TypeHint
      DataStream<Tuple2<String, Integer>> result2 = env.fromElements("a", "b", "a")
              .map(value -> Tuple2.of(value, 1))
              .returns(new TypeHint<Tuple2<String, Integer>>() {});
      result2.print("R2");
      
      // 示例3 TypeInformation.of + TypeHint
      DataStream<Tuple2<String, Integer>> result3 = env.fromElements("a", "b", "a")
              .map(value -> Tuple2.of(value, 1))
              .returns(TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
      result3.print("R3");
      
      // 示例4 Types 快捷方式
      DataStream<Tuple2<String, Integer>> result4 = env.fromElements("a", "b", "a")
              .map(value -> Tuple2.of(value, 1))
              .returns(Types.TUPLE(Types.STRING, Types.INT));
      result4.print("R4");
      

      4.2.2 ResultTypeQueryable

      • 第二種方法是通過實現 ResultTypeQueryable 接口來擴展函數以顯式提供返回類型的 TypeInformation。

      如下示例是一個顯式提供返回類型的 MapFunction:

      public static class ResultTypeMapFunction implements MapFunction<String, Stu>, ResultTypeQueryable {
          @Override
          public Stu map(String value) throws Exception {
              String[] params = value.split(",");
              String name = params[0];
              int age = Integer.parseInt(params[1]);
              return new Stu(name, age);
          }
      
          @Override
          public TypeInformation getProducedType() {
              return Types.POJO(Stu.class);
          }
      }
      

      5 使用場景

      5.1 Table 轉 DataStream

      • Table 轉 DataStream 的時候,Table 并清楚 DataStream 的數據結構。

      因此,需要給當前轉換出來的 DataStream 顯性的指定數據類型:

      // 轉化為 Pojo 類型
      DataStream<WordCount> stream1 = tEnv.toAppendStream(table, Types.POJO(WordCount.class));
      
      // 轉換為 Row 類型
      DataStream<Row> stream2 = tEnv.toAppendStream(table, Types.ROW(Types.STRING, Types.LONG));
      

      5.2 Lambda 表達式與泛型

      • 由于 Java 泛型會出現類型擦除問題,因此 Flink 通過 Java 反射機制盡可能重構類型信息,例如使用函數簽名以及子類的信息等。

      對于函數的返回類型取決于輸入類型的情況時,會包含一些簡單的類型推斷。
      但如果無法重構所有的泛型類型信息時,需要借助于類型提示來告訴系統函數中傳入的參數類型信息和輸出參數信息。
      如下所示使用 returns 語句指定生成的類型:

      env.fromElements(1, 2, 3)
        .map(i -> Tuple2.of(i, i*i))
        // 如果不指定 returns 返回的 TypeInformation 會拋出異常
        .returns(Types.TUPLE(Types.INT, Types.INT))
        .print();
      

      Y 推薦文獻

      X 參考文獻

      遇到【泛型】的類型擦除問題

      posted @ 2025-08-12 00:34  千千寰宇  閱讀(56)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲中文字幕国产综合| 麻豆精品一区二区三区蜜臀| 淮安市| 国产亚洲中文字幕久久网| 亚洲狼人久久伊人久久伊| 99国产精品99久久久久久| 国产成人综合色在线观看网站 | 97人妻精品一区二区三区| 性中国videossexo另类| 日韩本精品一区二区三区| 欧美日韩精品一区二区三区高清视频 | 久久国产乱子精品免费女| 亚洲欧美中文日韩V在线观看| 人妻在线无码一区二区三区| 另类图片亚洲人妻中文无码| 亚洲一区中文字幕第十页| 国产成人精品三级麻豆| 久久亚洲精品人成综合网| 永新县| 中文字幕成人精品久久不卡| 亚洲av一本二本三本| 粉嫩一区二区三区国产精品| 免费播放一区二区三区| 高清精品一区二区三区| 蜜臀一区二区三区精品免费| 国产99视频精品免费专区| 国产在线午夜不卡精品影院| 国产初高中生粉嫩无套第一次 | 国产日韩精品视频无码| 成人午夜免费无码视频在线观看| 麻豆一区二区三区精品视频| 蜜臀av一区二区三区不卡| 日本一区不卡高清更新二区| 深夜视频国产在线观看| 久久综合亚洲色一区二区三区| 国产a在视频线精品视频下载| 欧美特级午夜一区二区三区| 国产精品久久久久无码网站| 久久久天堂国产精品女人| 亚洲国产精品成人av网| 中国女人熟毛茸茸A毛片|