Alink提供了Pipeline/PipelineModel,其在功能和使用方式上与Scikit-learn和SparkML的Pipeline/PipelineModel类似。保持训练和预测过程中数据处理的一致性,这样调用过程清晰、简练,也降低了用户的学习成本。Alink Pipeline/PipelineModel与批式/流式组件(BatchOperator/ StreamOperator)一样,将Flink Table作为计算输入和输出结果的数据类型。
Scikit-learn和SparkML的Pipeline是针对批式数据的训练和预测设计的。Alink不仅支持批式数据的训练和预测,也支持将批式训练出的模型用于预测流式数据。
管道(Pipeline)的概念源于Scikit-learn。可以将数据处理的过程看成数据在“管道”中流动。管道分为若干个阶段(PipelineStage),数据每通过一个阶段就发生一次变换,数据通过整个管道,也就依次经历了所有变换。
如果要在管道中加入分类器,对数据进行类别预测,就涉及模型的训练和预测,这需要分两个步骤完成。所以,管道也会被细分为管道定义与管道模型(PipelineModel)。在管道定义中,每个PipelineStage会按其是否需要进行模型训练,分为估计器(Estimator)和转换器(Transformer)。随后,可以对其涉及模型的部分,即对估计器(Estimator)进行估计,从而得到含有模型的转换器。该转换器被称为Model,并用Model替换相应的估计器,从而每个阶段都可以直接对数据进行处理。我们将该Model称为PipelineModel。
上面介绍了几个概念,用图形表示它们之间的关系如下:
Pipeline与PipelineModel构成了完整的机器学习处理过程,可以分为三个子过程:定义过程,模型训练过程,数据处理过程。
比如,LR分类算法作为一个Estimator,可以在构建Pipleline的时候进行定义,之后在fit的过程中,会使用LR的训练算法,得到LR model,并将其作为整个PipelineModel的一部分;在使用PipelineModel处理数据时,会相应地调用LR算法的预测部分。
以使用朴素贝叶斯算法进行多分类为例,演示三个经典场景:批式训练、批式预测与流式预测。为了更好地体现它们之间的关联,我们用一张图表示了出来,如下图所示。
此图有如下特点:
(1)左边这列组件展示了批式训练的流程,中间那列组件展示了批式预测的流程,右边那列组件展示了流式预测的流程。
(2)训练和预测都经历了四个阶段:缺失值填充、MultiStringIndexer(将字符串数据用索引值替换)、VectorAssembler(将各数据字段组装为向量)、使用朴素贝叶斯算法进行训练/预测。
(3)在VectorAssembler阶段,运行时不需要额外的信息。在批式训练、批式预测和流式预测的过程中,组件均可直接使用。
(4)除VectorAssembler外的三个阶段,都需要对整体数据进行扫描或者迭代训练,得到模型后,才能处理(模型预测)数据。训练过程需要拿到所有的模型,缺失值填充和MultiStringIndexer阶段需要在得到模型后,对当前数据进行预测,将预测结果数据传给后面的阶段。
上面的四个阶段分别对应四个PipelineStage,这些PipelineStage构成了Pipeline。对应的代码如下:
pipeline = Pipeline()\ .add( Imputer()\ .setSelectedCols("...")\ .setStrategy('VALUE')\ .setFillValue("null") )\ .add( MultiStringIndexer()\ .setSelectedCols("...") )\ .add( VectorAssembler()\ .setSelectedCols("...")\ .setOutputCol("vec") )\ .add( NaiveBayesTextClassifier()\ .setVectorCol("vec")\ .setLabelCol("label")\ .setPredictionCol("pred") );
Pipeline可多次复用,不同的训练数据通过同样的流程可得到不同的模型:
trainData1 = CsvSourceBatchOp() trainData2= CsvSourceBatchOp() model = pipeline.fit(trainData1) model2 = pipeline.fit(trainData2)
得到的PipelineModel可重复应用于不同的预测数据:
predictData1= CsvSourceBatchOp() predictData2= CsvSourceBatchOp() predict1 = model.transform(predictData1) predict2 = model.transform(predictData2)
PipelineModel可以无区别地应用于批式预测和流式预测:
batchData = CsvSourceBatchOp() streamData= CsvSourceStreamOp() predictBatch = model.transform(batchData) predictStream = model.transform(streamData)
从PipelineModel可以得到用于本地预测的LocalPredictor,直接嵌入Java应用服务:
inputSchemaStr = "..." localPredictor = model.collectLocalPredictor(inputSchemaStr) inputRow = [ ... ] pred = localPredictor.map(inputRow)