这里将通过几个简单的Alink示例,让读者留下一个初步的Alink印象。
我们以常用的鸢尾花(iris)数据集为例,演示一下如何读取数据。
数据下载链接:http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
Alink的CSV格式数据源读取组件,不但可以读取本地文件,还可以直接读取网络文件;在该组件读取文件的时候需要指定各列数据的名称和类型。下面是具体的代码。在此,读取出数据后,选择前5条数据进行打印输出:
source = CsvSourceBatchOp()\ .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases" + "/iris/iris.data")\ .setSchemaStr("sepal_length double, sepal_width double, petal_length double, " + "petal_width double, category string") source.firstN(5).print();
运行结果如下:
下一步,我们对数据进行采样,采样数为10条。然后连接到CSV格式输出组件CsvSinkBatchOp,并设置相应的输出路径。可以设置覆盖写参数为true。
source.sampleWithSize(10)\ .link( CsvSinkBatchOp()\ .setFilePath(DATA_DIR + "iris_10.data")\ .setOverwriteSink(True) ) BatchOperator.execute();
采样出来的数据被保存到文件iris_10.data。使用文本编辑器显示这些数据,如下图所示,正好10行,数据间使用逗号进行分隔,这是标准的CSV格式。
从本节开始,我们以预测某电商平台“双11”的成交总额(GMV,Gross Merchandise Volume)为例,演示各种操作。
我们很容易查到某电商平台历年“双11”的成交总额,下面是其2009—2017年的记录,成交总额的单位为万亿元,左边年份列记为x,右边成交总额列记为gmv。我们可以使用Python中常用的构造DataFrame的方法,然后使用fromDataframe方法,将DataFrame格式的数据转换为Alink批式数据。(注:在本书的7.6节中,有关于Python数组、DataFrame和Alink批式数据互相转换的介绍。)
df = pd.DataFrame( [ [2009, 0.5], [2010, 9.36], [2011, 52.0], [2012, 191.0], [2013, 350.0], [2014, 571.0], [2015, 912.0], [2016, 1207.0], [2017, 1682.0] ] ) train_set = BatchOperator.fromDataframe(df, schemaStr='x int, gmv double') df_2 = pd.DataFrame( [ [2018], [2019] ] ) pred_set = BatchOperator.fromDataframe(df_2, schemaStr='x int')
我们需要对这些数据进行建模,从而预测该电商平台2018年和2019年的值。由于2018年和2019年已经过去,我们知道实际的数值:2018年该电商平台的成交总额为2135亿元,2019年该电商平台的成交总额为2684亿元。
从年份和成交总额的数据上看,二者显然不成线性关系,这就需要构造新的特征,更好地拟合历史数据,进行预测。我们可以定义x的平方为新的特征,这样就相当于计算gmv与关于x的二次多项式之间的关系。
train_set = train_set.select("x, x*x AS x2, gmv"); trainer = LinearRegTrainBatchOp()\ .setFeatureCols(["x", "x2"])\ .setLabelCol("gmv") train_set.link(trainer); trainer.link( AkSinkBatchOp()\ .setFilePath(DATA_DIR + "gmv_reg.model")\ .setOverwriteSink(True) ) BatchOperator.execute();
需要说明的是:
运行结束后,可以看到生成了gmv_reg.model文件。下面,我们导入此模型,并进行批式预测:
lr_model = AkSourceBatchOp().setFilePath(DATA_DIR + "gmv_reg.model") pred_set = pred_set.select("x, x*x AS x2") predictor = LinearRegPredictBatchOp().setPredictionCol("pred") predictor.linkFrom(lr_model, pred_set).print();
预测流程分为如下几步:
(1)通过AlinkFileSourceBatchOp读取文件gmv_reg.model,得到模型数据,并记为lr_model。
(2)预测的原始数据只有一列x,需要执行SQL SELECT方案,生成新的特征x2。
(3)定义线性回归的批式预测组件(LinearRegPredictBatchOp)为predictor,指定输出结果列的名称。
(4)将这些组件组合为预测流程。因为predictor需要模型数据及预测特征数据,所以使用linkFrom方法,同时连接两个上游组件。之后链式调用print方法,输出预测结果。注意:在批式场景中的print方法本身就会触发批式任务执行,这里就不用再调用BatchOperator.execute()了。
下图为模型预测结果,其最后一列为预测的成交额(单位:亿元)。我们对比前面提到的实际数值(2018年该电商平台的成交总额为2135亿元,2019年该电商平台的成交总额为2684亿元),可以发现该预测结果与实际数值非常接近。
前面介绍了批式模型训练,以及针对批式数据的预测。在实际的场景中,需要预测的数据常常以流的方式陆续到来,因此需要随时进行预测,即进行流式预测。
我们还是以某电商平台“双11”的成交总额预测为例,演示一个流式预测的简单场景:预测模型已经通过批式训练的方式生成,使用这个固定的预测模型,构建流式任务,预测流式数据。
Alink批式任务的代码与流式任务的代码很相似,将批式组件的名称后缀“BatchOp”改为“StreamOp”即可,参数基本不用调整;在一些使用方式上需要进行微调。流式预测的代码如下:
pred_set = StreamOperator.fromDataframe(df_2, schemaStr='x int') lr_model = AkSourceBatchOp().setFilePath(DATA_DIR + "gmv_reg.model") predictor = LinearRegPredictStreamOp(lr_model).setPredictionCol("pred") pred_set\ .select("x, x*x AS x2")\ .link(predictor)\ .print() StreamOperator.execute()
可以对比批式预测的代码,理解流式预测的代码:
(1)在数据源获取方面,通过内存数据构造出一个流式数据源。这里的流式组件MemSourceStreamOp与前面的批式组件MemSourceBatchOp只有名称有区别,组件参数的设置类似。这就为我们进行流式操作和批式操作的互相转换提供了方便。
(2)因为使用批式的模型,这里还是通过AlinkFileSourceBatchOp读取文件gmv_reg.model,得到模型数据,并记为lr_model。
(3)定义线性回归的流式预测组件(LinearRegPredictStreamOp)为predictor,指定输出结果列的名称。注意:因为流式任务的流程中无法接入批式的数据,所以将批式模型数据lr_model作为LinearRegPredictStreamOp构造函数的参数传入。在流式任务执行前,会先完成批式模型数据的导入。
(4)组装预测流程。由于流式预测组件predictor已经在构造函数中导入了模型数据,因此只需连入一个流式预测数据即可,不要使用linkFrom方法。整个流程可以写得更简单,可直接把预测数据作为源头,接入流式SQL SELECT操作,生成x2(注意这里代码的写法,与批式调用时完全一致),然后连接流式预测组件predictor,并对预测结果进行打印。
(5)调用StreamOperator.execute(),执行流式任务。
流式预测结果,如下图所示,其与批式预测的结果完全相同。
注意:Alink在Jupyter上的流式数据的显示,采用时间窗口刷新的方式。如果数据流结束,则输出结果停留在最近显示的内容上;图上方的一串输出字符,就对应着流式数据显示的时间窗口信息。
前面介绍了Alink在批式模型训练、批式预测和流式预测中的例子,可以看到这三个流程中都涉及两个步骤(使用SQL SELECT构造新的特征项,以及线性回归的训练或者预测),因此,可以对该功能进行抽象,形成Pipeline(管道)。其具体概念和用法在本书后面会介绍,这里就不展开说明了。这里只是通过示例,让读者有一个初步的印象:可以通过Pipeline简化操作。
示例代码如下:
train_set = BatchOperator.fromDataframe(df, schemaStr='x int, gmv double') pipeline = Pipeline()\ .add( Select().setClause("*, x*x AS x2") )\ .add( LinearRegression()\ .setFeatureCols(["x", "x2"])\ .setLabelCol("gmv")\ .setPredictionCol("pred") ) pipeline.fit(train_set).save(DATA_DIR + "gmv_pipeline.model") BatchOperator.execute()
这段代码由四部分构成:
下面我们将通过读取模型文件gmv_pipeline.model,得到PipelineModel,并使用其transform方法,对批式数据和流式数据进行预测。
读取模型文件,得到PipelineModel的代码很简单:
pipelineModel = PipelineModel.load(DATA_DIR + "gmv_pipeline.model")
对于批式数据的预测代码如下:
pred_batch = BatchOperator.fromDataframe(df_2, schemaStr='x int') pipelineModel.transform(pred_batch).print()
结果如下图所示:
对于流式数据的预测代码如下,我们看到pipelineModel对于批式数据和流式数据处理所用的方法名称都是一样的,使用方式也是一样的;但transform方法的输出结果究竟是批式的还是流式的,取决于输入数据是批式的还是流式的:
pred_stream = StreamOperator.fromDataframe(df_2, schemaStr='x int') pipelineModel.transform(pred_stream).print() StreamOperator.execute()
使用Pipeline方式,流式预测的结果如下图所示,与批式预测的结果相同。
除了使用Alink算法组件直接对批式的数据或者流式的数据进行预测,用户也希望我们能提供SDK的方式,即,由参数或模型数据直接构建一个本地的Java实例,可以对单条数据进行预测,我们称之为LocalPredictor。如此一来,预测不再必须由Flink任务完成,而可以嵌入提供RestAPI的预测服务系统中,或者嵌入用户的业务系统里。
有了存储好的PipelineModel模型,可以直接使用LocalPredictor的构造函数来构建实例。构造函数需要两个参数,一个是PipelineModel模型文件的路径,另一个是所要预测数据的各数据列名称和类型,即输入一个Alink Schema String格式(详见2.8节的内容)的参数。这里的参数为"x int",表示输入的预测数据只有1列,名称为x,类型为整型。具体代码如下:
predictor = LocalPredictor(DATA_DIR + "gmv_pipeline.model", "x int")
LocalPredictor输入的预测数据是Row格式的,输出的预测结果也是Row格式的。Row格式本身并没有列名和类型的定义,需要通过在构造函数中输入预测数据的Schema来获取预测数据各数据列的名称和类型;LocalPredictor可以根据PipelineModel模型的计算过程,推导出预测结果的Schema(结果数据列名称及类型)。对于我们刚构建的localPredictor,使用getOutputSchema方法获取Schema信息并打印显示Schema信息,具体代码如下:
print(predictor.getOutputColNames());
运行结果如下:
['x', 'x2', 'pred']
可以看出,LocalPredictor的预测结果与批式和流式预测结果一样,其预测输出共3列,最关键的分类预测结果列“pred”在最后。
LocalPredictor使用map方法进行预测,具体代码如下:
for x in [2018, 2019] : print(predictor.map([x]))
计算结果如下:
[2018 4072324 2142.404761955142] [2019 4076361 2682.2262857556343]
最右面的数值是预测结果,对其保留小数点后6位有效数字,得到的结果与前面批式/流式任务计算的结果相同。