在第14章(在线学习)的示例中,其实就已经在使用模型流。如下图所示, FtrlTrain(FTRL 在线训练组件)和 FtrlPredict(FTRL 流式预测组件)之间通过模型数据流连接,即 FtrlTrain 不断产生新的模型,流式地传给 FtrlPredict 组件,每当 FtrlPredict 组件接收到一个完整的模型时,便会替换其旧的模型,切换成新的模型。
接下来,我们先使用简单示例,复现在线学习的过程,再尝试去“看到”模型流。
在第14章的示例上进行精简,只考虑数值型特征,这样可以省略特征工程的部分,直接进入在线学习的训练和预测。关于数据和流程方面的介绍,请读者阅读第14章内容,这里不再赘述。
首先,需要离线训练一个LR模型,作为后续在线学习的训练和预测的初始模型,具体代码如下:
new CsvSourceBatchOp() .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-small.csv") .setSchemaStr(SCHEMA_STRING) .link( new LogisticRegressionTrainBatchOp() .setFeatureCols(NUMERICAL_COL_NAMES) .setLabelCol(LABEL_COL_NAME) .setMaxIter(10) ) .link( new AkSinkBatchOp() .setFilePath(DATA_DIR + INIT_NUMERIC_LR_MODEL_FILE) ); BatchOperator.execute();
有了初始模型,下一步搭建一个精简的在线学习流程,如下面代码所示:
AkSourceBatchOp initModel = new AkSourceBatchOp() .setFilePath(DATA_DIR + INIT_NUMERIC_LR_MODEL_FILE); // prepare stream train data CsvSourceStreamOp data = new CsvSourceStreamOp() .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv") .setSchemaStr(SCHEMA_STRING) .setIgnoreFirstLine(true); // split stream to train and eval data SplitStreamOp spliter = new SplitStreamOp().setFraction(0.5).linkFrom(data); StreamOperator train_stream_data = spliter; StreamOperator test_stream_data = spliter.getSideOutput(0); // ftrl train FtrlTrainStreamOp model_stream = new FtrlTrainStreamOp(initModel) .setFeatureCols(NUMERICAL_COL_NAMES) .setLabelCol(LABEL_COL_NAME) .setTimeInterval(10) .linkFrom(train_stream_data); // ftrl predict FtrlPredictStreamOp predResult = new FtrlPredictStreamOp(initModel) .setPredictionCol(PREDICTION_COL_NAME) .setReservedCols(LABEL_COL_NAME) .setPredictionDetailCol(PRED_DETAIL_COL_NAME) .linkFrom(model_stream, test_stream_data); predResult .sample(0.0001) .select("'Pred Sample' AS out_type, *") .print(); // ftrl eval predResult .link( new EvalBinaryClassStreamOp() .setLabelCol(LABEL_COL_NAME) .setPredictionDetailCol(PRED_DETAIL_COL_NAME) .setTimeInterval(10) ) .link( new JsonValueStreamOp() .setSelectedCol("Data") .setReservedCols(new String[] {"Statistics"}) .setOutputCols(new String[] {"Accuracy", "AUC", "ConfusionMatrix"}) .setJsonPath(new String[] {"$.Accuracy", "$.AUC", "$.ConfusionMatrix"}) ) .select("'Eval Metric' AS out_type, *") .print(); StreamOperator.execute();
这里简单介绍一下如何输出流式结果,更详细的内容参考第14章的介绍。由于数据较多,将预测结果流 predResult接入流式采样组件,减少打印输出的条数, 并在每行预测数据前用 SQL 语句增加标识列,显示“Pred Sample”。 再将预测结果流 predResult 接入流式二分类评估组件 EvalBinaryClassStreamOp, 并设置相应的参数。由于每次评估结果是 Json 格式的,为了便于显示,还可以在后面上 Json 内容提取组件 JsonValueStreamOp,并增加标识列,显示相应行输出的内容为评估指标 Eval Metric。
输出了两种流式数据,一种是预测数据,各列的名称如下。
out_type|click|pred|pred_info --------|-----|----|---------
第 1 列为标识信息,第 2 列是原始的 click 信息,第 3 列为预测结果,第 4 列为预测的详细 信息。对应的预测结果形式如下。
Pred Sample|0|0|{"0":"0.9220358860557992","1":"0.07796411394420077"}
另一种是评估指标,各列的名称如下。
out_type|Statistics|Accuracy|AUC|ConfusionMatrix --------|----------|--------|---|---------------
其中,Statistics 列有两个值 all 和 window,all 表示从开始运行到现在的所有预测数据的评 估结果;window 表示时间窗口(当前设置为 10 秒)的所有预测数据的评估结果。
在上节代码基础上,将下面这段代码:
// ftrl predict FtrlPredictStreamOp predResult = new FtrlPredictStreamOp(initModel) .setPredictionCol(PREDICTION_COL_NAME) .setReservedCols(LABEL_COL_NAME) .setPredictionDetailCol(PRED_DETAIL_COL_NAME) .linkFrom(model_stream, test_stream_data);
改为:
model_stream.link( new ModelStreamFileSinkStreamOp() .setFilePath(DATA_DIR + FTRL_MODEL_STREAM_DIR) .setNumKeepModel(5) ); StreamOperator <?> new_model_stream = new ModelStreamFileSourceStreamOp() .setSchemaStr(TableUtil.schema2SchemaStr(initModel.getSchema())) .setFilePath(DATA_DIR + FTRL_MODEL_STREAM_DIR); // ftrl predict FtrlPredictStreamOp predResult = new FtrlPredictStreamOp(initModel) .setPredictionCol(PREDICTION_COL_NAME) .setReservedCols(LABEL_COL_NAME) .setPredictionDetailCol(PRED_DETAIL_COL_NAME) .linkFrom(new_model_stream, test_stream_data);
这里引入了两个新组件:
运行流式任务,可以通过打印输出内容了解预测结果于模型评估指标,与上节的输出内容相似。说明将模型流写到文件系统,再导入流式任务的方式没有影响原先的在线学习流程。
我们打开模型流所在的文件夹(路径为DATA_DIR + FTRL_MODEL_STREAM_DIR),显示如下图所示,每个模型都以时间进行命名,有个“conf”子文件夹,用来存放模型流的一些meta信息。
本节实验中,我们在文件系统中“看到”了模型流,模型流可以保存到文件系统,也可以从文件系统导入,这给了我们更大的想象空间:
我们想到的这些,现在都已在Alink上成为现实。在后面的章节中,我们将选择典型的场景进行介绍。