Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構
Alink漫談(二) : 從源碼看機器學習平臺Alink設計和架構
0x00 摘要
Alink 是阿里巴巴基于實時計算引擎 Flink 研發的新一代機器學習算法平臺,是業界首個同時支持批式算法、流式算法的機器學習平臺。本文是漫談系列的第二篇,將從源碼入手,帶領大家具體剖析Alink設計思想和架構為何。
因為Alink的公開資料太少,所以均為自行揣測,肯定會有疏漏錯誤,希望大家指出,我會隨時更新。
0x01 Alink設計原則
前文中 Alink漫談(一) : 從KMeans算法實現看Alink設計思想 我們推測總結出Alink部分設計原則
-
算法的歸算法,Flink的歸Flink,盡量屏蔽AI算法和Flink之間的聯系。
-
采用最簡單,最常見的開發語言和思維方式。
-
盡量借鑒市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。
-
構建一套戰術打法(middleware或adapter),即屏蔽了Flink,又可以利用好Flink,還能讓用戶快速開發算法。
下面我們就針對這些設計原則,從上至下看看Alink如何設計自己這套戰術打法。
為了能讓大家更好理解,先整理一個概要圖。因為Alink系統主要可以分成三個層面(頂層流水線, 中間層算法組件, 底層迭代計算框架),再加上一個Flink runtime,所以下圖就是分別從這四個層面出發來看程序執行流程。
如何看待 pipeline.fit(data).transform(data).print();
// 從頂層流水線角度看
訓練流水線 +-----> [VectorAssembler(Transformer)] -----> [KMeans(Estimator)]
| // KMeans.fit之后,會生成一個KMeansModel用來轉換
|
轉換流水線 +-----> [VectorAssembler(Transformer)] -----> [KMeansModel(Transformer)]
// 從中間層算法組件角度看
訓練算法組件 +-----> [MapBatchOp] -----> [KMeansTrainBatchOp]
| // VectorAssemblerMapper in MapBatchOp 是業務邏輯
|
轉換算法組件 +-----> [MapBatchOp] -----> [ModelMapBatchOp]
// VectorAssemblerMapper in MapBatchOp 是業務邏輯
// KMeansModelMapper in ModelMapBatchOp 是業務邏輯
// 從底層迭代計算框架角度看
訓練by框架 +-----> [VectorAssemblerMapper] -----> [KMeansPreallocateCentroid / KMeansAssignCluster / AllReduce / KMeansUpdateCentroids in IterativeComQueue]
| // 映射到Flink的各種算子進行訓練
|
轉換(直接) +-----> [VectorAssemblerMapper] -----> [KMeansModelMapper]
// 映射到Flink的各種算子進行轉換
// 從Flink runtime角度看
訓練 +-----> map, mapPartiiton...
| // VectorAssemblerMapper.map等會被調用
|
轉換 +-----> map, mapPartiiton...
// 比如調用 KMeansModelMapper.map 來轉換
0x02 Alink實例代碼
示例代碼還是用之前的KMeans算法部分模塊。
算法調用
public class KMeansExample {
public static void main(String[] args) throws Exception {
......
BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
VectorAssembler va = new VectorAssembler()
.setSelectedCols(new String[]{"sepal_length", "sepal_width", "petal_length", "petal_width"})
.setOutputCol("features");
KMeans kMeans = new KMeans().setVectorCol("features").setK(3)
.setPredictionCol("prediction_result")
.setPredictionDetailCol("prediction_detail")
.setReservedCols("category")
.setMaxIter(100);
Pipeline pipeline = new Pipeline().add(va).add(kMeans);
pipeline.fit(data).transform(data).print();
}
}
算法主函數
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
implements KMeansTrainParams <KMeansTrainBatchOp> {
static DataSet <Row> iterateICQ(...省略...) {
return new IterativeComQueue()
.initWithPartitionedData(TRAIN_DATA, data)
.initWithBroadcastData(INIT_CENTROID, initCentroid)
.initWithBroadcastData(KMEANS_STATISTICS, statistics)
.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))
.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
.setMaxIter(maxIter)
.exec();
}
}
算法模塊舉例
基于點計數和坐標,計算新的聚類中心。
// Update the centroids based on the sum of points and point number belonging to the same cluster.
public class KMeansUpdateCentroids extends ComputeFunction {
@Override
public void calc(ComContext context) {
Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
Integer k = context.getObj(KMeansTrainBatchOp.K);
double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
if (context.getStepNo() % 2 == 0) {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
} else {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
}
stepNumCentroids.f0 = context.getStepNo();
context.putObj(KMeansTrainBatchOp.K,
updateCentroids(stepNumCentroids.f1, k, vectorSize, sumMatrixData, distance));
}
}
0x03 頂層 -- 流水線
本部分實現的設計原則是 :盡量借鑒市面上通用的設計思路和開發模式,讓開發者無縫切換。
1. 機器學習重要概念
一個典型的機器學習過程從數據收集開始,要經歷多個步驟,才能得到需要的輸出。這非常類似于流水線式工作,即通常會包含源數據ETL(抽取、轉化、加載),數據預處理,指標提取,模型訓練與交叉驗證,新數據預測等步驟。
先來說一下幾個重要的概念:
- Transformer:轉換器,是一種可以將一個數據轉換為另一個數據的算法。比如一個模型就是一個 Transformer。它可以把一個不包含轉換標簽的測試數據集 打上標簽,轉化成另一個包含轉換標簽的特征數據。Transformer可以理解為特征工程,即:特征標準化、特征正則化、特征離散化、特征平滑、onehot編碼等。該類型有一個transform方法,用于fit數據之后,輸入新的數據,進行特征變換。
- Estimator:評估器,它是學習算法或在訓練數據上的訓練方法的概念抽象。所有的機器學習算法模型,都被稱為估計器。在 Pipeline 里通常是被用來操作 數據并生產一個 Transformer。從技術上講,Estimator實現了一個方法fit(),它接受一個特征數據并產生一個轉換器。比如一個隨機森林算法就是一個 Estimator,它可以調用fit(),通過訓練特征數據而得到一個隨機森林模型。
- PipeLine:工作流或者管道。工作流將多個工作流階段(轉換器和估計器)連接在一起,形成機器學習的工作流,并獲得結果輸出。
- Parameter:Parameter 被用來設置 Transformer 或者 Estimator 的參數。
2. Alink中概念實現
從 Alink的目錄結構中 ,我們可以看出,Alink提供了這些常見概念(其中有些代碼借鑒了Flink ML)。
./java/com/alibaba/alink:
common operator params pipeline
./java/com/alibaba/alink/params:
associationrule evaluation nlp regression statistics
classification feature onlinelearning shared tuning
clustering io outlier similarity udf
dataproc mapper recommendation sql validators
./java/com/alibaba/alink/pipeline:
EstimatorBase.java ModelBase.java Trainer.java feature
LocalPredictable.java ModelExporterUtils.java TransformerBase.java nlp
LocalPredictor.java Pipeline.java classification recommendation
MapModel.java PipelineModel.java clustering regression
MapTransformer.java PipelineStageBase.java dataproc tuning
比較基礎的是三個接口:PipelineStages,Transformer,Estimator,分別恰好對應了機器學習的兩個通用概念 :轉換器 ,評估器。PipelineStages是這兩個的基礎接口。
// Base class for a stage in a pipeline. The interface is only a concept, and does not have any actual functionality. Its subclasses must be either Estimator or Transformer. No other classes should inherit this interface directly.
public interface PipelineStage<T extends PipelineStage<T>> extends WithParams<T>, Serializable
// A transformer is a PipelineStage that transforms an input Table to a result Table.
public interface Transformer<T extends Transformer<T>> extends PipelineStage<T>
// Estimators are PipelineStages responsible for training and generating machine learning models.
public interface Estimator<E extends Estimator<E, M>, M extends Model<M>> extends PipelineStage<E>
其次是三個抽象類定義:PipelineStageBase,EstimatorBase,TransformerBase,分別就對應了以上的三個接口。其中定義了一些基礎操作,比如 fit,transform。
// The base class for a stage in a pipeline, either an EstimatorBase or a TransformerBase.
public abstract class PipelineStageBase<S extends PipelineStageBase<S>>
implements WithParams<S>, HasMLEnvironmentId<S>, Cloneable
// The base class for estimator implementations.
public abstract class EstimatorBase<E extends EstimatorBase<E, M>, M extends ModelBase<M>>
extends PipelineStageBase<E> implements Estimator<E, M>
// The base class for transformer implementations.
public abstract class TransformerBase<T extends TransformerBase<T>>
extends PipelineStageBase<T> implements Transformer<T>
然后是Pipeline基礎類,這個類就可以把Transformer,Estimator聯系起來 。
// A pipeline is a linear workflow which chains EstimatorBases and TransformerBases to execute an algorithm
public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
private ArrayList<PipelineStageBase> stages = new ArrayList<>();
public Pipeline add(PipelineStageBase stage) {
this.stages.add(stage);
return this;
}
}
最后是 Parameter 概念相關舉例,比如實例中用到的 VectorAssemblerParams。
// Parameters for MISOMapper.
public interface MISOMapperParams<T> extends HasSelectedCols <T>, HasOutputCol <T>,
HasReservedCols <T> {}
// parameters of vector assembler.
public interface VectorAssemblerParams<T> extends MISOMapperParams<T> {
ParamInfo <String> HANDLE_INVALID = ParamInfoFactory
.createParamInfo("handleInvalid", String.class)
.setDescription("parameter for how to handle invalid data (NULL values)")
.setHasDefaultValue("error")
.build();
}
綜合來說,因為模型和數據,在Alink運行時候,都統一轉化為Table類型,所以可以整理如下:
- Transformer: 將input table轉換為output table。
- Estimator:將input table轉換為模型。
- 模型:將input table轉換為output table。
3. 結合實例看流水線
首先是一些基礎抽象類,比如:
- MapTransformer是 flat map 的Transformer。
- ModelBase是模型定義,也是一個Transformer。
- Trainer是訓練模型定義,是EstimatorBase。
// Abstract class for a flat map TransformerBase.
public abstract class MapTransformer<T extends MapTransformer <T>>
extends TransformerBase<T> implements LocalPredictable {
// The base class for a machine learning model.
public abstract class ModelBase<M extends ModelBase<M>> extends TransformerBase<M>
implements Model<M>
// Abstract class for a trainer that train a machine learning model.
public abstract class Trainer<T extends Trainer <T, M>, M extends ModelBase<M>>
extends EstimatorBase<T, M>
然后就是我們實例用到的兩個類型定義。
- KMeans 是一個Trainer,其實現了EstimatorBase;
- VectorAssembler 是一個TransformerBase。
// 這是一個 EstimatorBase 類型
public class KMeans extends Trainer <KMeans, KMeansModel> implements
KMeansTrainParams <KMeans>, KMeansPredictParams <KMeans> {
@Override
protected BatchOperator train(BatchOperator in) {
return new KMeansTrainBatchOp(this.getParams()).linkFrom(in);
}
}
// 這是一個 TransformerBase 類型
public class VectorAssembler extends MapTransformer<VectorAssembler>
implements VectorAssemblerParams <VectorAssembler> {
public VectorAssembler(Params params) {
super(VectorAssemblerMapper::new, params);
}
}
實例中,分別構建了兩個流水線階段,然后這兩個實例就被鏈接到流水線上。
VectorAssembler va = new VectorAssembler()
KMeans kMeans = new KMeans()
Pipeline pipeline = new Pipeline().add(va).add(kMeans);
// 能看出來,流水線上有兩個階段,分別是VectorAssembler和KMeans。
pipeline = {Pipeline@1201}
stages = {ArrayList@2853} size = 2
0 = {VectorAssembler@1199}
mapperBuilder = {VectorAssembler$lambda@2859}
params = {Params@2860} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
1 = {KMeans@1200}
params = {Params@2857} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
0x04 中間層 -- 算法組件
算法組件是中間層的概念,可以認為是真正實現算法的模塊/層次。主要作用是承上啟下。
- 其上層是流水線各個階段,流水線的生成結果就是一個算法組件。算法組件的作用是把流水線的Estimator或者Transformer翻譯成具體算法。算法組件彼此是通過 linkFrom 串聯在一起。
- 其下層是"迭代計算框架",算法組件把具體算法邏輯中的計算/通信分成一個個小模塊,映射到Mapper Function 或者具體"迭代計算框架"的計算/通信 Function 上,這樣才能更好的利用Flink的各種優勢。
- "迭代計算框架" 中,主要兩個部分是 Mapper Function 和 計算/通信 Function,其在代碼中分別對應Mapper,ComQueueItem。
- Mapper Function 是映射Function(系統寫好了部分Mapper,用戶也可以根據算法來寫自己的Mapper);
- 計算/通信 Function是專門為算法寫的專用Function(也分成 系統內置的,算法自定義的)。
- 可以這么理解:各種Function是業務邏輯(組件)。算法組件只是提供運行規則,業務邏輯(組件)作為運行在算法組件上的插件。
- 也可以這么理解 :算法組件就是框架,其把部分業務邏輯委托給Mapper或者ComQueueItem。
比如
- KMeans 是 Estimator,其對應算法組件是 KMeansTrainBatchOp。其業務邏輯(組件)也在這個類中,是由IterativeComQueue為基礎串聯起來的一系列算法類(ComQueueItem)。
- VectorAssembler 是 Transformer,其對應算法組件是 MapBatchOp。其業務邏輯(組件)是VectorAssemblerMapper(其 map 函數會做業務邏輯,把將多個數值列按順序匯總成一個向量列)。
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp> implements KMeansTrainParams <KMeansTrainBatchOp>
// class for a flat map BatchOperator.
public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T>
無論是調用Estimator.fit 還是 Transformer.transform,其本質都是通過linkFrom函數,把各個Operator聯系起來,這樣就把數據流串聯起來。然后就可以逐步映射到Flink具體運行計劃上。
1. Algorithm operators
AlgoOperator是算子組件的基類,其子類有BatchOperator和StreamOperator,分別對應了批處理和流處理。
// Base class for algorithm operators.
public abstract class AlgoOperator<T extends AlgoOperator<T>>
implements WithParams<T>, HasMLEnvironmentId<T>, Serializable
// Base class of batch algorithm operators.
public abstract class BatchOperator<T extends BatchOperator <T>> extends AlgoOperator <T> {
// Link this object to BatchOperator using the BatchOperators as its input.
public abstract T linkFrom(BatchOperator <?>... inputs);
public <B extends BatchOperator <?>> B linkTo(B next) {
return link(next);
}
public BatchOperator print() throws Exception {
return linkTo(new PrintBatchOp().setMLEnvironmentId(getMLEnvironmentId()));
}
}
public abstract class StreamOperator<T extends StreamOperator <T>> extends AlgoOperator <T>
示例代碼如下:
// 輸入csv文件被轉化為一個BatchOperator
BatchOperator data = new CsvSourceBatchOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);
...
pipeline.fit(data).transform(data).print();
2. Mapper(提前說明)
Mapper是底層迭代計算框架的一部分,是業務邏輯(組件)。從目錄結構能看出。這里提前說明,是因為在流水線講解過程中大量涉及,所以就提前放在這里說明。
./java/com/alibaba/alink/common
linalg mapper model comqueue utils io
Mapper的幾個主要類定義如下,其作用廣泛,即可以映射輸入到輸出,也可以映射模型到具體數值。
// Abstract class for mappers.
public abstract class Mapper implements Serializable {}
// Abstract class for mappers with model.
public abstract class ModelMapper extends Mapper {}
// Find the closest cluster center for every point.
public class KMeansModelMapper extends ModelMapper {}
// Mapper with Multi-Input columns and Single Output column(MISO).
public abstract class MISOMapper extends Mapper {}
// This mapper maps many columns to one vector. the columns should be vector or numerical columns.
public class VectorAssemblerMapper extends MISOMapper {}
Mapper的業務邏輯依賴于算法組件來運作,比如 [ VectorAssemblerMapper in MapBatchOp ] ,[ KMeansModelMapper in ModelMapBatchOp ]。
ModelMapper具體運行則需要依賴 ModelMapperAdapter 來和Flink runtime聯系起來。ModelMapperAdapter繼承了RichMapFunction,ModelMapper作為其成員變量,在map操作中執行業務邏輯,ModelSource則是數據來源。
對應本實例,KMeansModelMapper 就是最后轉換的 BatchOperator,其map函數用來轉換。
3. 系統內置算法組件
系統內置了一些常用的算法組件,比如:
- MapBatchOp 功能是基于輸入來flat map,是 VectorAssembler 返回的算法組件。
- ModelMapBatchOp 功能是基于模型進行flat map,是 KMeans 返回的算法組件。
以 ModelMapBatchOp 為例給大家說明其作用,從下面代碼注釋中可以看出,linkFrom作用是:
- 把inputs"算法組件" 和 本身"算法組件" 聯系起來,這就形成了一個算法邏輯鏈。
- 把業務邏輯映射成 "Flink算子",這就形成了一個 "Flink算子鏈"。
public class ModelMapBatchOp<T extends ModelMapBatchOp<T>> extends BatchOperator<T> {
@Override
public T linkFrom(BatchOperator<?>... inputs) {
checkOpSize(2, inputs);
try {
BroadcastVariableModelSource modelSource = new BroadcastVariableModelSource(BROADCAST_MODEL_TABLE_NAME);
// mapper是映射函數
ModelMapper mapper = this.mapperBuilder.apply(
inputs[0].getSchema(),
inputs[1].getSchema(),
this.getParams());
// modelRows 是模型
DataSet<Row> modelRows = inputs[0].getDataSet().rebalance();
// resultRows 是輸入數據的映射變化
DataSet<Row> resultRows = inputs[1].getDataSet()
.map(new ModelMapperAdapter(mapper, modelSource))
// 把模型作為廣播變量,后續會在 ModelMapperAdapter 中使用
.withBroadcastSet(modelRows, BROADCAST_MODEL_TABLE_NAME);
TableSchema outputSchema = mapper.getOutputSchema();
this.setOutput(resultRows, outputSchema);
return (T) this;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
ModelMapperAdapter
ModelMapperAdapter 是適配器的實現,用來在flink上運行業務邏輯Mapper。從代碼可以看出,ModelMapperAdapter取出之前存儲的mapper和模型數據,然后基于此來進行具體算法業務。
/**
* Adapt a {@link ModelMapper} to run within flink.
* This adapter class hold the target {@link ModelMapper} and it's {@link ModelSource}. Upon open(), it will load model rows from {@link ModelSource} into {@link ModelMapper}.
*/
public class ModelMapperAdapter extends RichMapFunction<Row, Row> implements Serializable {
/**
* The ModelMapper to adapt.
*/
private final ModelMapper mapper;
/**
* Load model data from ModelSource when open().
*/
private final ModelSource modelSource;
public ModelMapperAdapter(ModelMapper mapper, ModelSource modelSource) {
// mapper是業務邏輯,modelSource是模型Broadcast source
this.mapper = mapper; // 在map操作中執行業務邏輯
this.modelSource = modelSource; // 數據來源
}
@Override
public void open(Configuration parameters) throws Exception {
// 從廣播變量中獲取模型數據
List<Row> modelRows = this.modelSource.getModelRows(getRuntimeContext());
this.mapper.loadModel(modelRows);
}
@Override
public Row map(Row row) throws Exception {
// 執行業務邏輯,在數據來源上轉換
return this.mapper.map(row);
}
}
4. 訓練階段fit
在 pipeline.fit(data) 之中,會沿著流水線依次執行。如果流水線下一個階段遇到了是Transformer,就調用其transform;如果遇到的是EstimatorBase,就先調用其fit,把EstimatorBase轉換為Transformer,然后再次調用這個轉換出來的Transformer.transform。就這樣一個一個階段執行。
4.1 具體流水線處理
-
如果流水線下一階段遇到EstimatorBase,會處理EstimatorBase的fit,把流水線上的Estimator轉換為 TransformerBase。Estimator.fit 接受一個特征數據并產生一個轉換器。
(如果這個階段 不是 流水線最后一個階段)會對這個 TransformerBase繼續處理。處理之后才能進入到流水線下一階段。
(如果這個階段 是 流水線最后一個階段)不會對這個 TransformerBase 做處理,直接結束流水線 fit 操作。
-
如果流水線下一階段遇到TransformerBase,就直接調用其transform函數。
-
對于所有需要處理的TransformerBase,無論是從EstimatorBase轉換出來的,還是Pipeline原有的 ,都調用其transform函數,轉換其input。
input = transformers[i].transform(input);。這樣每次轉換后的輸出再次賦值給input,作為流水線下一階段的輸入。 -
最后得到一個PipelineModel (其本身也是一個Transformer) ,這個屬于下一階段轉換流水線。
4.2 結合本實例概述
本實例有兩個stage。VectorAssembler是Transformer,KMeans是EstimatorBase。
這時候Pipeline其內部變量是:
this = {Pipeline@1195}
stages = {ArrayList@2851} size = 2
0 = {VectorAssembler@1198}
mapperBuilder = {VectorAssembler$lambda@2857}
params = {Params@2858} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
1 = {KMeans@2856}
params = {Params@2860} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
params = {HashMap@2862} size = 6
- Pipeline 先調用Transformer類型的VectorAssembler,來處理其input (就是csv的BatchOperator)。這個處理csv是通過linkFrom(input)來構建的。處理之后再包裝成一個MapBatchOp返回賦值給input。
- 其次調用EstimatorBase類型的Kmeans.fit函數,對input (就是 VectorAssembler 返回的MapBatchOp) 進行fit。fit過程中調用了KMeansTrainBatchOp.linkFrom來設置,fit生成了一個KMeansModel (Transformer)。因為這時候已經是流水線最后一步,所以不做后續的KMeansModel.transform操作。KMeansModel 就是訓練出來的判斷模型。
- 在上述調用過程中,會在transformers數組中記錄運算過的TransformerBase和EstimatorBase適配出來的Transformer。
- 最后以這個transformers數組為參數,生成一個 PipelineModel (其也是一個Transformer類型)。生成 PipelineModel 的目的是:PipelineModel是后續轉換中的新流水線。
PipelineMode 的新流水線處理流程是:從 csv 讀入/ 映射(VectorAssembler 處理),然后 KMeansModel 做轉換(下一節會具體介紹)。
fit 具體代碼是
public class Pipeline extends EstimatorBase<Pipeline, PipelineModel> {
// Train the pipeline with batch data.
public PipelineModel fit(BatchOperator input) {
int lastEstimatorIdx = getIndexOfLastEstimator();
TransformerBase[] transformers = new TransformerBase[stages.size()];
for (int i = 0; i < stages.size(); i++) {
PipelineStageBase stage = stages.get(i);
if (i <= lastEstimatorIdx) {
if (stage instanceof EstimatorBase) {
// 這里會把流水線上的具體 Algorithm operators 通過 linkFrom 函數串聯起來。
transformers[i] = ((EstimatorBase) stage).fit(input);
} else if (stage instanceof TransformerBase) {
transformers[i] = (TransformerBase) stage;
}
// 注意,如果是流水線最后一個階段,則不做transform處理。
if (i < lastEstimatorIdx) {
// 這里會調用到具體Transformer的transform函數,其會把流水線上的具體 Algorithm operators 通過 linkFrom 函數串聯起來。
input = transformers[i].transform(input);
}
} else {
transformers[i] = (TransformerBase) stage;
}
}
// 這里生成了一個PipelineModel,transformers會作為參數傳給他
return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
}
}
// MapTransformer是VectorAssembler的基類。transform會生成一個MapBatchOp,然后再調用MapBatchOp.linkFrom。
public abstract class MapTransformer<T extends MapTransformer <T>>
extends TransformerBase<T> implements LocalPredictable {
@Override
public BatchOperator transform(BatchOperator input) {
return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
}
}
// Trainer是KMeans的基類。
public abstract class Trainer<T extends Trainer <T, M>, M extends ModelBase<M>>
@Override
public M fit(BatchOperator input) {
// KMeans.train 會調用 KMeansTrainBatchOp(this.getParams()).linkFrom(in);
// createModel會生成一個新的model,本示例中是 com.alibaba.alink.pipeline.clustering.KMeansModel
return createModel(train(input).getOutputTable());
}
}
下面會逐一論述這兩個環節。
4.3 VectorAssembler.transform
這部分作用是把csv數據轉化為KMeans訓練所需要的數據類型。
VectorAssembler.transform會調用到MapBatchOp.linkFrom。linkFrom首先把 csv input 進行了轉換,變成DataSet
public class MapBatchOp<T extends MapBatchOp<T>> extends BatchOperator<T> {
public T linkFrom(BatchOperator<?>... inputs) {
BatchOperator in = checkAndGetFirst(inputs);
try {
Mapper mapper = (Mapper)this.mapperBuilder.apply(in.getSchema(), this.getParams());
// 這里對csv輸入進行了map,這里只是生成邏輯執行計劃,具體操作會在print之后才做的。
DataSet<Row> resultRows = in.getDataSet().map(new MapperAdapter(mapper));
TableSchema resultSchema = mapper.getOutputSchema();
this.setOutput(resultRows, resultSchema);
return this;
} catch (Exception var6) {
throw new RuntimeException(var6);
}
}
}
// MapBatchOp本身
this = {MapBatchOp@3748} "UnnamedTable$1"
mapperBuilder = {VectorAssembler$lambda@3744}
params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
output = {TableImpl@5862} "UnnamedTable$1"
sideOutputs = null
// mapper就是業務邏輯模塊
mapper = {VectorAssemblerMapper@5785}
handleInvalid = {VectorAssemblerMapper$HandleType@5813} "ERROR"
outputColsHelper = {OutputColsHelper@5814}
colIndices = {int[4]@5815}
dataFieldNames = {String[5]@5816}
dataFieldTypes = {DataType[5]@5817}
params = {Params@5818} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
// 返回數值如下
resultRows = {MapOperator@5788}
function = {MapperAdapter@5826}
mapper = {VectorAssemblerMapper@5785}
defaultName = "linkFrom(MapBatchOp.java:35)"
// 調用棧如下
linkFrom:31, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
4.4 KMeans.fit
這部分就是訓練模型。
KMeans是一個Trainer,其進而實現了EstimatorBase類型,所以流水線就調用到了其fit函數
KMeans.fit就是調用了Trainer.fit。
- Trainer.fit首先調用train函數,最終調用KMeansTrainBatchOp.linkFrom,這樣就和VectorAssembler串聯起來。KMeansTrainBatchOp 把VectorAssembler返回的 MapBatchOp進行處理。最后返回一個同樣類型KMeansTrainBatchOp。
- Trainer.fit其次調用Trainer.createModel,該函數會根據this的類型決定應該生成什么Model。對于 KMeans,就生成了KMeansModel。
因為KMeans是流水線最后一個階段,這時候不調用 input = transformers[i].transform(input); 所以目前還是訓練,生成一個模型 KMeansModel。
// 實際部分代碼
Trainer.fit(BatchOperator input) {
return createModel(train(input).getOutputTable());
}
public final class KMeansTrainBatchOp extends BatchOperator <KMeansTrainBatchOp>
implements KMeansTrainParams <KMeansTrainBatchOp> {
public KMeansTrainBatchOp linkFrom(BatchOperator <?>... inputs) {
DataSet <Row> finalCentroid = iterateICQ(initCentroid, data,
vectorSize, maxIter, tol, distance, distanceType, vectorColName, null, null);
this.setOutput(finalCentroid, new KMeansModelDataConverter().getModelSchema());
return this;
}
}
// 變量內容
this = {KMeansTrainBatchOp@5887}
params = {Params@5895} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
output = null
sideOutputs = null
inputs = {BatchOperator[1]@5888}
0 = {MapBatchOp@3748} "UnnamedTable$1"
mapperBuilder = {VectorAssembler$lambda@3744}
params = {Params@3754} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
output = {TableImpl@5862} "UnnamedTable$1"
sideOutputs = null
// 調用棧如下
linkFrom:84, KMeansTrainBatchOp (com.alibaba.alink.operator.batch.clustering)
train:31, KMeans (com.alibaba.alink.pipeline.clustering)
fit:34, Trainer (com.alibaba.alink.pipeline)
fit:117, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
KMeansTrainBatchOp.linkFrom是算法重點。這里其實就是生成了算法所需要的一切前提,把各種Flink算子搭建好。后續會再提到。
fit函數生成了 KMeansModel,其transform函數在基類MapModel中實現,會在下一個transform階段完成調用。這個就是訓練出來的KMeans模型,其也是一個Transformer。
// Find the closest cluster center for every point.
public class KMeansModel extends MapModel<KMeansModel>
implements KMeansPredictParams <KMeansModel> {
public KMeansModel(Params params) {
super(KMeansModelMapper::new, params);
}
}
4.5 生成新的轉換流水線
前面說到了,Pipeline的fit函數,返回一個PipelineModel。這個PipelineModel在后續會繼續調用transform,完成轉換階段。
return new PipelineModel(transformers).setMLEnvironmentId(input.getMLEnvironmentId());
5. 轉換階段transform
轉換階段的流水線,依然要從VectorAssembler入手來讀取csv,進行map處理。然后調用 KMeansModel。
PipelineModel會繼續調用transform函數。其作用是把Transformer轉化為BatchOperator。這時候其內部變量如下,看出來已經從最初流水線各種類型參雜 轉換為 統一transform實例。
this = {PipelineModel@5016}
transformers = {TransformerBase[2]@5017}
0 = {VectorAssembler@1198}
mapperBuilder = {VectorAssembler$lambda@2855}
params = {Params@2856} "Params {outputCol="features", selectedCols=["sepal_length","sepal_width","petal_length","petal_width"]}"
1 = {KMeansModel@5009}
mapperBuilder = {KMeansModel$lambda@5011}
modelData = {TableImpl@4984} "UnnamedTable$2"
params = {Params@5012} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
modelData = null
params = {Params@5018} "Params {MLEnvironmentId=0}"
-
第一次transform調用到了MapBatchOp.linkFrom,就是VectorAssembler.transform調用到的,其作用和 在 fit 流水線中起到的作用一樣,下面注釋中有解釋。
-
第二次transform調用到了ModelMapBatchOp.linkFrom,就是KMeansModel.transform間接調用到的。下面注釋中有解釋。
這兩次 transform 的調用生成了 BatchOperator 的串聯。最終返回結果是 ModelMapBatchOp,即一個BatchOperator。轉換將由ModelMapBatchOp來轉換。
// The model fitted by Pipeline.
public class PipelineModel extends ModelBase<PipelineModel> implements LocalPredictable {
@Override
public BatchOperator<?> transform(BatchOperator input) {
for (TransformerBase transformer : this.transformers) {
input = transformer.transform(input);
}
return input;
}
}
// 經過變化后,得到一個最終的轉化結果 BatchOperator,以此來轉換
// {KMeansModel$lambda@5050} 就是 KMeansModelMapper,轉換邏輯。
input = {ModelMapBatchOp@5047} "UnnamedTable$3"
mapperBuilder = {KMeansModel$lambda@5050}
params = {Params@5051} "Params {vectorCol="features", maxIter=100, reservedCols=["category"], k=3, predictionCol="prediction_result", predictionDetailCol="prediction_detail"}"
params = {HashMap@5058} size = 6
"vectorCol" -> ""features""
"maxIter" -> "100"
"reservedCols" -> "["category"]"
"k" -> "3"
"predictionCol" -> ""prediction_result""
"predictionDetailCol" -> ""prediction_detail""
output = {TableImpl@5052} "UnnamedTable$3"
tableEnvironment = {BatchTableEnvironmentImpl@5054}
operationTree = {DataSetQueryOperation@5055}
operationTreeBuilder = {OperationTreeBuilder@5056}
lookupResolver = {LookupCallResolver@5057}
tableName = "UnnamedTable$3"
sideOutputs = null
// MapTransformer是VectorAssembler的基類。transform會生成一個MapBatchOp,然后再調用MapBatchOp.linkFrom。
public abstract class MapTransformer<T extends MapTransformer <T>>
extends TransformerBase<T> implements LocalPredictable {
@Override
public BatchOperator transform(BatchOperator input) {
return new MapBatchOp(this.mapperBuilder, this.params).linkFrom(input);
}
}
// MapModel是KMeansModel的基類,transform會生成一個ModelMapBatchOp,然后再調用ModelMapBatchOp.linkFrom。
public abstract class MapModel<T extends MapModel<T>>
extends ModelBase<T> implements LocalPredictable {
@Override
public BatchOperator transform(BatchOperator input) {
return new ModelMapBatchOp(this.mapperBuilder, this.params)
.linkFrom(BatchOperator.fromTable(this.getModelData())
.setMLEnvironmentId(input.getMLEnvironmentId()), input);
}
}
在這兩個linkFrom中,還是分別生成了兩個MapOperator,然后拼接起來,構成了一個 BatchOperator 串。從上面代碼中可以看出,KMeansModel對應的ModelMapBatchOp,其linkFrom會返回一個ModelMapperAdapter。ModelMapperAdapter是一個RichMapFunction類型,它會把KMeansModelMapper作為RichMapFunction.function成員變量保存起來。然后會調用 .map(new ModelMapperAdapter(mapper, modelSource)),map就是Flink算子,這樣轉換算法就和Flink聯系起來了。
最后 Keans 算法的轉換工作是通過 KMeansModelMapper.map 來完成的。
6. 運行
我們都知道,Flink程序中,為了讓程序運行,需要
- 獲取execution environment : 調用類似
getExecutionEnvironment()來獲取environment; - 觸發程序執行 : 調用類似
env.execute("KMeans Example");來真正執行。
Alink其實就是一個Flink應用,只不過要比普通Flink應用復雜太多。
但是從實例代碼中,我們沒有看到類似調用。這說明Alink封裝的非常好,但是作為好奇的程序員,我們需要知道究竟這些調用隱藏在哪里。
獲取執行環境
Alink是在Pipeline執行的時候,獲取到運行環境。具體來說,因為csv文件是最初的輸入,所以當transform調用其 in.getSchema() 時候,會獲取運行環境。
public final class CsvSourceBatchOp extends BaseSourceBatchOp<CsvSourceBatchOp>
implements CsvSourceParams<CsvSourceBatchOp> {
@Override
public Table initializeDataSource() {
ExecutionEnvironment execEnv = MLEnvironmentFactory.get(getMLEnvironmentId()).getExecutionEnvironment();
}
}
initializeDataSource:77, CsvSourceBatchOp (com.alibaba.alink.operator.batch.source)
getOutputTable:52, BaseSourceBatchOp (com.alibaba.alink.operator.batch.source)
getSchema:180, AlgoOperator (com.alibaba.alink.operator)
linkFrom:34, MapBatchOp (com.alibaba.alink.operator.batch.utils)
transform:34, MapTransformer (com.alibaba.alink.pipeline)
fit:122, Pipeline (com.alibaba.alink.pipeline)
main:31, KMeansExample (com.alibaba.alink)
觸發程序運行
截止到現在,Alink已經做了很多東西,也映射到了 Flink算子上,那么究竟什么地方才真正和Flink聯系起來呢?
print 調用的是BatchOperator.print,真正從這里開始,會一層一層調用下去,最后來到
package com.alibaba.alink.operator.batch.utils;
public class PrintBatchOp extends BaseSinkBatchOp<PrintBatchOp> {
@Override
protected PrintBatchOp sinkFrom(BatchOperator in) {
this.setOutputTable(in.getOutputTable());
if (null != this.getOutputTable()) {
try {
// 在這個 collect 之后,會進入到 Flink 的runtime之中。
List <Row> rows = DataSetConversionUtil.fromTable(getMLEnvironmentId(), this.getOutputTable()).collect();
batchPrintStream.println(TableUtil.formatTitle(this.getColNames()));
for (Row row : rows) {
batchPrintStream.println(TableUtil.formatRows(row));
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
return this;
}
}
在 LocalEnvironment 這里把Alink和Flink的運行環境聯系起來。
public class LocalEnvironment extends ExecutionEnvironment {
@Override
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan(null, false);
// 下面會真正的和Flink聯系起來。
if (executor != null) {
return executor.getOptimizerPlanAsJSON(p);
}
else {
PlanExecutor tempExecutor = PlanExecutor.createLocalExecutor(configuration);
return tempExecutor.getOptimizerPlanAsJSON(p);
}
}
}
// 調用棧如下
execute:91, LocalEnvironment (org.apache.flink.api.java)
execute:820, ExecutionEnvironment (org.apache.flink.api.java)
collect:413, DataSet (org.apache.flink.api.java)
sinkFrom:40, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
sinkFrom:18, PrintBatchOp (com.alibaba.alink.operator.batch.utils)
linkFrom:31, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
linkFrom:17, BaseSinkBatchOp (com.alibaba.alink.operator.batch.sink)
link:89, BatchOperator (com.alibaba.alink.operator.batch)
linkTo:239, BatchOperator (com.alibaba.alink.operator.batch)
print:337, BatchOperator (com.alibaba.alink.operator.batch)
main:31, KMeansExample (com.alibaba.alink)
0x05 底層--迭代計算框架
這里對應如下設計原則:
- 構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又可以利用好Flink,還可以讓用戶基于此可以快速開發算法。
- 采用最簡單,最常見的開發語言和開發模式。
讓我們想想看,大概有哪些基礎工作需要做:
- 如何初始化
- 如何通信
- 如何分割代碼,如何廣播代碼
- 如何分割數據,如何廣播數據
- 如何迭代算法
其中最重要的概念是IterativeComQueue,這是把通信或者計算抽象成ComQueueItem,然后把ComQueueItem串聯起來形成隊列。這樣就形成了面向迭代計算場景的一套迭代通信計算框架。
再次把目錄結構列在這里:
./java/com/alibaba/alink/common:
MLEnvironment.java linalg MLEnvironmentFactory.java mapper
VectorTypes.java model comqueue utils io
里面大致有 :
- Flink 封裝模塊 :MLEnvironment.java, MLEnvironmentFactory.java。
- 線性代數模塊:linalg。
- 計算/通訊隊列模塊:comqueue,其中ComputeFunction進行計算,比如訓練算法。
- 映射模塊:mapper,其中Mapper進行各種映射,比如 ModelMapper 把模型映射為數值(就是轉換算法)。
- 模型 :model,主要是用來讀取model source。
- 基礎模塊:utils,io。
算法組件在其linkFrom函數中,會做如下操作:
- 先進行部分初始化,此時會調用部分Flink算子,比如groupBy等等。
- 再將算法邏輯剝離出來,委托給Mapper或者ComQueueItem。
- Mapper或者ComQueueItem會調用Flink map算子或者mapPartition算子等。
- 調用Flink算子過程就是把算法分割然后適配到Flink上的過程。
下面就一一闡述。
1. Flink上下文封裝
MLEnvironment 是個重要的類。其封裝了Flink開發所必須要的運行上下文。用戶可以通過這個類來獲取各種實際運行環境,可以建立table,可以運行SQL語句。
/**
* The MLEnvironment stores the necessary context in Flink.
* Each MLEnvironment will be associated with a unique ID.
* The operations associated with the same MLEnvironment ID
* will share the same Flink job context.
*/
public class MLEnvironment {
private ExecutionEnvironment env;
private StreamExecutionEnvironment streamEnv;
private BatchTableEnvironment batchTableEnv;
private StreamTableEnvironment streamTableEnv;
}
2. Function
Function是計算框架中,對于計算和通訊等業務邏輯的最小模塊。具體定義如下。
- ComputeFunction 是計算模塊。
- CommunicateFunction 是通訊模塊。CommunicateFunction和ComputeFunction都是ComQueueItem子類,它們是業務邏輯實現者。
- CompareCriterionFunction 是判斷模塊,用來判斷何時結束循環。這就允許用戶指定迭代終止條件。
- CompleteResultFunction 用來在結束循環時候調用,作為循環結果。
- Mapper也是一種Funciton,即Mapper Function。
后續將統稱為 Function。
/**
* Basic build block in {@link BaseComQueue}, for either communication or computation.
*/
public interface ComQueueItem extends Serializable {}
/**
* An BaseComQueue item for computation.
*/
public abstract class ComputeFunction implements ComQueueItem {
/**
* Perform the computation work.
*
* @param context to get input object and update output object.
*/
public abstract void calc(ComContext context);
}
/**
* An BaseComQueue item for communication.
*/
public abstract class CommunicateFunction implements ComQueueItem {
/**
* Perform communication work.
*
* @param input output of previous queue item.
* @param sessionId session id for shared objects.
* @param <T> Type of dataset.
* @return result dataset.
*/
public abstract <T> DataSet <T> communicateWith(DataSet <T> input, int sessionId);
}
結合我們代碼來看,KMeansTrainBatchOp算法組件的部分作用是:KMeans算法被分割成若干CommunicateFunction。然后被添加到計算通訊隊列上。
下面代碼中,具體 Item 如下:
- ComputeFunction :KMeansPreallocateCentroid,KMeansAssignCluster,KMeansUpdateCentroids
- CommunicateFunction :AllReduce
- CompareCriterionFunction :KMeansIterTermination
- CompleteResultFunction : KMeansOutputModel
即算法實現的主要工作是:
- 構建了一個IterativeComQueue。
- 初始化數據,這里有兩種辦法:initWithPartitionedData將DataSet分片緩存至內存。initWithBroadcastData將DataSet整體緩存至每個worker的內存。
- 將計算分割為若干ComputeFunction,串聯在IterativeComQueue
- 運用AllReduce通信模型完成了數據同步
static DataSet <Row> iterateICQ(...省略...) {
return new IterativeComQueue()
.initWithPartitionedData(TRAIN_DATA, data)
.initWithBroadcastData(INIT_CENTROID, initCentroid)
.initWithBroadcastData(KMEANS_STATISTICS, statistics)
.add(new KMeansPreallocateCentroid())
.add(new KMeansAssignCluster(distance))
.add(new AllReduce(CENTROID_ALL_REDUCE))
.add(new KMeansUpdateCentroids(distance))
.setCompareCriterionOfNode0(new KMeansIterTermination(distance, tol))
.closeWith(new KMeansOutputModel(distanceType, vectorColName, latitudeColName, longitudeColName))
.setMaxIter(maxIter)
.exec();
}
3. 計算/通訊隊列
BaseComQueue 就是這個迭代框架的基礎。它維持了一個 List<ComQueueItem> queue。用戶在生成算法模塊時候,會把各種 Function 添加到隊列中。
IterativeComQueue 是 BaseComQueue 的缺省實現,具體實現了setMaxIter,setCompareCriterionOfNode0兩個函數。
BaseComQueue兩個重要函數是:
- optimize 函數:把隊列上相鄰的 ComputeFunction串聯起來,形成一個 ChainedComputation。在框架中進行優化,就是Alink的一個優勢所在。
- exec 函數:運行隊列上的各個 Function,返回最終的 Dataset。實際上,這里才真正到了 Flink,比如把計算隊列上的各個 ComputeFunction 映射到 Flink 的 RichMapPartitionFunction。然后在mapPartition函數調用中,會調用真實算法邏輯片斷
computation.calc(context);。
可以認為,BaseComQueue 是個邏輯概念,讓算法工程師可以更好的組織自己的業務語言。而通過其exec函數把算法邏輯映射到Flink算子上。這樣在某種程度上起到了與Flink解耦合的作用。
具體定義(摘取函數內部分代碼)如下:
// Base class for the com(Computation && Communicate) queue.
public class BaseComQueue<Q extends BaseComQueue<Q>> implements Serializable {
/**
* All computation or communication functions.
*/
private final List<ComQueueItem> queue = new ArrayList<>();
/**
* The function executed to decide whether to break the loop.
*/
private CompareCriterionFunction compareCriterion;
/**
* The function executed when closing the iteration
*/
private CompleteResultFunction completeResult;
private void optimize() {
if (queue.isEmpty()) {
return;
}
int current = 0;
for (int ahead = 1; ahead < queue.size(); ++ahead) {
ComQueueItem curItem = queue.get(current);
ComQueueItem aheadItem = queue.get(ahead);
// 這里進行判斷,是否是前后都是 ComputeFunction,然后合并成 ChainedComputation
if (aheadItem instanceof ComputeFunction && curItem instanceof ComputeFunction) {
if (curItem instanceof ChainedComputation) {
queue.set(current, ((ChainedComputation) curItem).add((ComputeFunction) aheadItem));
} else {
queue.set(current, new ChainedComputation()
.add((ComputeFunction) curItem)
.add((ComputeFunction) aheadItem)
);
}
} else {
queue.set(++current, aheadItem);
}
}
queue.subList(current + 1, queue.size()).clear();
}
/**
* Execute the BaseComQueue and get the result dataset.
*
* @return result dataset.
*/
public DataSet<Row> exec() {
optimize();
IterativeDataSet<byte[]> loop
= loopStartDataSet(executionEnvironment)
.iterate(maxIter);
DataSet<byte[]> input = loop
.mapPartition(new DistributeData(cacheDataObjNames, sessionId))
.withBroadcastSet(loop, "barrier")
.name("distribute data");
for (ComQueueItem com : queue) {
if ((com instanceof CommunicateFunction)) {
CommunicateFunction communication = ((CommunicateFunction) com);
// 這里會調用比如 AllReduce.communication, 其會返回allReduce包裝后賦值給input,當循環遇到了下一個ComputeFunction(KMeansUpdateCentroids)時候,會把input賦給它處理。比如input = {MapPartitionOperator@5248},input.function = {AllReduce$AllReduceRecv@5260},input調用mapPartition,去間接調用KMeansUpdateCentroids。
input = communication.communicateWith(input, sessionId);
} else if (com instanceof ComputeFunction) {
final ComputeFunction computation = (ComputeFunction) com;
// 這里才到了 Flink,把計算隊列上的各個 ComputeFunction 映射到 Flink 的RichMapPartitionFunction。
input = input
.mapPartition(new RichMapPartitionFunction<byte[], byte[]>() {
@Override
public void mapPartition(Iterable<byte[]> values, Collector<byte[]> out) {
ComContext context = new ComContext(
sessionId, getIterationRuntimeContext()
);
// 在這里會被Flink調用具體計算函數,就是之前算法工程師拆分的算法片段。
computation.calc(context);
}
})
.withBroadcastSet(input, "barrier")
.name(com instanceof ChainedComputation ?
((ChainedComputation) com).name()
: "computation@" + computation.getClass().getSimpleName());
} else {
throw new RuntimeException("Unsupported op in iterative queue.");
}
}
return serializeModel(clearObjs(loopEnd));
}
}
4. Mapper(Function)
Mapper是底層迭代計算框架的一部分,可以認為是 Mapper Function。因為涉及到業務邏輯,所以提前說明。
5. 初始化
初始化發生在 KMeansTrainBatchOp.linkFrom 中。我們可以看到在初始化時候,是可以調用 Flink 各種算子(比如.rebalance().map()) ,因為這時候還沒有和框架相關聯,這時候的計算是用戶自行控制,不需要加到 IterativeComQueue 之上。
如果某一個計算既要加到 IterativeComQueue 之上,還要自己玩 Flink 算子,那框架就懵圈了,不知道該如何處理。所以用戶自由操作只能發生在沒有和框架聯系之前。
@Override
public KMeansTrainBatchOp linkFrom(BatchOperator <?>... inputs) {
DataSet <FastDistanceVectorData> data = statistics.f0.rebalance().map(
new MapFunction <Vector, FastDistanceVectorData>() {
@Override
public FastDistanceVectorData map(Vector value) {
return distance.prepareVectorData(Row.of(value), 0);
}
});
......
}
框架也提供了初始化功能,用于將DataSet緩存到內存中,緩存的形式包括Partition和Broadcast兩種形式。前者將DataSet分片緩存至內存,后者將DataSet整體緩存至每個worker的內存。
return new IterativeComQueue()
.initWithPartitionedData(TRAIN_DATA, data)
.initWithBroadcastData(INIT_CENTROID, initCentroid)
.initWithBroadcastData(KMEANS_STATISTICS, statistics)
......
6. ComputeFunction
這是算法的具體計算模塊,算法工程師應該把算法拆分成各個可以并行處理的模塊,分別用 ComputeFunction 實現,這樣可以利用 Flnk 的分布式計算效力。
下面舉出一個例子如下,這段代碼為每個點(point)計算最近的聚類中心,為每個聚類中心的點坐標的計數和求和:
/**
* Find the closest cluster for every point and calculate the sums of the points belonging to the same cluster.
*/
public class KMeansAssignCluster extends ComputeFunction {
private FastDistance fastDistance;
private transient DenseMatrix distanceMatrix;
@Override
public void calc(ComContext context) {
Integer vectorSize = context.getObj(KMeansTrainBatchOp.VECTOR_SIZE);
Integer k = context.getObj(KMeansTrainBatchOp.K);
// get iterative coefficient from static memory.
Tuple2<Integer, FastDistanceMatrixData> stepNumCentroids;
if (context.getStepNo() % 2 == 0) {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID1);
} else {
stepNumCentroids = context.getObj(KMeansTrainBatchOp.CENTROID2);
}
if (null == distanceMatrix) {
distanceMatrix = new DenseMatrix(k, 1);
}
double[] sumMatrixData = context.getObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE);
if (sumMatrixData == null) {
sumMatrixData = new double[k * (vectorSize + 1)];
context.putObj(KMeansTrainBatchOp.CENTROID_ALL_REDUCE, sumMatrixData);
}
Iterable<FastDistanceVectorData> trainData = context.getObj(KMeansTrainBatchOp.TRAIN_DATA);
if (trainData == null) {
return;
}
Arrays.fill(sumMatrixData, 0.0);
for (FastDistanceVectorData sample : trainData) {
KMeansUtil.updateSumMatrix(sample, 1, stepNumCentroids.f1, vectorSize, sumMatrixData, k, fastDistance,
distanceMatrix);
}
}
}
這里能夠看出,在 ComputeFunction 中,使用的是 命令式編程模式,這樣能夠最大的契合目前程序員現狀,極高提升生產力。
7. CommunicateFunction
前面代碼中有一個關鍵處 .add(new AllReduce(CENTROID_ALL_REDUCE))。這部分代碼起到了承前啟后的作用。之前的 KMeansPreallocateCentroid,KMeansAssignCluster和其后的 KMeansUpdateCentroids通過它做了一個 reduce / broadcast 通訊。
具體從注解中可以看到,AllReduce 是 MPI 相關通訊原語的一個實現。這里主要是對 double[] object 進行 reduce / broadcast。
public class AllReduce extends CommunicateFunction {
public static <T> DataSet <T> allReduce(
DataSet <T> input,
final String bufferName,
final String lengthName,
final SerializableBiConsumer <double[], double[]> op,
final int sessionId) {
final String transferBufferName = UUID.randomUUID().toString();
return input
.mapPartition(new AllReduceSend <T>(bufferName, lengthName, transferBufferName, sessionId))
.withBroadcastSet(input, "barrier")
.returns(
new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
.name("AllReduceSend")
.partitionCustom(new Partitioner <Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key;
}
}, 0)
.name("AllReduceBroadcastRaw")
.mapPartition(new AllReduceSum(bufferName, lengthName, sessionId, op))
.returns(
new TupleTypeInfo <>(Types.INT, Types.INT, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO))
.name("AllReduceSum")
.partitionCustom(new Partitioner <Integer>() {
@Override
public int partition(Integer key, int numPartitions) {
return key;
}
}, 0)
.name("AllReduceBroadcastSum")
.mapPartition(new AllReduceRecv <T>(bufferName, lengthName, sessionId))
.returns(input.getType())
.name("AllReduceRecv");
}
}
經過調試我們能看出來,AllReduceSum 是在自己mapPartition實現中,調用了 SUM。
/**
* The all-reduce operation which does elementwise sum operation.
*/
public final static SerializableBiConsumer <double[], double[]> SUM
= new SerializableBiConsumer <double[], double[]>() {
@Override
public void accept(double[] a, double[] b) {
for (int i = 0; i < a.length; ++i) {
a[i] += b[i];
}
}
};
private static class AllReduceSum extends RichMapPartitionFunction <Tuple3 <Integer, Integer, double[]>, Tuple3 <Integer, Integer, double[]>> {
@Override
public void mapPartition(Iterable <Tuple3 <Integer, Integer, double[]>> values,
Collector <Tuple3 <Integer, Integer, double[]>> out) {
// 省略各種初始化操作,比如確定傳輸位置,傳輸目標等
......
do {
Tuple3 <Integer, Integer, double[]> val = it.next();
int localPos = val.f1 - startPos;
if (sum[localPos] == null) {
sum[localPos] = val.f2;
agg[localPos]++;
} else {
// 這里會調用 SUM
op.accept(sum[localPos], val.f2);
}
} while (it.hasNext());
for (int i = 0; i < numOfSubTasks; ++i) {
for (int j = 0; j < cnt; ++j) {
out.collect(Tuple3.of(i, startPos + j, sum[j]));
}
}
}
}
accept:129, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
accept:126, AllReduce$3 (com.alibaba.alink.common.comqueue.communication)
mapPartition:314, AllReduce$AllReduceSum (com.alibaba.alink.common.comqueue.communication)
run:103, MapPartitionDriver (org.apache.flink.runtime.operators)
run:504, BatchTask (org.apache.flink.runtime.operators)
run:157, AbstractIterativeTask (org.apache.flink.runtime.iterative.task)
run:107, IterationIntermediateTask (org.apache.flink.runtime.iterative.task)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:705, Task (org.apache.flink.runtime.taskmanager)
run:530, Task (org.apache.flink.runtime.taskmanager)
run:745, Thread (java.lang)
0x06 另一種打法
總結到現在,我們發現這個迭代計算框架設計的非常優秀。但是Alink并沒有限定大家只能使用這個框架來實現算法。如果你是Flink高手,你完全可以隨心所欲的實現。
Alink例子中本身就有一個這樣的實現 ALSExample。其核心類 AlsTrainBatchOp 就是直接使用了 Flink 算子,IterativeDataSet 等。
這就好比是武松武都頭,一雙戒刀搠得倒貪官佞臣,赤手空拳也打得死吊睛白額大蟲。
public final class AlsTrainBatchOp
extends BatchOperator<AlsTrainBatchOp>
implements AlsTrainParams<AlsTrainBatchOp> {
@Override
public AlsTrainBatchOp linkFrom(BatchOperator<?>... inputs) {
BatchOperator<?> in = checkAndGetFirst(inputs);
......
AlsTrain als = new AlsTrain(rank, numIter, lambda, implicitPrefs, alpha, numMiniBatches, nonNegative);
DataSet<Tuple3<Byte, Long, float[]>> factors = als.fit(alsInput);
DataSet<Row> output = factors.mapPartition(new RichMapPartitionFunction<Tuple3<Byte, Long, float[]>, Row>() {
@Override
public void mapPartition(Iterable<Tuple3<Byte, Long, float[]>> values, Collector<Row> out) {
new AlsModelDataConverter(userColName, itemColName).save(values, out);
}
});
return this;
}
}
多提一點,Flink ML中也有ALS算法,是一個Scala實現。沒有Scala經驗的算法工程師看代碼會咬碎鋼牙。
0x07 總結
經過這兩篇文章的推測和驗證,現在我們總結如下。
Alink的部分設計原則
-
算法的歸算法,Flink的歸Flink,盡量屏蔽AI算法和Flink之間的聯系。
-
采用最簡單,最常見的開發語言和思維方式。
-
盡量借鑒市面上通用的機器學習設計思路和開發模式,讓開發者無縫切換。
-
構建一套戰術打法(middleware或者adapter),即屏蔽了Flink,又可以利用好Flink,還可以讓用戶基于此可以快速開發算法。
針對這些原則,Alink實現了
- 頂層流水線,Estimator, Transformer...
- 算法組件中間層
- 底層迭代計算框架
這樣Alink即可以最大限度的享受Flink帶來的各種優勢,也能順應目前形勢,讓算法工程師工作更方便。從而達到系統性能和生產力的雙重提升。
下一篇文章爭取介紹 AllReduce 的具體實現。
0x08 參考
Spark ML簡介之Pipeline,DataFrame,Estimator,Transformer
斬獲GitHub 2000+ Star,阿里云開源的 Alink 機器學習平臺如何跑贏雙11數據“博弈”?|AI 技術生態論
浙公網安備 33010602011771號