Alink教程(Java版)

第29.1节 “看到”模型流


在第14章(在线学习)的示例中,其实就已经在使用模型流。如下图所示, FtrlTrain(FTRL 在线训练组件)和 FtrlPredict(FTRL 流式预测组件)之间通过模型数据流连接,即 FtrlTrain 不断产生新的模型,流式地传给 FtrlPredict 组件,每当 FtrlPredict 组件接收到一个完整的模型时,便会替换其旧的模型,切换成新的模型。


接下来,我们先使用简单示例,复现在线学习的过程,再尝试去“看到”模型流。

29.1.1 一个简单的在线学习

在第14章的示例上进行精简,只考虑数值型特征,这样可以省略特征工程的部分,直接进入在线学习的训练和预测。关于数据和流程方面的介绍,请读者阅读第14章内容,这里不再赘述。

首先,需要离线训练一个LR模型,作为后续在线学习的训练和预测的初始模型,具体代码如下:

new CsvSourceBatchOp()
	.setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-small.csv")
	.setSchemaStr(SCHEMA_STRING)
	.link(
		new LogisticRegressionTrainBatchOp()
			.setFeatureCols(NUMERICAL_COL_NAMES)
			.setLabelCol(LABEL_COL_NAME)
			.setMaxIter(10)
	)
	.link(
		new AkSinkBatchOp()
			.setFilePath(DATA_DIR + INIT_NUMERIC_LR_MODEL_FILE)
	);
BatchOperator.execute();


有了初始模型,下一步搭建一个精简的在线学习流程,如下面代码所示:

AkSourceBatchOp initModel = new AkSourceBatchOp()
	.setFilePath(DATA_DIR + INIT_NUMERIC_LR_MODEL_FILE);

// prepare stream train data
CsvSourceStreamOp data = new CsvSourceStreamOp()
	.setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv")
	.setSchemaStr(SCHEMA_STRING)
	.setIgnoreFirstLine(true);

// split stream to train and eval data
SplitStreamOp spliter = new SplitStreamOp().setFraction(0.5).linkFrom(data);
StreamOperator train_stream_data = spliter;
StreamOperator test_stream_data = spliter.getSideOutput(0);

// ftrl train
FtrlTrainStreamOp model_stream = new FtrlTrainStreamOp(initModel)
	.setFeatureCols(NUMERICAL_COL_NAMES)
	.setLabelCol(LABEL_COL_NAME)
	.setTimeInterval(10)
	.linkFrom(train_stream_data);

// ftrl predict
FtrlPredictStreamOp predResult = new FtrlPredictStreamOp(initModel)
	.setPredictionCol(PREDICTION_COL_NAME)
	.setReservedCols(LABEL_COL_NAME)
	.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
	.linkFrom(model_stream, test_stream_data);

predResult
	.sample(0.0001)
	.select("'Pred Sample' AS out_type, *")
	.print();

// ftrl eval
predResult
	.link(
		new EvalBinaryClassStreamOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
			.setTimeInterval(10)
	)
	.link(
		new JsonValueStreamOp()
			.setSelectedCol("Data")
			.setReservedCols(new String[] {"Statistics"})
			.setOutputCols(new String[] {"Accuracy", "AUC", "ConfusionMatrix"})
			.setJsonPath(new String[] {"$.Accuracy", "$.AUC", "$.ConfusionMatrix"})
	)
	.select("'Eval Metric' AS out_type, *")
	.print();

StreamOperator.execute();

这里简单介绍一下如何输出流式结果,更详细的内容参考第14章的介绍。由于数据较多,将预测结果流 predResult接入流式采样组件,减少打印输出的条数, 并在每行预测数据前用 SQL 语句增加标识列,显示“Pred Sample”。 再将预测结果流 predResult 接入流式二分类评估组件 EvalBinaryClassStreamOp, 并设置相应的参数。由于每次评估结果是 Json 格式的,为了便于显示,还可以在后面上 Json 内容提取组件 JsonValueStreamOp,并增加标识列,显示相应行输出的内容为评估指标 Eval Metric。

输出了两种流式数据,一种是预测数据,各列的名称如下。

out_type|click|pred|pred_info
--------|-----|----|---------

第 1 列为标识信息,第 2 列是原始的 click 信息,第 3 列为预测结果,第 4 列为预测的详细 信息。对应的预测结果形式如下。

Pred Sample|0|0|{"0":"0.9220358860557992","1":"0.07796411394420077"} 

另一种是评估指标,各列的名称如下。

 out_type|Statistics|Accuracy|AUC|ConfusionMatrix 
 --------|----------|--------|---|---------------

其中,Statistics 列有两个值 all windowall 表示从开始运行到现在的所有预测数据的评 估结果;window 表示时间窗口(当前设置为 10 秒)的所有预测数据的评估结果。



29.1.2 模型流“现身”

在上节代码基础上,将下面这段代码:

// ftrl predict
FtrlPredictStreamOp predResult = new FtrlPredictStreamOp(initModel)
	.setPredictionCol(PREDICTION_COL_NAME)
	.setReservedCols(LABEL_COL_NAME)
	.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
	.linkFrom(model_stream, test_stream_data);


改为:

model_stream.link(
	new ModelStreamFileSinkStreamOp()
		.setFilePath(DATA_DIR + FTRL_MODEL_STREAM_DIR)
		.setNumKeepModel(5)
);

StreamOperator <?> new_model_stream = new ModelStreamFileSourceStreamOp()
	.setSchemaStr(TableUtil.schema2SchemaStr(initModel.getSchema()))
	.setFilePath(DATA_DIR + FTRL_MODEL_STREAM_DIR);

// ftrl predict
FtrlPredictStreamOp predResult = new FtrlPredictStreamOp(initModel)
	.setPredictionCol(PREDICTION_COL_NAME)
	.setReservedCols(LABEL_COL_NAME)
	.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
	.linkFrom(new_model_stream, test_stream_data);

这里引入了两个新组件:

  • ModelStreamFileSinkStreamOp组件将model_stream写出到文件夹(路径为DATA_DIR + FTRL_MODEL_STREAM_DIR),为防止文件夹内保存太多的模型,占用大量空间,这里设置保留最近的5个模型。
  • ModelStreamFileSourceStreamOp组件负责从文件夹(路径为DATA_DIR + FTRL_MODEL_STREAM_DIR,即上个组件写出的路径)中读取新的模型流new_model_stream,然后,将new_model_stream提供给FTRL预测组件。


运行流式任务,可以通过打印输出内容了解预测结果于模型评估指标,与上节的输出内容相似。说明将模型流写到文件系统,再导入流式任务的方式没有影响原先的在线学习流程。

我们打开模型流所在的文件夹(路径为DATA_DIR + FTRL_MODEL_STREAM_DIR),显示如下图所示,每个模型都以时间进行命名,有个“conf”子文件夹,用来存放模型流的一些meta信息。

本节实验中,我们在文件系统中“看到”了模型流,模型流可以保存到文件系统,也可以从文件系统导入,这给了我们更大的想象空间:

  • 模型训练和基于模型流动态更新模型的预测过程可以分离。
  • 在线学习可以产生模型流,批式任务产生的模型也可以加入模型流
  • 增量训练得到批式模型,多个增量训练可以产生一系列模型,加入到模型流
  • 流式组件可以消费模型流,PipelineModel中的PipelineStage也可以在transform流式数据时消费模型流,
  • PipelineModel是个“超级”批式模型,包含了其中多个Stage的最新参数,也可以放入到模型流中
  • 从模型流中得到的最新PipelineModel,可以在流式任务或服务不停的前提下,整体替换旧的PipelineModel
  • LocalPredictor在执行嵌入式预测服务时,也可以从文件系统中读取模型流,动态更新模型
  • 刚训练出的模型流,可以通过流式或批式的方式导入,进行评估、筛选,选中的模型进入正式发布的模型流


我们想到的这些,现在都已在Alink上成为现实。在后面的章节中,我们将选择典型的场景进行介绍。