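In more complex scenarios, it is not only the classification or regression model that keeps being updated. The data-processing and feature-engineering stages that feed it also produce models that change with the data, so several models have to be updated at the same time. Treating the multiple PipelineStage objects as a single unit, a PipelineModel, solves this simultaneous-update problem. A PipelineModel is a "super" batch model that contains the latest parameters of all of its stages, and it can be put into a model stream like any other model. The latest PipelineModel obtained from the model stream can replace the old PipelineModel as a whole, without stopping the streaming job or service.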
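The idea of replacing all stages together while the service keeps running can be illustrated without Alink. The following is a minimal sketch in plain Java, not Alink's implementation: StagedModel, HotSwapDemo, and the toy stage functions are hypothetical names introduced only for this illustration. It assumes each stage is a simple numeric function and that the serving side reads the current model through a single atomic reference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.DoubleUnaryOperator;

/** Hypothetical stand-in for a multi-stage model; this is not an Alink class. */
final class StagedModel {
    private final List<DoubleUnaryOperator> stages;

    StagedModel(List<DoubleUnaryOperator> stages) {
        // Immutable snapshot: the stages of one model version never change.
        this.stages = Collections.unmodifiableList(new ArrayList<>(stages));
    }

    /** Applies every stage in order, as a pipeline would. */
    double predict(double x) {
        double value = x;
        for (DoubleUnaryOperator stage : stages) {
            value = stage.applyAsDouble(value);
        }
        return value;
    }
}

final class HotSwapDemo {
    // The serving side always reads the current model through this reference.
    static final AtomicReference<StagedModel> CURRENT = new AtomicReference<>();

    /** Toy "old" model: scaling with mean 10 and std 2, then a score with weight 0.5. */
    static StagedModel oldModel() {
        DoubleUnaryOperator scale = v -> (v - 10.0) / 2.0;
        DoubleUnaryOperator score = v -> 0.5 * v;
        return new StagedModel(Arrays.asList(scale, score));
    }

    /** Toy "new" model: both stages were retrained and must be used together. */
    static StagedModel newModel() {
        DoubleUnaryOperator scale = v -> (v - 12.0) / 4.0;
        DoubleUnaryOperator score = v -> 3.0 * v;
        return new StagedModel(Arrays.asList(scale, score));
    }

    static double serve(double x) {
        return CURRENT.get().predict(x);
    }

    public static void main(String[] args) {
        CURRENT.set(oldModel());
        System.out.println(serve(14.0)); // uses old scaling and old weights
        CURRENT.set(newModel());         // one atomic step swaps every stage
        System.out.println(serve(14.0)); // uses new scaling and new weights
    }
}
```

Because the reference is swapped in one step, a request never sees the new scaling combined with the old weights, which is the inconsistency that updating stages one at a time could cause.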

The Pipeline is defined with four stages: standardizing the numerical features (StandardScaler), one-hot encoding the categorical features (OneHotEncoder), assembling all features into a single vector (VectorAssembler), and training a logistic regression classifier (LogisticRegression).
The code is as follows:
Pipeline pipeline = new Pipeline()
    .add(
        new StandardScaler()
            .setSelectedCols(NUMERICAL_COL_NAMES)
    )
    .add(
        new OneHotEncoder()
            .setSelectedCols(CATEGORY_COL_NAMES)
            .setDropLast(false)
            .setEncode(Encode.ASSEMBLED_VECTOR)
            .setOutputCols(VEC_COL_NAME)
    )
    .add(
        new VectorAssembler()
            .setSelectedCols(ArrayUtils.add(NUMERICAL_COL_NAMES, VEC_COL_NAME))
            .setOutputCol(VEC_COL_NAME)
    )
    .add(
        new LogisticRegression()
            .setVectorCol(VEC_COL_NAME)
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .setReservedCols(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    );
The training set is read from a file (path: DATA_DIR + "avazu-small.csv"):
CsvSourceBatchOp train_set = new CsvSourceBatchOp()
    .setFilePath(DATA_DIR + "avazu-small.csv")
    .setSchemaStr(SCHEMA_STRING);
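This section trains a series of PipelineModels by controlling the size of the training set. First, 5% of the training data is used to obtain a PipelineModel that serves as the initial model for the later prediction steps. It is saved to a file (path: DATA_DIR + INIT_PIPELINE_MODEL_FILE). The code is as follows:
pipeline
    .fit(train_set.sample(0.05))
    .save(DATA_DIR + INIT_PIPELINE_MODEL_FILE, true);
BatchOperator.execute();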
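Next, PipelineModels are trained in a loop. In each iteration the sampling ratio of train_set increases by 5% to form the current training set. The pipeline is fitted on it to obtain the current PipelineModel, which is appended to the model stream through the AppendModelStreamFileSinkBatchOp component.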
for (int i = 2; i <= 20; i++) {
    // Train on a sample of 0.05 * i of the data and append the model to the model stream.
    pipeline
        .fit(train_set.sample(0.05 * i))
        .save()
        .link(
            new AppendModelStreamFileSinkBatchOp()
                .setFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
                .setNumKeepModel(19)
        );
    BatchOperator.execute();
    System.out.println("\nTrain " + (i - 1) + " PipelineModels.\n");
    Thread.sleep(2000);
}
Listing the model stream folder (path: DATA_DIR + PIPELINE_MODEL_STREAM_DIR) gives the result shown in the figure below.
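As a quick sanity check on the loop bounds, the sampling ratios can be enumerated in plain Java. This is only an arithmetic sketch; SamplingSchedule is a hypothetical helper name, not part of Alink. The loop runs for i = 2 to 20, so it appends 19 models, with ratios going from 10% up to 100%. That count equals the value passed to setNumKeepModel(19), so presumably none of the models from this run is dropped from the model stream folder.

```java
final class SamplingSchedule {
    /** Sampling ratios used by the training loop: 0.05 * i for i = first..last. */
    static double[] ratios(int first, int last) {
        double[] result = new double[last - first + 1];
        for (int i = first; i <= last; i++) {
            result[i - first] = 0.05 * i;
        }
        return result;
    }

    public static void main(String[] args) {
        double[] r = ratios(2, 20);
        System.out.println("models appended: " + r.length);
        System.out.println("first ratio: " + r[0]);
        System.out.println("last ratio: " + r[r.length - 1]);
    }
}
```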

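Starting from the usual streaming-prediction workflow with a PipelineModel, calling setModelStreamFilePath with the path of the model stream folder is all that is needed for the PipelineModel to be updated from the model stream. The code is as follows: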
StreamOperator<?> predResult = PipelineModel
    .load(DATA_DIR + INIT_PIPELINE_MODEL_FILE)
    .setModelStreamFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
    .transform(
        new CsvSourceStreamOp()
            .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv")
            .setSchemaStr(SCHEMA_STRING)
            .setIgnoreFirstLine(true)
    );
// Output 1: a small sample of the prediction results.
predResult
    .sample(0.0001)
    .select("'Pred Sample' AS out_type, *")
    .print();
// Output 2: streaming evaluation metrics, reported every 10 seconds.
predResult
    .link(
        new EvalBinaryClassStreamOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
            .setTimeInterval(10)
    )
    .link(
        new JsonValueStreamOp()
            .setSelectedCol("Data")
            .setReservedCols(new String[] {"Statistics"})
            .setOutputCols(new String[] {"Accuracy", "AUC", "ConfusionMatrix"})
            .setJsonPath("$.Accuracy", "$.AUC", "$.ConfusionMatrix")
    )
    .select("'Eval Metric' AS out_type, *")
    .print();
StreamOperator.execute();
The code above is the body of the function Chap29.c_4_2(). Run Chap29.c_4_2() together with Chap29.c_4_1(), the PipelineModel model stream generator introduced in the previous section, so that new PipelineModels are produced while the prediction model is continuously updated. The output of Chap29.c_4_2() is shown below. It contains two kinds of output: sampled prediction results and streaming evaluation results. To save space, most of the prediction output is omitted. The streaming evaluation metrics improve overall: the windowed AUC, for example, rises from about 0.667 in the first window to about 0.683 in the fourth.
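The "Eval Metric" rows in this section's output can be cross-checked with a little arithmetic. The sketch below is plain Java and is not how EvalBinaryClassStreamOp is implemented; EvalCheck is a hypothetical helper name. It uses the first two "window" confusion matrices from the run and relies on two observations that hold for these numbers: the accuracy equals the diagonal sum divided by the total count, and the "all" matrix is the element-wise sum of the "window" matrices seen so far.

```java
import java.util.Arrays;

final class EvalCheck {
    /** Accuracy = sum of the diagonal cells divided by the sum of all cells. */
    static double accuracy(long[][] m) {
        long diagonal = m[0][0] + m[1][1];
        long total = m[0][0] + m[0][1] + m[1][0] + m[1][1];
        return (double) diagonal / total;
    }

    /** Element-wise sum of two 2x2 confusion matrices. */
    static long[][] add(long[][] a, long[][] b) {
        long[][] sum = new long[2][2];
        for (int r = 0; r < 2; r++) {
            for (int c = 0; c < 2; c++) {
                sum[r][c] = a[r][c] + b[r][c];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        // Confusion matrices of the first two "window" rows in the run output.
        long[][] window1 = {{3435, 5052}, {28118, 158203}};
        long[][] window2 = {{3868, 5052}, {37263, 202020}};
        long[][] all2 = add(window1, window2);

        System.out.println("window 1 accuracy: " + accuracy(window1));
        System.out.println("cumulative matrix: " + Arrays.deepToString(all2));
        System.out.println("cumulative accuracy: " + accuracy(all2));
    }
}
```

The cumulative matrix computed this way is [[7303, 10104], [65381, 360223]], matching the second "all" row of the output, and both accuracies agree with the reported values to many decimal places. This also explains why the "all" metrics move more slowly than the "window" metrics: each new window is averaged into an ever larger total.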
out_type|click|pred|pred_info
--------|-----|----|---------
out_type|Statistics|Accuracy|AUC|ConfusionMatrix
--------|----------|--------|---|---------------
Pred Sample|0|0|{"0":"0.8824338740627036","1":"0.11756612593729643"}
......
Pred Sample|0|0|{"0":"0.9444219542671783","1":"0.05557804573282166"}
Eval Metric|all|0.8297297852244261|0.6668952993333711|[[3435,5052],[28118,158203]]
Eval Metric|window|0.8297297852244261|0.6668952993333711|[[3435,5052],[28118,158203]]
Pred Sample|0|0|{"0":"0.9398133307532698","1":"0.06018666924673022"}
......
Pred Sample|0|0|{"0":"0.8557456903577869","1":"0.14425430964221309"}
Eval Metric|window|0.8295145505896383|0.6684136785227608|[[3868,5052],[37263,202020]]
Eval Metric|all|0.8296091970628269|0.6676815710617451|[[7303,10104],[65381,360223]]
Pred Sample|0|0|{"0":"0.9778352680682564","1":"0.02216473193174362"}
......
Pred Sample|0|0|{"0":"0.7777008376593497","1":"0.22229916234065028"}
Eval Metric|window|0.8339849908557735|0.6724791468610775|[[3245,3411],[38709,208347]]
Eval Metric|all|0.8312026443794737|0.6690371151657384|[[10548,13515],[104090,568570]]
Pred Sample|0|0|{"0":"0.878261454337365","1":"0.12173854566263498"}
......
Pred Sample|0|0|{"0":"0.9112290282109129","1":"0.0887709717890871"}
Eval Metric|all|0.8322811498899614|0.6725518118953981|[[14109,17319],[141577,774390]]
Eval Metric|window|0.835278770664454|0.6831684163174138|[[3561,3804],[37487,205820]]
Pred Sample|0|0|{"0":"0.7908054547875323","1":"0.20919454521246772"}
......
Pred Sample|0|0|{"0":"0.9038244759100549","1":"0.09617552408994512"}
Eval Metric|window|0.8333604847521605|0.6792313027625998|[[3957,4390],[37549,205779]]
Eval Metric|all|0.8325076934624334|0.6738985382938575|[[18066,21709],[179126,980169]]
Pred Sample|1|0|{"0":"0.9126242140286693","1":"0.08737578597133067"}
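Pred Sample|0|0|{"0":"0.8954749661946126","1":"0.10452503380538736"}

LocalPredictor itself provides no method for attaching a model stream. The setting has to be made on the PipelineModel that it loads. As the code below shows, the initial model is loaded (model file path: DATA_DIR + INIT_PIPELINE_MODEL_FILE) to obtain a PipelineModel, and setModelStreamFilePath then sets the path of the folder that holds the model stream. The PipelineModel, now carrying the model stream setting, is saved as a model file (path: DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE).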
PipelineModel
    .load(DATA_DIR + INIT_PIPELINE_MODEL_FILE)
    .setModelStreamFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
    .save(DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE, true);
BatchOperator.execute();
Passing the path of this model file (DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE) to the LocalPredictor constructor yields a LocalPredictor that is attached to the PipelineModel model stream. The code is as follows:
LocalPredictor localPredictor
    = new LocalPredictor(DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE, SCHEMA_STRING);
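We take a single input record and predict it repeatedly, once every 2 seconds. After the loop finishes, the close method of localPredictor is called to release resources and stop monitoring the model stream folder.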
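Object[] input = new Object[] {
    "10000949271186029916", "1", "14102100", "1005", 0, "1fbe01fe", "f3845767", "28905ebd",
    "ecad2386", "7801e8d9", "07d7df22", "a99f214a", "37e8da74", "5db079b5", "1", "2",
    15707, 320, 50, 1722, 0, 35, -1, 79};
// Predict the same record 100 times, 2 seconds apart.
for (int i = 1; i <= 100; i++) {
    System.out.print(i + "\t");
    System.out.println(ArrayUtils.toString(localPredictor.predict(input)));
    Thread.sleep(2000);
}
localPredictor.close();
The code above is the body of the function Chap29.c_4_3(). Run Chap29.c_4_3() together with Chap29.c_4_1(), the PipelineModel model stream generator introduced in Section 29.4.1, so that new PipelineModels are produced while the prediction model of localPredictor is continuously updated. Part of the output of Chap29.c_4_3() is shown below. Because the same record is predicted every time, the result stays the same as long as the model does not change. The output shows the prediction changing (at iterations 22, 27, and 36 in this excerpt), which indicates that localPredictor is updating its prediction model from the PipelineModel model stream.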
18 1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
19 1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
20 1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
21 1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
22 1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
23 1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
24 1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
25 1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
26 1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
27 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
28 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
29 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
30 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
31 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
32 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
33 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
34 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
35 1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
36 1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
37 1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
38 1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
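Rather than spotting the model switches by eye, each prediction can be compared with the previous one. The following is a minimal sketch in plain Java, independent of Alink. ModelSwitchDetector and its methods are hypothetical names, and the input is the probability of class "0" copied from iterations 18 to 38 of the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

final class ModelSwitchDetector {
    /** Returns the iteration numbers at which the output differs from the previous one. */
    static List<Integer> switchPoints(int firstIteration, List<String> outputs) {
        List<Integer> points = new ArrayList<>();
        for (int k = 1; k < outputs.size(); k++) {
            if (!outputs.get(k).equals(outputs.get(k - 1))) {
                points.add(firstIteration + k);
            }
        }
        return points;
    }

    /** Probability of class "0" for iterations 18..38, copied from the excerpt. */
    static List<String> excerpt() {
        List<String> outputs = new ArrayList<>();
        addRepeated(outputs, "0.7017719823271469", 4); // iterations 18..21
        addRepeated(outputs, "0.7775813140410618", 5); // iterations 22..26
        addRepeated(outputs, "0.7438455030430449", 9); // iterations 27..35
        addRepeated(outputs, "0.7503320064425485", 3); // iterations 36..38
        return outputs;
    }

    private static void addRepeated(List<String> target, String value, int times) {
        for (int i = 0; i < times; i++) {
            target.add(value);
        }
    }

    public static void main(String[] args) {
        System.out.println(switchPoints(18, excerpt())); // prints [22, 27, 36]
    }
}
```

In a real run the same comparison could be applied to the string returned by each predict call. The gaps between switch points vary because they depend on how long each training round of the model stream generator takes.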