<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構

      Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構

      0x00 摘要

      Alink 是阿里巴巴基于實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文是漫談系列的第二篇,將從源碼入手,帶領大家具體剖析Alink設計思想和架構為何。

      因為Alink的公開資料太少,所以均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。

      0x01 Alink設計原則

      前文中 Alink漫談(一) : 從KMeans算法實現看Alink設計思想 我們推測總結出Alink部分設計原則

      • 算法的歸算法,Flink的歸Flink,盡量屏蔽AI算法和Flink之間的聯系。

      • 采用最簡單,最常見的開發語言和思維方式。

      • 盡量借鑒市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。

      • 構建一套戰術打法(middleware或adapter),即屏蔽了Flink,又可以利用好Flink,還能讓用戶快速開發算法。

      下面我們就針對這些設計原則,從上至下看看Alink如何設計自己這套戰術打法。

      為了能讓大家更好理解,先整理一個概要圖。因為Alink系統主要可以分成三個層面(頂層流水線, 中間層算法組件, 底層迭代計算框架),再加上一個Flink runtime,所以下圖就是分別從這四個層面出發來看程序執行流程。

      如何看待 pipeline.fit(data).transform(data).print();
      
      // 從頂層流水線角度看
      訓練流水線  +-----> [VectorAssembler(Transformer)] -----> [KMeans(Estimator)]
                |      // KMeans.fit之后,會生成一個KMeansModel用來轉換
                |      
      轉換流水線  +-----> [VectorAssembler(Transformer)] -----> [KMeansModel(Transformer)]
      
      
      // 從中間層算法組件角度看    
      訓練算法組件 +-----> [MapBatchOp] -----> [KMeansTrainBatchOp]  
                 |       // VectorAssemblerMapper in MapBatchOp 是業務邏輯
                 |      
      轉換算法組件 +-----> [MapBatchOp] -----> [ModelMapBatchOp]
                         // VectorAssemblerMapper in MapBatchOp 是業務邏輯
                         // KMeansModelMapper in ModelMapBatchOp 是業務邏輯
       
        
      // 從底層迭代計算框架角度看
      訓練by框架 +-----> [VectorAssemblerMapper] -----> [KMeansPreallocateCentroid / KMeansAssignCluster / AllReduce / KMeansUpdateCentroids in IterativeComQueue]   
                |       // 映射到Flink的各種算子進行訓練
                |      
      轉換(直接) +-----> [VectorAssemblerMapper] -----> [KMeansModelMapper]    
                        // 映射到Flink的各種算子進行轉換 
        
      // 從Flink runtime角度看  
      訓練 +-----> map, mapPartiiton...
          |       // VectorAssemblerMapper.map等會被調用
          |      
      轉換 +-----> map, mapPartiiton...
                  // 比如調用 KMeansModelMapper.map 來轉換  
      

      0x02 Alink實例代碼

      示例代碼還是用之前的KMeans算法部分模塊。

      算法調用

      public class KMeansExample {
      	    public static void main(String[] args) throws Exception {
              ......
      
              BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
      
              VectorAssembler va = new VectorAssembler()
                  .setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
                  .setOutputCol("features");
      
              KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
                  .setPredictionCol("prediction_result")
                  .setPredictionDetailCol("prediction_detail")
                  .setReservedCols("category")
                  .setMaxIter(100);
      
              Pipeline pipeline = new Pipeline().add(va).add(kMeans);
              pipeline.fit(data).transform(data).print();
          }
      }
      

      算法主函數

      public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
      	implements KMeansTrainParams <KMeansTrainBatchOp> {
      
      	static DataSet <Row> iterateICQ(...省略...) {
      
      		return new IterativeComQueue()
      			.initWithPartitionedData(TRAIN_DATA, data)
      			.initWithBroadcastData(INIT_CENTROID, initCentroid)
      			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
      			.add(new KMeansPreallocateCentroid())
      			.add(new KMeansAssignCluster(distance))
      			.add(new AllReduce(CENTROID_ALL_REDUCE))
      			.add(new KMeansUpdateCentroids(distance))
      			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
      			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
      			.setMaxIter(maxIter)
      			.exec();
      	}
      }  
      

      算法模塊舉例

      基于點計數和坐標,計算新的聚類中心。

      // Update the centroids based on the sum of points and point number belonging to the same cluster.
      public class KMeansUpdateCentroids extends ComputeFunction {
          @Override
          public void calc(ComContext context) {
      
              Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
              Integer k = context.getObj(KMeansTrainBatchOp.K);
              double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
      
              Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
              if (context.getStepNo() % 2 == 0) {
                  stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
              } else {
                  stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
              }
      
              stepNumCentroids.f0 = context.getStepNo();
              context.putObj(KMeansTrainBatchOp.K,
                  updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
          }
      }
      

      0x03 頂層 -- 流水線

      本部分實現的設計原則是 :盡量借鑒市面上通用的設計思路和開發模式,讓開發者無縫切換。

      1. 機器學習重要概念

      一個典型的機器學習過程從數據收集開始,要經歷多個步驟,才能得到需要的輸出。這非常類似于流水線式工作,即通常會包含源數據ETL(抽取、轉化、加載),數據預處理,指標提取,模型訓練與交叉驗證,新數據預測等步驟。

      先來說一下幾個重要的概念:

      • Transformer:轉換器,是一種可以將一個數據轉換為另一個數據的算法。比如一個模型就是一個 Transformer。它可以把一個不包含轉換標簽的測試數據集 打上標簽,轉化成另一個包含轉換標簽的特征數據。Transformer可以理解為特征工程,即:特征標準化、特征正則化、特征離散化、特征平滑、onehot編碼等。該類型有一個transform方法,用于fit數據之后,輸入新的數據,進行特征變換。
      • Estimator:評估器,它是學習算法或在訓練數據上的訓練方法的概念抽象。所有的機器學習算法模型,都被稱為估計器。在 Pipeline 里通常是被用來操作 數據并生產一個 Transformer。從技術上講,Estimator實現了一個方法fit(),它接受一個特征數據并產生一個轉換器。比如一個隨機森林算法就是一個 Estimator,它可以調用fit(),通過訓練特征數據而得到一個隨機森林模型。
      • PipeLine:工作流或者管道。工作流將多個工作流階段(轉換器和估計器)連接在一起,形成機器學習的工作流,并獲得結果輸出。
      • Parameter:Parameter 被用來設置 Transformer 或者 Estimator 的參數。

      2. Alink中概念實現

      從 Alink的目錄結構中 ,我們可以看出,Alink提供了這些常見概念(其中有些代碼借鑒了Flink ML)。

      ./java/com/alibaba/alink:
      common		operator	params		pipeline
        
      ./java/com/alibaba/alink/params:
      associationrule	evaluation	nlp		regression	statistics
      classification	feature		onlinelearning	shared		tuning
      clustering	io		outlier		similarity	udf
      dataproc	mapper		recommendation	sql		validators
      
      ./java/com/alibaba/alink/pipeline:
      EstimatorBase.java	ModelBase.java		Trainer.java		feature
      LocalPredictable.java	ModelExporterUtils.java	TransformerBase.java	nlp
      LocalPredictor.java	Pipeline.java		classification		recommendation
      MapModel.java		PipelineModel.java	clustering		regression
      MapTransformer.java	PipelineStageBase.java	dataproc		tuning
      

      比較基礎的是三個接口:PipelineStages,Transformer,Estimator,分別恰好對應了機器學習的兩個通用概念 :轉換器 ,評估器。PipelineStages是這兩個的基礎接口。

      // Base class for a stage in a pipeline. The interface is only a concept, and does not have any actual functionality. Its subclasses must be either Estimator or Transformer. No other classes should inherit this interface directly.
      public interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable 
      
      // A transformer is a PipelineStage that transforms an input Table to a result Table.
      public interface Transformer<T extends Transformer<T>> extends PipelineStage<T> 
       
      // Estimators are PipelineStages responsible for training and generating machine learning models.
      public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends PipelineStage<E>
      

      其次是三個抽象類定義:PipelineStageBase,EstimatorBase,TransformerBase,分別就對應了以上的三個接口。其中定義了一些基礎操作,比如 fit,transform。

      // The base class for a stage in a pipeline, either an EstimatorBase or a TransformerBase.
      public abstract class PipelineStageBase<S extends PipelineStageBase<S>>
          implements WithParams<S>, HasMLEnvironmentId<S>, Cloneable 
        
      // The base class for estimator implementations.
      public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
          extends PipelineStageBase<E> implements Estimator<E, M>   
        
      // The base class for transformer implementations.
      public abstract class TransformerBase<T extends TransformerBase<T>>
          extends PipelineStageBase<T> implements Transformer<T>  
      

      然后是Pipeline基礎類,這個類就可以把Transformer,Estimator聯系起來 。

      // A pipeline is a linear workflow which chains EstimatorBases and TransformerBases to execute an algorithm 
      public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
      	private ArrayList<PipelineStageBase> stages = new ArrayList<>();
        
        	public Pipeline add(PipelineStageBase stage) {
      		this.stages.add(stage);
      		return this;
      	}
      }
      

      最后是 Parameter 概念相關舉例,比如實例中用到的 VectorAssemblerParams。

      // Parameters for MISOMapper.
      public interface MISOMapperParams<T> extends HasSelectedCols <T>,  HasOutputCol <T>,
      	HasReservedCols <T> {}
      
      // parameters of vector assembler.
      public interface VectorAssemblerParams<T> extends MISOMapperParams<T> {
      ParamInfo <String> HANDLE_INVALID = ParamInfoFactory
      		.createParamInfo("handleInvalid", String.class)
      		.setDescription("parameter for how to handle invalid data (NULL values)")
      		.setHasDefaultValue("error")
      		.build();
      }
      

      綜合來說,因為模型和數據,在Alink運行時候,都統一轉化為Table類型,所以可以整理如下:

      • Transformer: 將input table轉換為output table。
      • Estimator:將input table轉換為模型。
      • 模型:將input table轉換為output table。

      3. 結合實例看流水線

      首先是一些基礎抽象類,比如:

      • MapTransformer是 flat map 的Transformer。
      • ModelBase是模型定義,也是一個Transformer。
      • Trainer是訓練模型定義,是EstimatorBase。
      // Abstract class for a flat map TransformerBase. 
      public abstract class MapTransformer<T extends MapTransformer <T>>
      		extends TransformerBase<T> implements LocalPredictable {  
      
      // The base class for a machine learning model.
      public abstract class ModelBase<M extends ModelBase<M>> extends TransformerBase<M>
          implements Model<M> 
      
      // Abstract class for a trainer that train a machine learning model.
      public abstract class Trainer<T extends Trainer <T, M>, M extends ModelBase<M>>
      	extends EstimatorBase<T, M> 
      

      然后就是我們實例用到的兩個類型定義。

      • KMeans 是一個Trainer,其實現了EstimatorBase;
      • VectorAssembler 是一個TransformerBase。
      // 這是一個 EstimatorBase 類型
      public class KMeans extends Trainer <KMeans, KMeansModel> implements
      	KMeansTrainParams <KMeans>, KMeansPredictParams <KMeans> {
      	@Override
      	protected BatchOperator train(BatchOperator in) {
      		return new KMeansTrainBatchOp(this.getParams()).linkFrom(in);
      	}
      }
      
      // 這是一個 TransformerBase 類型
      public class VectorAssembler extends MapTransformer<VectorAssembler>
      	implements VectorAssemblerParams <VectorAssembler> {
      	public VectorAssembler(Params params) {
      		super(VectorAssemblerMapper::new, params);
      	}
      }
      

      實例中,分別構建了兩個流水線階段,然后這兩個實例就被鏈接到流水線上。

      VectorAssembler va = new VectorAssembler()
      KMeans kMeans = new KMeans()  
      Pipeline pipeline = new Pipeline().add(va).add(kMeans);
      
      // 能看出來,流水線上有兩個階段,分別是VectorAssembler和KMeans。
      
      pipeline = {Pipeline@1201} 
       stages = {ArrayList@2853}  size = 2
         
        0 = {VectorAssembler@1199} 
         mapperBuilder = {VectorAssembler$lambda@2859} 
         params = {Params@2860} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
           
        1 = {KMeans@1200} 
         params = {Params@2857} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
      

      0x04 中間層 -- 算法組件

      算法組件是中間層的概念,可以認為是真正實現算法的模塊/層次。主要作用是承上啟下。

      • 其上層是流水線各個階段,流水線的生成結果就是一個算法組件。算法組件的作用是把流水線的Estimator或者Transformer翻譯成具體算法。算法組件彼此是通過 linkFrom 串聯在一起
      • 其下層是"迭代計算框架",算法組件把具體算法邏輯中的計算/通信分成一個個小模塊,映射到Mapper Function 或者具體"迭代計算框架"的計算/通信 Function 上,這樣才能更好的利用Flink的各種優勢。
      • "迭代計算框架" 中,主要兩個部分是 Mapper Function 和 計算/通信 Function,其在代碼中分別對應Mapper,ComQueueItem。
      • Mapper Function 是映射Function(系統寫好了部分Mapper,用戶也可以根據算法來寫自己的Mapper);
      • 計算/通信 Function是專門為算法寫的專用Function(也分成 系統內置的,算法自定義的)。
      • 可以這么理解:各種Function是業務邏輯(組件)。算法組件只是提供運行規則,業務邏輯(組件)作為運行在算法組件上的插件。
      • 也可以這么理解 :算法組件就是框架,其把部分業務邏輯委托給Mapper或者ComQueueItem。

      比如

      • KMeans 是 Estimator,其對應算法組件是 KMeansTrainBatchOp。其業務邏輯(組件)也在這個類中,是由IterativeComQueue為基礎串聯起來的一系列算法類(ComQueueItem)。
      • VectorAssembler 是 Transformer,其對應算法組件是 MapBatchOp。其業務邏輯(組件)是VectorAssemblerMapper(其 map 函數會做業務邏輯,把將多個數值列按順序匯總成一個向量列)。
      public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>   implements KMeansTrainParams <KMeansTrainBatchOp> 
          
      // class for a flat map BatchOperator.
      public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> 
      

      無論是調用Estimator.fit 還是 Transformer.transform,其本質都是通過linkFrom函數,把各個Operator聯系起來,這樣就把數據流串聯起來。然后就可以逐步映射到Flink具體運行計劃上

      1. Algorithm operators

      AlgoOperator是算子組件的基類,其子類有BatchOperator和StreamOperator,分別對應了批處理和流處理。

      // Base class for algorithm operators.
      public abstract class AlgoOperator<T extends AlgoOperator<T>>
          implements WithParams<T>, HasMLEnvironmentId<T>, Serializable 
      
      // Base class of batch algorithm operators.
      public abstract class BatchOperator<T extends BatchOperator <T>> extends AlgoOperator <T> {
          // Link this object to BatchOperator using the BatchOperators as its input.
        	public abstract T linkFrom(BatchOperator <?>... inputs);
        
          public <B extends BatchOperator <?>> B linkTo(B next) {
            return link(next);
          }
          public BatchOperator print() throws Exception {
            return linkTo(new PrintBatchOp().setMLEnvironmentId(getMLEnvironmentId()));
          }  
      }
      
      public abstract class StreamOperator<T extends StreamOperator <T>> extends AlgoOperator <T>
      

      示例代碼如下:

      // 輸入csv文件被轉化為一個BatchOperator
      BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
      
      ...
      
      pipeline.fit(data).transform(data).print();
      

      2. Mapper(提前說明)

      Mapper是底層迭代計算框架的一部分,是業務邏輯(組件)。從目錄結構能看出。這里提前說明,是因為在流水線講解過程中大量涉及,所以就提前放在這里說明

      ./java/com/alibaba/alink/common
      linalg mapper model comqueue utils io
      

      Mapper的幾個主要類定義如下,其作用廣泛,即可以映射輸入到輸出,也可以映射模型到具體數值

      // Abstract class for mappers.
      public abstract class Mapper implements Serializable {}
      
      // Abstract class for mappers with model.
      public abstract class ModelMapper extends Mapper {}
      
      // Find  the closest cluster center for every point.
      public class KMeansModelMapper extends ModelMapper {}
      
      // Mapper with Multi-Input columns and Single Output column(MISO).
      public abstract class MISOMapper extends Mapper {}
      
      // This mapper maps many columns to one vector. the columns should be vector or numerical columns.
      public class VectorAssemblerMapper extends MISOMapper {}
      

      Mapper的業務邏輯依賴于算法組件來運作,比如 [ VectorAssemblerMapper in MapBatchOp ] ,[ KMeansModelMapper in ModelMapBatchOp ]。

      ModelMapper具體運行則需要依賴 ModelMapperAdapter 來和Flink runtime聯系起來。ModelMapperAdapter繼承了RichMapFunction,ModelMapper作為其成員變量,在map操作中執行業務邏輯,ModelSource則是數據來源

      對應本實例,KMeansModelMapper 就是最后轉換的 BatchOperator,其map函數用來轉換

      3. 系統內置算法組件

      系統內置了一些常用的算法組件,比如:

      • MapBatchOp 功能是基于輸入來flat map,是 VectorAssembler 返回的算法組件。
      • ModelMapBatchOp 功能是基于模型進行flat map,是 KMeans 返回的算法組件。

      以 ModelMapBatchOp 為例給大家說明其作用,從下面代碼注釋中可以看出,linkFrom作用是:

      • 把inputs"算法組件" 和 本身"算法組件" 聯系起來,這就形成了一個算法邏輯鏈
      • 把業務邏輯映射成 "Flink算子",這就形成了一個 "Flink算子鏈"
      public class ModelMapBatchOp<T extends ModelMapBatchOp<T>> extends BatchOperator<T> {
      	@Override
      	public T linkFrom(BatchOperator<?>... inputs) {
      		checkOpSize(2, inputs);
      
      		try {
      			BroadcastVariableModelSource modelSource = new BroadcastVariableModelSource(BROADCAST_MODEL_TABLE_NAME);
            // mapper是映射函數
      			ModelMapper mapper = this.mapperBuilder.apply(
      					inputs[0].getSchema(),
      					inputs[1].getSchema(),
      					this.getParams());
            // modelRows 是模型
      			DataSet<Row> modelRows = inputs[0].getDataSet().rebalance();
            // resultRows 是輸入數據的映射變化
      			DataSet<Row> resultRows = inputs[1].getDataSet()
      					.map(new ModelMapperAdapter(mapper, modelSource))
                 // 把模型作為廣播變量,后續會在 ModelMapperAdapter 中使用
      					.withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);
      
      			TableSchema outputSchema = mapper.getOutputSchema();
      			this.setOutput(resultRows, outputSchema);
      			return (T) this;
      		} catch (Exception ex) {
      			throw new RuntimeException(ex);
      		}
      	}
      }	
      

      ModelMapperAdapter

      ModelMapperAdapter 是適配器的實現,用來在flink上運行業務邏輯Mapper。從代碼可以看出,ModelMapperAdapter取出之前存儲的mapper和模型數據,然后基于此來進行具體算法業務。

      /**
       * Adapt a {@link ModelMapper} to run within flink.
      
       * This adapter class hold the target {@link ModelMapper} and it's {@link ModelSource}. Upon open(), it will load model rows from {@link ModelSource} into {@link ModelMapper}.
       */
      public class ModelMapperAdapter extends RichMapFunction<Row, Row> implements Serializable {
      
          /**
           * The ModelMapper to adapt.
           */
          private final ModelMapper mapper;
      
          /**
           * Load model data from ModelSource when open().
           */
          private final ModelSource modelSource;
      
          public ModelMapperAdapter(ModelMapper mapper, ModelSource modelSource) {
              // mapper是業務邏輯,modelSource是模型Broadcast source
              this.mapper = mapper; // 在map操作中執行業務邏輯
              this.modelSource = modelSource; // 數據來源
          }
      
          @Override
          public void open(Configuration parameters) throws Exception {
              // 從廣播變量中獲取模型數據
              List<Row> modelRows = this.modelSource.getModelRows(getRuntimeContext());
              this.mapper.loadModel(modelRows);
          }
      
          @Override
          public Row map(Row row) throws Exception {
              // 執行業務邏輯,在數據來源上轉換
              return this.mapper.map(row);
          }
      }
      

      4. 訓練階段fit

      pipeline.fit(data) 之中,會沿著流水線依次執行。如果流水線下一個階段遇到了是Transformer,就調用其transform;如果遇到的是EstimatorBase,就先調用其fit,把EstimatorBase轉換為Transformer,然后再次調用這個轉換出來的Transformer.transform。就這樣一個一個階段執行。

      4.1 具體流水線處理

      1. 如果流水線下一階段遇到EstimatorBase,會處理EstimatorBase的fit,把流水線上的Estimator轉換為 TransformerBase。Estimator.fit 接受一個特征數據并產生一個轉換器。

        如果這個階段 不是 流水線最后一個階段)會對這個 TransformerBase繼續處理。處理之后才能進入到流水線下一階段。

        如果這個階段 是 流水線最后一個階段)不會對這個 TransformerBase 做處理,直接結束流水線 fit 操作。

      2. 如果流水線下一階段遇到TransformerBase,就直接調用其transform函數。

      3. 對于所有需要處理的TransformerBase,無論是從EstimatorBase轉換出來的,還是Pipeline原有的 ,都調用其transform函數,轉換其input。input = transformers[i].transform(input); 。這樣每次轉換后的輸出再次賦值給input,作為流水線下一階段的輸入。

      4. 最后得到一個PipelineModel (其本身也是一個Transformer) ,這個屬于下一階段轉換流水線

      4.2 結合本實例概述

      本實例有兩個stage。VectorAssembler是Transformer,KMeans是EstimatorBase。

      這時候Pipeline其內部變量是:

      this = {Pipeline@1195} 
       stages = {ArrayList@2851}  size = 2
           
        0 = {VectorAssembler@1198} 
         mapperBuilder = {VectorAssembler$lambda@2857} 
         params = {Params@2858} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
             
        1 = {KMeans@2856} 
         params = {Params@2860} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
          params = {HashMap@2862}  size = 6
      
      • Pipeline 先調用Transformer類型的VectorAssembler,來處理其input (就是csv的BatchOperator)。這個處理csv是通過linkFrom(input)來構建的。處理之后再包裝成一個MapBatchOp返回賦值給input。
      • 其次調用EstimatorBase類型的Kmeans.fit函數,對input (就是 VectorAssembler 返回的MapBatchOp) 進行fit。fit過程中調用了KMeansTrainBatchOp.linkFrom來設置,fit生成了一個KMeansModel (Transformer)。因為這時候已經是流水線最后一步,所以不做后續的KMeansModel.transform操作。KMeansModel 就是訓練出來的判斷模型
      • 在上述調用過程中,會在transformers數組中記錄運算過的TransformerBase和EstimatorBase適配出來的Transformer。
      • 最后以這個transformers數組為參數,生成一個 PipelineModel (其也是一個Transformer類型)。生成 PipelineModel 的目的是:PipelineModel是后續轉換中的新流水線

      PipelineMode 的新流水線處理流程是:從 csv 讀入/ 映射(VectorAssembler 處理),然后 KMeansModel 做轉換(下一節會具體介紹)。

      fit 具體代碼是

      public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
        
          // Train the pipeline with batch data.
        	public PipelineModel fit(BatchOperator input) {
            
            int lastEstimatorIdx = getIndexOfLastEstimator();
            TransformerBase[] transformers = new TransformerBase[stages.size()];
            for (int i = 0; i < stages.size(); i++) {
              PipelineStageBase stage = stages.get(i);
              if (i <= lastEstimatorIdx) {
                if (stage instanceof EstimatorBase) {
                  // 這里會把流水線上的具體 Algorithm operators 通過 linkFrom 函數串聯起來。
                  transformers[i] = ((EstimatorBase) stage).fit(input); 
                } else if (stage instanceof TransformerBase) {
                  transformers[i] = (TransformerBase) stage;
                }
                // 注意,如果是流水線最后一個階段,則不做transform處理。
                if (i < lastEstimatorIdx) {
                  // 這里會調用到具體Transformer的transform函數,其會把流水線上的具體 Algorithm operators 通過 linkFrom 函數串聯起來。
                  input = transformers[i].transform(input);
                }
              } else {
                transformers[i] = (TransformerBase) stage;
              }
            }
            // 這里生成了一個PipelineModel,transformers會作為參數傳給他
            return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());   
          }
      }
      
      // MapTransformer是VectorAssembler的基類。transform會生成一個MapBatchOp,然后再調用MapBatchOp.linkFrom。
      public abstract class MapTransformer<T extends MapTransformer <T>>
      		extends TransformerBase<T> implements LocalPredictable {
      	@Override
      	public BatchOperator transform(BatchOperator input) {
      		return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
      	}
      }
      
      // Trainer是KMeans的基類。
      public abstract class Trainer<T extends Trainer <T, M>, M extends ModelBase<M>>
      	@Override
      	public M fit(BatchOperator input) {
          // KMeans.train 會調用 KMeansTrainBatchOp(this.getParams()).linkFrom(in);
          // createModel會生成一個新的model,本示例中是 com.alibaba.alink.pipeline.clustering.KMeansModel
        		return createModel(train(input).getOutputTable()); 
      	}
      }
      

      下面會逐一論述這兩個環節。

      4.3 VectorAssembler.transform

      這部分作用是把csv數據轉化為KMeans訓練所需要的數據類型。

      VectorAssembler.transform會調用到MapBatchOp.linkFrom。linkFrom首先把 csv input 進行了轉換,變成DataSet,然后以此為參數生成一個MapBatchOp返回,這個返回的 MapBatchOp。其業務邏輯是在 VectorAssemblerMapper 中實現的(將多個數值列按順序匯總成一個向量列)。

      public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> {
          public T linkFrom(BatchOperator<?>... inputs) {
              BatchOperator in = checkAndGetFirst(inputs);
      
              try {
                  Mapper mapper = (Mapper)this.mapperBuilder.apply(in.getSchema(), this.getParams());
                  // 這里對csv輸入進行了map,這里只是生成邏輯執行計劃,具體操作會在print之后才做的。
                  DataSet<Row> resultRows = in.getDataSet().map(new MapperAdapter(mapper));
                  TableSchema resultSchema = mapper.getOutputSchema();
                  this.setOutput(resultRows, resultSchema);
                  return this;
              } catch (Exception var6) {
                  throw new RuntimeException(var6);
              }
          }    
      }
      
      // MapBatchOp本身
      this = {MapBatchOp@3748} "UnnamedTable$1"
       mapperBuilder = {VectorAssembler$lambda@3744} 
       params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
       output = {TableImpl@5862} "UnnamedTable$1"
       sideOutputs = null
           
      // mapper就是業務邏輯模塊
      mapper = {VectorAssemblerMapper@5785} 
       handleInvalid = {VectorAssemblerMapper$HandleType@5813} "ERROR"
       outputColsHelper = {OutputColsHelper@5814} 
       colIndices = {int[4]@5815} 
       dataFieldNames = {String[5]@5816} 
       dataFieldTypes = {DataType[5]@5817} 
       params = {Params@5818} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
           
      // 返回數值如下
      resultRows = {MapOperator@5788} 
       function = {MapperAdapter@5826} 
        mapper = {VectorAssemblerMapper@5785} 
       defaultName = "linkFrom(MapBatchOp.java:35)"      
           
      // 調用棧如下
      
      linkFrom:31, MapBatchOp (com.alibaba.alink.operator.batch.utils)
      transform:34, MapTransformer (com.alibaba.alink.pipeline)
      fit:122, Pipeline (com.alibaba.alink.pipeline)
      main:31, KMeansExample (com.alibaba.alink)
      

      4.4 KMeans.fit

      這部分就是訓練模型

      KMeans是一個Trainer,其進而實現了EstimatorBase類型,所以流水線就調用到了其fit函數

      KMeans.fit就是調用了Trainer.fit。

      • Trainer.fit首先調用train函數,最終調用KMeansTrainBatchOp.linkFrom,這樣就和VectorAssembler串聯起來。KMeansTrainBatchOp 把VectorAssembler返回的 MapBatchOp進行處理。最后返回一個同樣類型KMeansTrainBatchOp。
      • Trainer.fit其次調用Trainer.createModel,該函數會根據this的類型決定應該生成什么Model。對于 KMeans,就生成了KMeansModel。

      因為KMeans是流水線最后一個階段,這時候不調用 input = transformers[i].transform(input); 所以目前還是訓練,生成一個模型 KMeansModel。

      // 實際部分代碼    
      
      Trainer.fit(BatchOperator input) {
      		return createModel(train(input).getOutputTable());
      }
        
      public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
      	implements KMeansTrainParams <KMeansTrainBatchOp> {	
          
          	public KMeansTrainBatchOp linkFrom(BatchOperator <?>... inputs) {
                  DataSet <Row> finalCentroid = iterateICQ(initCentroid, data,
                      vectorSize, maxIter, tol, distance, distanceType, vectorColName, null, null);
                  this.setOutput(finalCentroid, new KMeansModelDataConverter().getModelSchema());
                  return this;            
              }
      }
      
      // 變量內容
      			
      this = {KMeansTrainBatchOp@5887} 
       params = {Params@5895} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
       output = null
       sideOutputs = null
      inputs = {BatchOperator[1]@5888} 
       0 = {MapBatchOp@3748} "UnnamedTable$1"
        mapperBuilder = {VectorAssembler$lambda@3744} 
        params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
        output = {TableImpl@5862} "UnnamedTable$1"
        sideOutputs = null			
      			
      // 調用棧如下			
      			
      linkFrom:84, KMeansTrainBatchOp (com.alibaba.alink.operator.batch.clustering)
      train:31, KMeans (com.alibaba.alink.pipeline.clustering)
      fit:34, Trainer (com.alibaba.alink.pipeline)
      fit:117, Pipeline (com.alibaba.alink.pipeline)
      main:31, KMeansExample (com.alibaba.alink)			
      

      KMeansTrainBatchOp.linkFrom是算法重點。這里其實就是生成了算法所需要的一切前提,把各種Flink算子搭建好。后續會再提到。

      fit函數生成了 KMeansModel,其transform函數在基類MapModel中實現,會在下一個transform階段完成調用。這個就是訓練出來的KMeans模型,其也是一個Transformer。

      // Find  the closest cluster center for every point.
      public class KMeansModel extends MapModel<KMeansModel>
      	implements KMeansPredictParams <KMeansModel> {
      
      	public KMeansModel(Params params) {
      		super(KMeansModelMapper::new, params);
      	}
      }
      

      4.5 生成新的轉換流水線

      前面說到了,Pipeline的fit函數,返回一個PipelineModel。這個PipelineModel在后續會繼續調用transform,完成轉換階段。

      return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
      

      5. 轉換階段transform

      轉換階段的流水線,依然要從VectorAssembler入手來讀取csv,進行map處理。然后調用 KMeansModel。

      PipelineModel會繼續調用transform函數。其作用是把Transformer轉化為BatchOperator。這時候其內部變量如下,看出來已經從最初流水線各種類型參雜 轉換為 統一transform實例。

      this = {PipelineModel@5016} 
       transformers = {TransformerBase[2]@5017} 
      
        0 = {VectorAssembler@1198} 
         mapperBuilder = {VectorAssembler$lambda@2855} 
         params = {Params@2856} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
           
        1 = {KMeansModel@5009} 
         mapperBuilder = {KMeansModel$lambda@5011} 
         modelData = {TableImpl@4984} "UnnamedTable$2"
         params = {Params@5012} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
       modelData = null
       params = {Params@5018} "Params {MLEnvironmentId=0}"
      
      • 第一次transform調用到了MapBatchOp.linkFrom,就是VectorAssembler.transform調用到的,其作用和 在 fit 流水線中起到的作用一樣,下面注釋中有解釋。

      • 第二次transform調用到了ModelMapBatchOp.linkFrom,就是KMeansModel.transform間接調用到的。下面注釋中有解釋。

      這兩次 transform 的調用生成了 BatchOperator 的串聯。最終返回結果是 ModelMapBatchOp,即一個BatchOperator。轉換將由ModelMapBatchOp來轉換。

      // The model fitted by Pipeline.
      public class PipelineModel extends ModelBase<PipelineModel> implements LocalPredictable {
          @Override
          public BatchOperator<?> transform(BatchOperator input) {
              for (TransformerBase transformer : this.transformers) {
                  input = transformer.transform(input);
              }
              return input;
          }  
      }
         
      // 經過變化后,得到一個最終的轉化結果 BatchOperator,以此來轉換
      // {KMeansModel$lambda@5050} 就是 KMeansModelMapper,轉換邏輯。
      
      input = {ModelMapBatchOp@5047} "UnnamedTable$3"
       mapperBuilder = {KMeansModel$lambda@5050} 
       params = {Params@5051} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
        params = {HashMap@5058}  size = 6
         "vectorCol" -> ""features""
         "maxIter" -> "100"
         "reservedCols" -> "["category"]"
         "k" -> "3"
         "predictionCol" -> ""prediction_result""
         "predictionDetailCol" -> ""prediction_detail""
       output = {TableImpl@5052} "UnnamedTable$3"
        tableEnvironment = {BatchTableEnvironmentImpl@5054} 
        operationTree = {DataSetQueryOperation@5055} 
        operationTreeBuilder = {OperationTreeBuilder@5056} 
        lookupResolver = {LookupCallResolver@5057} 
        tableName = "UnnamedTable$3"
       sideOutputs = null
          
      // MapTransformer是VectorAssembler的基類。transform會生成一個MapBatchOp,然后再調用MapBatchOp.linkFrom。
      public abstract class MapTransformer<T extends MapTransformer <T>>
      		extends TransformerBase<T> implements LocalPredictable {
      	@Override
      	public BatchOperator transform(BatchOperator input) {
      		return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
      	}  
      }
          
      // MapModel是KMeansModel的基類,transform會生成一個ModelMapBatchOp,然后再調用ModelMapBatchOp.linkFrom。
      public abstract class MapModel<T extends MapModel<T>>
      		extends ModelBase<T> implements LocalPredictable {
      	@Override
      	public BatchOperator transform(BatchOperator input) {
      		return new ModelMapBatchOp(this.mapperBuilder, this.params)
      				.linkFrom(BatchOperator.fromTable(this.getModelData())
      					.setMLEnvironmentId(input.getMLEnvironmentId()), input);
      	}
      }  
      

      在這兩個linkFrom中,還是分別生成了兩個MapOperator,然后拼接起來,構成了一個 BatchOperator 串。從上面代碼中可以看出,KMeansModel對應的ModelMapBatchOp,其linkFrom會返回一個ModelMapperAdapter。ModelMapperAdapter是一個RichMapFunction類型,它會把KMeansModelMapper作為RichMapFunction.function成員變量保存起來。然后會調用 .map(new ModelMapperAdapter(mapper, modelSource)),map就是Flink算子,這樣轉換算法就和Flink聯系起來了

      最后 Keans 算法的轉換工作是通過 KMeansModelMapper.map 來完成的

      6. 運行

      我們都知道,Flink程序中,為了讓程序運行,需要

      • 獲取execution environment : 調用類似 getExecutionEnvironment() 來獲取environment;
      • 觸發程序執行 : 調用類似 env.execute("KMeans Example"); 來真正執行。

      Alink其實就是一個Flink應用,只不過要比普通Flink應用復雜太多。

      但是從實例代碼中,我們沒有看到類似調用。這說明Alink封裝的非常好,但是作為好奇的程序員,我們需要知道究竟這些調用隱藏在哪里。

      獲取執行環境

      Alink是在Pipeline執行的時候,獲取到運行環境。具體來說,因為csv文件是最初的輸入,所以當transform調用其 in.getSchema() 時候,會獲取運行環境。

      public final class CsvSourceBatchOp extends BaseSourceBatchOp<CsvSourceBatchOp>
          implements CsvSourceParams<CsvSourceBatchOp> {
          @Override
          public Table initializeDataSource() {
            ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
          }
      }
      
      initializeDataSource:77, CsvSourceBatchOp (com.alibaba.alink.operator.batch.source)
      getOutputTable:52, BaseSourceBatchOp (com.alibaba.alink.operator.batch.source)
      getSchema:180, AlgoOperator (com.alibaba.alink.operator)
      linkFrom:34, MapBatchOp (com.alibaba.alink.operator.batch.utils)
      transform:34, MapTransformer (com.alibaba.alink.pipeline)
      fit:122, Pipeline (com.alibaba.alink.pipeline)
      main:31, KMeansExample (com.alibaba.alink)
      

      觸發程序運行

      截止到現在,Alink已經做了很多東西,也映射到了 Flink算子上,那么究竟什么地方才真正和Flink聯系起來呢?

      print 調用的是BatchOperator.print,真正從這里開始,會一層一層調用下去,最后來到

      package com.alibaba.alink.operator.batch.utils;
      
      public class PrintBatchOp extends BaseSinkBatchOp<PrintBatchOp> {
      	@Override
      	protected PrintBatchOp sinkFrom(BatchOperator in) {
      		this.setOutputTable(in.getOutputTable());
      		if (null != this.getOutputTable()) {
      			try {
                      // 在這個 collect 之后,會進入到 Flink 的runtime之中。
      				List <Row> rows = DataSetConversionUtil.fromTable(getMLEnvironmentId(), this.getOutputTable()).collect();
      				batchPrintStream.println(TableUtil.formatTitle(this.getColNames()));
      				for (Row row : rows) {
      					batchPrintStream.println(TableUtil.formatRows(row));
      				}
      			} catch (Exception ex) {
      				throw new RuntimeException(ex);
      			}
      		}
      		return this;
      	}    
      }
      

      在 LocalEnvironment 這里把Alink和Flink的運行環境聯系起來。

      public class LocalEnvironment extends ExecutionEnvironment {
      	@Override
      	public String getExecutionPlan() throws Exception {
      		Plan p = createProgramPlan(null, false);
              
              // 下面會真正的和Flink聯系起來。
      		if (executor != null) {
      			return executor.getOptimizerPlanAsJSON(p);
      		}
      		else {
      			PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
      			return tempExecutor.getOptimizerPlanAsJSON(p);
      		}
      	}    
      }
      
      // 調用棧如下
      
      execute:91, LocalEnvironment (org.apache.flink.api.java)
      execute:820, ExecutionEnvironment (org.apache.flink.api.java)
      collect:413, DataSet (org.apache.flink.api.java)
      sinkFrom:40, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
      sinkFrom:18, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
      linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
      linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
      link:89, BatchOperator (com.alibaba.alink.operator.batch)
      linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
      print:337, BatchOperator (com.alibaba.alink.operator.batch)
      main:31, KMeansExample (com.alibaba.alink)
      

      0x05 底層--迭代計算框架

      這里對應如下設計原則:

      • 構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又可以利用好Flink,還可以讓用戶基于此可以快速開發算法
      • 采用最簡單,最常見的開發語言和開發模式。

      讓我們想想看,大概有哪些基礎工作需要做:

      • 如何初始化
      • 如何通信
      • 如何分割代碼,如何廣播代碼
      • 如何分割數據,如何廣播數據
      • 如何迭代算法

      其中最重要的概念是IterativeComQueue,這是把通信或者計算抽象成ComQueueItem,然后把ComQueueItem串聯起來形成隊列。這樣就形成了面向迭代計算場景的一套迭代通信計算框架。

      再次把目錄結構列在這里:

      ./java/com/alibaba/alink/common:
      MLEnvironment.java		linalg MLEnvironmentFactory.java	mapper
      VectorTypes.java		model comqueue			utils io
      

      里面大致有 :

      • Flink 封裝模塊 :MLEnvironment.java, MLEnvironmentFactory.java。
      • 線性代數模塊:linalg。
      • 計算/通訊隊列模塊:comqueue,其中ComputeFunction進行計算,比如訓練算法。
      • 映射模塊:mapper,其中Mapper進行各種映射,比如 ModelMapper 把模型映射為數值(就是轉換算法)。
      • 模型 :model,主要是用來讀取model source。
      • 基礎模塊:utils,io。

      算法組件在其linkFrom函數中,會做如下操作:

      • 先進行部分初始化,此時會調用部分Flink算子,比如groupBy等等。
      • 再將算法邏輯剝離出來,委托給Mapper或者ComQueueItem。
      • Mapper或者ComQueueItem會調用Flink map算子或者mapPartition算子等。
      • 調用Flink算子過程就是把算法分割然后適配到Flink上的過程。

      下面就一一闡述。

      1. Flink上下文封裝

      MLEnvironment 是個重要的類。其封裝了Flink開發所必須要的運行上下文。用戶可以通過這個類來獲取各種實際運行環境,可以建立table,可以運行SQL語句。

      /**
       * The MLEnvironment stores the necessary context in Flink.
       * Each MLEnvironment will be associated with a unique ID.
       * The operations associated with the same MLEnvironment ID
       * will share the same Flink job context.
       */
      public class MLEnvironment {
          private ExecutionEnvironment env;
          private StreamExecutionEnvironment streamEnv;
          private BatchTableEnvironment batchTableEnv;
          private StreamTableEnvironment streamTableEnv;
      }
      

      2. Function

      Function是計算框架中,對于計算和通訊等業務邏輯的最小模塊。具體定義如下。

      • ComputeFunction 是計算模塊。
      • CommunicateFunction 是通訊模塊。CommunicateFunction和ComputeFunction都是ComQueueItem子類,它們是業務邏輯實現者。
      • CompareCriterionFunction 是判斷模塊,用來判斷何時結束循環。這就允許用戶指定迭代終止條件。
      • CompleteResultFunction 用來在結束循環時候調用,作為循環結果。
      • Mapper也是一種Funciton,即Mapper Function。

      后續將統稱為 Function。

      /**
       * Basic build block in {@link BaseComQueue}, for either communication or computation.
       */
      public interface ComQueueItem extends Serializable {}
      
      /**
       * An BaseComQueue item for computation.
       */
      public abstract class ComputeFunction implements ComQueueItem {
      
      	/**
      	 * Perform the computation work.
      	 *
      	 * @param context to get input object and update output object.
      	 */
      	public abstract void calc(ComContext context);
      }
      
      /**
       * An BaseComQueue item for communication.
       */
      public abstract class CommunicateFunction implements ComQueueItem {
      
          /**
           * Perform communication work.
           *
           * @param input     output of previous queue item.
           * @param sessionId session id for shared objects.
           * @param <T>       Type of dataset.
           * @return result dataset.
           */
      	public abstract <T> DataSet <T> communicateWith(DataSet <T> input, int sessionId);
      }
      

      結合我們代碼來看,KMeansTrainBatchOp算法組件的部分作用是:KMeans算法被分割成若干CommunicateFunction。然后被添加到計算通訊隊列上。

      下面代碼中,具體 Item 如下:

      • ComputeFunction :KMeansPreallocateCentroid,KMeansAssignCluster,KMeansUpdateCentroids
      • CommunicateFunction :AllReduce
      • CompareCriterionFunction :KMeansIterTermination
      • CompleteResultFunction : KMeansOutputModel

      即算法實現的主要工作是:

      • 構建了一個IterativeComQueue。
      • 初始化數據,這里有兩種辦法:initWithPartitionedData將DataSet分片緩存至內存。initWithBroadcastData將DataSet整體緩存至每個worker的內存。
      • 將計算分割為若干ComputeFunction,串聯在IterativeComQueue
      • 運用AllReduce通信模型完成了數據同步
      	static DataSet <Row> iterateICQ(...省略...) {
      
      		return new IterativeComQueue()
      			.initWithPartitionedData(TRAIN_DATA, data)
      			.initWithBroadcastData(INIT_CENTROID, initCentroid)
      			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
      			.add(new KMeansPreallocateCentroid())
      			.add(new KMeansAssignCluster(distance))
      			.add(new AllReduce(CENTROID_ALL_REDUCE))
      			.add(new KMeansUpdateCentroids(distance))
      			.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
      			.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
      			.setMaxIter(maxIter)
      			.exec();
      	}
      

      3. 計算/通訊隊列

      BaseComQueue 就是這個迭代框架的基礎。它維持了一個 List<ComQueueItem> queue。用戶在生成算法模塊時候,會把各種 Function 添加到隊列中。

      IterativeComQueue 是 BaseComQueue 的缺省實現,具體實現了setMaxIter,setCompareCriterionOfNode0兩個函數。

      BaseComQueue兩個重要函數是:

      • optimize 函數:把隊列上相鄰的 ComputeFunction串聯起來,形成一個 ChainedComputation。在框架中進行優化,就是Alink的一個優勢所在
      • exec 函數:運行隊列上的各個 Function,返回最終的 Dataset。實際上,這里才真正到了 Flink,比如把計算隊列上的各個 ComputeFunction 映射到 Flink 的 RichMapPartitionFunction。然后在mapPartition函數調用中,會調用真實算法邏輯片斷 computation.calc(context);

      可以認為,BaseComQueue 是個邏輯概念,讓算法工程師可以更好的組織自己的業務語言。而通過其exec函數把算法邏輯映射到Flink算子上。這樣在某種程度上起到了與Flink解耦合的作用。

      具體定義(摘取函數內部分代碼)如下:

      // Base class for the com(Computation && Communicate) queue.
      public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {
      
      	/**
      	 * All computation or communication functions.
      	 */
      	private final List<ComQueueItem> queue = new ArrayList<>();
          
      	/**
      	 * The function executed to decide whether to break the loop.
      	 */
      	private CompareCriterionFunction compareCriterion;
      
      	/**
      	 * The function executed when closing the iteration
      	 */
      	private CompleteResultFunction completeResult;    
          
      	private void optimize() {
      		if (queue.isEmpty()) {
      			return;
      		}
      
      		int current = 0;
      		for (int ahead = 1; ahead < queue.size(); ++ahead) {
      			ComQueueItem curItem = queue.get(current);
      			ComQueueItem aheadItem = queue.get(ahead);
      
                  // 這里進行判斷,是否是前后都是 ComputeFunction,然后合并成 ChainedComputation
      			if (aheadItem instanceof ComputeFunction && curItem instanceof ComputeFunction) {
      				if (curItem instanceof ChainedComputation) {
      					queue.set(current, ((ChainedComputation) curItem).add((ComputeFunction) aheadItem));
      				} else {
      					queue.set(current, new ChainedComputation()
      						.add((ComputeFunction) curItem)
      						.add((ComputeFunction) aheadItem)
      					);
      				}
      			} else {
      				queue.set(++current, aheadItem);
      			}
      		}
      
      		queue.subList(current + 1, queue.size()).clear();
      	}    
          
      	/**
      	 * Execute the BaseComQueue and get the result dataset.
      	 *
      	 * @return result dataset.
      	 */
      	public DataSet<Row> exec() {
              
      		optimize();
      
      		IterativeDataSet<byte[]> loop
      			= loopStartDataSet(executionEnvironment)
      			.iterate(maxIter);
      
      		DataSet<byte[]> input = loop
      			.mapPartition(new DistributeData(cacheDataObjNames, sessionId))
      			.withBroadcastSet(loop, "barrier")
      			.name("distribute data");
      
      		for (ComQueueItem com : queue) {
      			if ((com instanceof CommunicateFunction)) {
      				CommunicateFunction communication = ((CommunicateFunction) com);         
               // 這里會調用比如 AllReduce.communication, 其會返回allReduce包裝后賦值給input,當循環遇到了下一個ComputeFunction(KMeansUpdateCentroids)時候,會把input賦給它處理。比如input = {MapPartitionOperator@5248},input.function = {AllReduce$AllReduceRecv@5260},input調用mapPartition,去間接調用KMeansUpdateCentroids。              
      				input = communication.communicateWith(input, sessionId);
      			} else if (com instanceof ComputeFunction) {
      				final ComputeFunction computation = (ComputeFunction) com;
      
              // 這里才到了 Flink,把計算隊列上的各個 ComputeFunction 映射到 Flink 的RichMapPartitionFunction。
      				input = input
      						.mapPartition(new RichMapPartitionFunction<byte[], byte[]>() {
      
      						@Override
      						public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
      							ComContext context = new ComContext(
      								sessionId, getIterationRuntimeContext()
      							);
                    // 在這里會被Flink調用具體計算函數,就是之前算法工程師拆分的算法片段。
      							computation.calc(context);
      						}
      					})
      					.withBroadcastSet(input, "barrier")
      					.name(com instanceof ChainedComputation ?
      						((ChainedComputation) com).name()
      						: "computation@" + computation.getClass().getSimpleName());
      			} else {
      				throw new RuntimeException("Unsupported op in iterative queue.");
      			}
      		}
      
      		return serializeModel(clearObjs(loopEnd));
      	}
      }
      

      4. Mapper(Function)

      Mapper是底層迭代計算框架的一部分,可以認為是 Mapper Function。因為涉及到業務邏輯,所以提前說明。

      5. 初始化

      初始化發生在 KMeansTrainBatchOp.linkFrom 中。我們可以看到在初始化時候,是可以調用 Flink 各種算子(比如.rebalance().map()) ,因為這時候還沒有和框架相關聯,這時候的計算是用戶自行控制,不需要加到 IterativeComQueue 之上。

      如果某一個計算既要加到 IterativeComQueue 之上,還要自己玩 Flink 算子,那框架就懵圈了,不知道該如何處理。所以用戶自由操作只能發生在沒有和框架聯系之前

      	@Override
      	public KMeansTrainBatchOp linkFrom(BatchOperator <?>... inputs) {
      		DataSet <FastDistanceVectorData> data = statistics.f0.rebalance().map(
      			new MapFunction <Vector, FastDistanceVectorData>() {
      				@Override
      				public FastDistanceVectorData map(Vector value) {
      					return distance.prepareVectorData(Row.of(value), 0);
      				}
      			});
      		......     
          }
      

      框架也提供了初始化功能,用于將DataSet緩存到內存中,緩存的形式包括Partition和Broadcast兩種形式。前者將DataSet分片緩存至內存,后者將DataSet整體緩存至每個worker的內存。

      		return new IterativeComQueue()
      			.initWithPartitionedData(TRAIN_DATA, data)
      			.initWithBroadcastData(INIT_CENTROID, initCentroid)
      			.initWithBroadcastData(KMEANS_STATISTICS, statistics)
                  ......
      

      6. ComputeFunction

      這是算法的具體計算模塊,算法工程師應該把算法拆分成各個可以并行處理的模塊,分別用 ComputeFunction 實現,這樣可以利用 Flnk 的分布式計算效力。

      下面舉出一個例子如下,這段代碼為每個點(point)計算最近的聚類中心,為每個聚類中心的點坐標的計數和求和:

      /**
       * Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
       */
      public class KMeansAssignCluster extends ComputeFunction {
          private FastDistance fastDistance;
          private transient DenseMatrix distanceMatrix;
      
          @Override
          public void calc(ComContext context) {
              Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
              Integer k = context.getObj(KMeansTrainBatchOp.K);
              // get iterative coefficient from static memory.
              Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
              if (context.getStepNo() % 2 == 0) {
                  stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
              } else {
                  stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
              }
      
              if (null == distanceMatrix) {
                  distanceMatrix = new DenseMatrix(k, 1);
              }
      
              double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
              if (sumMatrixData == null) {
                  sumMatrixData = new double[k * (vectorSize + 1)];
                  context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
              }
      
              Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
              if (trainData == null) {
                  return;
              }
      
              Arrays.fill(sumMatrixData, 0.0);
              for (FastDistanceVectorData sample : trainData) {
                  KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance,
                      distanceMatrix);
              }
          }
      }
      

      這里能夠看出,在 ComputeFunction 中,使用的是 命令式編程模式,這樣能夠最大的契合目前程序員現狀,極高提升生產力

      7. CommunicateFunction

      前面代碼中有一個關鍵處 .add(new AllReduce(CENTROID_ALL_REDUCE))。這部分代碼起到了承前啟后的作用。之前的 KMeansPreallocateCentroid,KMeansAssignCluster和其后的 KMeansUpdateCentroids通過它做了一個 reduce / broadcast 通訊。

      具體從注解中可以看到,AllReduce 是 MPI 相關通訊原語的一個實現。這里主要是對 double[] object 進行 reduce / broadcast。

      public class AllReduce extends CommunicateFunction {
      	public static <T> DataSet <T> allReduce(
      		DataSet <T> input,
      		final String bufferName,
      		final String lengthName,
      		final SerializableBiConsumer <double[], double[]> op,
      		final int sessionId) {
      		final String transferBufferName = UUID.randomUUID().toString();
      
      		return input
      			.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
      			.withBroadcastSet(input, "barrier")
      			.returns(
      				new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
      			.name("AllReduceSend")
      			.partitionCustom(new Partitioner <Integer>() {
      				@Override
      				public int partition(Integer key, int numPartitions) {
      					return key;
      				}
      			}, 0)
      			.name("AllReduceBroadcastRaw")
      			.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
      			.returns(
      				new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
      			.name("AllReduceSum")
      			.partitionCustom(new Partitioner <Integer>() {
      				@Override
      				public int partition(Integer key, int numPartitions) {
      					return key;
      				}
      			}, 0)
      			.name("AllReduceBroadcastSum")
      			.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
      			.returns(input.getType())
      			.name("AllReduceRecv");
      	}    
      }
      

      經過調試我們能看出來,AllReduceSum 是在自己mapPartition實現中,調用了 SUM。

      	/**
      	 * The all-reduce operation which does elementwise sum operation.
      	 */
      	public final static SerializableBiConsumer <double[], double[]> SUM
      		= new SerializableBiConsumer <double[], double[]>() {
      		@Override
      		public void accept(double[] a, double[] b) {
      			for (int i = 0; i < a.length; ++i) {
      				a[i] += b[i];
      			}
      		}
      	};
      
      private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> {
      		@Override
      		public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values,
      								 Collector <Tuple3 <Integer, Integer, double[]>> out) {
      
            // 省略各種初始化操作,比如確定傳輸位置,傳輸目標等
            ......
      	
      			do {
      				Tuple3 <Integer, Integer, double[]> val = it.next();
      				int localPos = val.f1 - startPos;
      				if (sum[localPos] == null) {
      					sum[localPos] = val.f2;
      					agg[localPos]++;
      				} else {
                // 這里會調用 SUM 
      					op.accept(sum[localPos], val.f2);
      				}
      			} while (it.hasNext());
      
      			for (int i = 0; i < numOfSubTasks; ++i) {
      				for (int j = 0; j < cnt; ++j) {
      					out.collect(Tuple3.of(i, startPos + j, sum[j]));
      				}
      			}
      		}
      	}
      
      accept:129, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
      accept:126, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
      mapPartition:314, AllReduce$AllReduceSum (com.alibaba.alink.common.comqueue.communication)
      run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
      run:504, BatchTask (org.apache.flink.runtime.operators)
      run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
      run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
      invoke:369, BatchTask (org.apache.flink.runtime.operators)
      doRun:705, Task (org.apache.flink.runtime.taskmanager)
      run:530, Task (org.apache.flink.runtime.taskmanager)
      run:745, Thread (java.lang)
      

      0x06 另一種打法

      總結到現在,我們發現這個迭代計算框架設計的非常優秀。但是Alink并沒有限定大家只能使用這個框架來實現算法。如果你是Flink高手,你完全可以隨心所欲的實現。

      Alink例子中本身就有一個這樣的實現 ALSExample。其核心類 AlsTrainBatchOp 就是直接使用了 Flink 算子,IterativeDataSet 等。

      這就好比是武松武都頭,一雙戒刀搠得倒貪官佞臣,赤手空拳也打得死吊睛白額大蟲

      public final class AlsTrainBatchOp
          extends BatchOperator<AlsTrainBatchOp>
          implements AlsTrainParams<AlsTrainBatchOp> {
      
          @Override
          public AlsTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
              BatchOperator<?> in = checkAndGetFirst(inputs);
      
       		......
      
              AlsTrain als = new AlsTrain(rank, numIter, lambda, implicitPrefs, alpha, numMiniBatches, nonNegative);
              DataSet<Tuple3<Byte, Long, float[]>> factors = als.fit(alsInput);
      
              DataSet<Row> output = factors.mapPartition(new RichMapPartitionFunction<Tuple3<Byte, Long, float[]>, Row>() {
                  @Override
                  public void mapPartition(Iterable<Tuple3<Byte, Long, float[]>> values, Collector<Row> out) {
                      new AlsModelDataConverter(userColName, itemColName).save(values, out);
                  }
              });
       
              return this;
          }
      }
      

      多提一點,Flink ML中也有ALS算法,是一個Scala實現。沒有Scala經驗的算法工程師看代碼會咬碎鋼牙。

      0x07 總結

      經過這兩篇文章的推測和驗證,現在我們總結如下。

      Alink的部分設計原則

      • 算法的歸算法,Flink的歸Flink,盡量屏蔽AI算法和Flink之間的聯系。

      • 采用最簡單,最常見的開發語言和思維方式。

      • 盡量借鑒市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。

      • 構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又可以利用好Flink,還可以讓用戶基于此可以快速開發算法。

      針對這些原則,Alink實現了

      • 頂層流水線,Estimator, Transformer...
      • 算法組件中間層
      • 底層迭代計算框架

      這樣Alink即可以最大限度的享受Flink帶來的各種優勢,也能順應目前形勢,讓算法工程師工作更方便。從而達到系統性能和生產力的雙重提升。

      下一篇文章爭取介紹 AllReduce 的具體實現。

      0x08 參考

      k-means聚類算法原理簡析

      flink kmeans聚類算法實現

      Spark ML簡介之Pipeline,DataFrame,Estimator,Transformer

      開源 | 全球首個批流一體機器學習平臺

      斬獲GitHub 2000+ Star,阿里云開源的 Alink 機器學習平臺如何跑贏雙11數據“博弈”?|AI 技術生態論

      Flink DataSet API

      posted @ 2020-05-10 09:10  羅西的思考  閱讀(3102)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 任我爽精品视频在线播放 | 人妻日韩精品中文字幕| 亚洲熟妇熟女久久精品综合 | 成年午夜无码av片在线观看| 国产成人免费| 日韩中文字幕有码av| 激情在线网| 美女黄网站18禁免费看| 色欲AV无码一区二区人妻| 九九热久久这里全是精品| 国产午夜亚洲精品久久| 色综合AV综合无码综合网站| 青青青爽在线视频观看| 97欧美精品系列一区二区| 久久综合色一综合色88| 人妻少妇久久中文字幕一区二区| 人妻av无码系列一区二区三区| 在线免费观看视频1区| 亚洲欧美日韩国产精品一区二区| 日韩中文字幕国产精品| 少妇的丰满3中文字幕| 精品一区二区三区不卡| 欧美成人午夜在线观看视频| 国产中文字幕精品在线| 亚洲国产精品日韩av专区| 亚洲一区二区三区在线观看精品中文| 国内精品久久久久影院网站| 久久精品国产最新地址| 国产女同疯狂作爱系列| 国产人妻人伦精品婷婷| 日本电影一区二区三区| 无套内射视频囯产| 亚洲青青草视频在线播放| 无码国产欧美一区二区三区不卡| 中文字幕亚洲精品乱码| 国产激情免费视频在线观看 | а天堂中文最新一区二区三区| 国产偷自一区二区三区在线| 久草热8精品视频在线观看| 亚洲成AV人片在线观高清| 亚洲高清国产拍精品熟女|