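In some complex scenarios, the classification/regression model is not the only thing that keeps changing: the data-processing and feature-engineering stages also produce models that change with the data, so several models must be updated at the same time. Treating the multiple PipelineStages as one unit, the PipelineModel, solves this problem of updating them together. A PipelineModel is a "super" batch model that contains the latest parameters of all its stages, and it can be placed into a model stream like any other model. The latest PipelineModel taken from the model stream can then replace the old PipelineModel as a whole, without stopping the streaming job or the service.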
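The important property is that all stage parameters are replaced together, so a request is never served by, for example, an old scaler combined with a new classifier. The following plain-Java sketch illustrates that idea without using Alink; the class CompositeModelSwap, its PipelineSnapshot record, and the toy stages scale and weight are hypothetical illustrations, not Alink APIs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical sketch: a "pipeline model" is an immutable list of stage functions,
// and replacing it swaps every stage at once.
public class CompositeModelSwap {

    // An immutable snapshot of all stage parameters, applied in order.
    record PipelineSnapshot(String version, List<UnaryOperator<double[]>> stages) {
        double[] apply(double[] row) {
            double[] out = row;
            for (UnaryOperator<double[]> stage : stages) {
                out = stage.apply(out);
            }
            return out;
        }
    }

    // Readers use whatever snapshot is current; a writer replaces it atomically.
    private final AtomicReference<PipelineSnapshot> current;

    CompositeModelSwap(PipelineSnapshot initial) {
        this.current = new AtomicReference<>(initial);
    }

    double[] predict(double[] row) {
        // A single read of the reference: every stage in this call comes from one snapshot.
        return current.get().apply(row);
    }

    void replace(PipelineSnapshot next) {
        current.set(next);
    }

    String version() {
        return current.get().version();
    }

    // Toy "feature engineering" stage: standardize the first value.
    static UnaryOperator<double[]> scale(double mean, double std) {
        return r -> {
            double[] o = r.clone();
            o[0] = (o[0] - mean) / std;
            return o;
        };
    }

    // Toy "model" stage: multiply the first value by a weight.
    static UnaryOperator<double[]> weight(double w) {
        return r -> new double[] {r[0] * w};
    }

    public static void main(String[] args) {
        CompositeModelSwap server = new CompositeModelSwap(
            new PipelineSnapshot("v1", List.of(scale(10.0, 2.0), weight(1.0))));
        System.out.println(server.predict(new double[] {14.0})[0]); // 2.0 = (14 - 10) / 2 * 1
        server.replace(new PipelineSnapshot("v2", List.of(scale(12.0, 1.0), weight(0.5))));
        System.out.println(server.predict(new double[] {14.0})[0]); // 1.0 = (14 - 12) / 1 * 0.5
    }
}
```

Because the scaler and the weight live in the same immutable snapshot, there is no moment at which one stage is new and the other is old.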
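We define a Pipeline with four stages:
1. StandardScaler standardizes the numerical columns (NUMERICAL_COL_NAMES).
2. OneHotEncoder one-hot encodes the categorical columns (CATEGORY_COL_NAMES) into a single vector column, VEC_COL_NAME.
3. VectorAssembler combines the numerical columns and that vector into one feature vector, also named VEC_COL_NAME.
4. LogisticRegression is trained on this feature vector to predict LABEL_COL_NAME.
The code is as follows:
Pipeline pipeline = new Pipeline()
    // Stage 1: standardize the numerical columns
    .add(
        new StandardScaler()
            .setSelectedCols(NUMERICAL_COL_NAMES)
    )
    // Stage 2: one-hot encode all categorical columns into one vector column
    .add(
        new OneHotEncoder()
            .setSelectedCols(CATEGORY_COL_NAMES)
            .setDropLast(false)
            .setEncode(Encode.ASSEMBLED_VECTOR)
            .setOutputCols(VEC_COL_NAME)
    )
    // Stage 3: merge the numerical columns and the one-hot vector into one feature vector
    .add(
        new VectorAssembler()
            .setSelectedCols(ArrayUtils.add(NUMERICAL_COL_NAMES, VEC_COL_NAME))
            .setOutputCol(VEC_COL_NAME)
    )
    // Stage 4: logistic regression on the assembled feature vector
    .add(
        new LogisticRegression()
            .setVectorCol(VEC_COL_NAME)
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .setReservedCols(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    );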
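To make the four stages concrete, the following plain-Java sketch computes what they do to a single row, using made-up statistics, vocabulary, and weights. The helper names (standardize, oneHot, assemble, predictProba) and all numbers are hypothetical; Alink's actual implementations, including how they treat unseen categories, may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the arithmetic behind the four pipeline stages for one row.
public class FourStageSketch {

    // Stage 1 (StandardScaler-like): z = (x - mean) / std for each numerical column.
    static double[] standardize(double[] x, double[] mean, double[] std) {
        double[] z = new double[x.length];
        for (int i = 0; i < x.length; i++) {
            z[i] = (x[i] - mean[i]) / std[i];
        }
        return z;
    }

    // Stage 2 (OneHotEncoder-like, dropLast = false): each categorical column becomes a
    // block containing a single 1.0, and the blocks are concatenated into one vector.
    static double[] oneHot(String[] values, List<List<String>> vocab) {
        int size = vocab.stream().mapToInt(List::size).sum();
        double[] v = new double[size];
        int offset = 0;
        for (int c = 0; c < values.length; c++) {
            int idx = vocab.get(c).indexOf(values[c]);
            if (idx >= 0) {
                v[offset + idx] = 1.0; // in this sketch an unseen value leaves its block all zeros
            }
            offset += vocab.get(c).size();
        }
        return v;
    }

    // Stage 3 (VectorAssembler-like): concatenate numerical features and the one-hot vector.
    static double[] assemble(double[] a, double[] b) {
        double[] out = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }

    // Stage 4 (LogisticRegression-like): P(label = 1) = sigmoid(w . x + bias).
    static double predictProba(double[] x, double[] w, double bias) {
        double s = bias;
        for (int i = 0; i < x.length; i++) {
            s += w[i] * x[i];
        }
        return 1.0 / (1.0 + Math.exp(-s));
    }

    public static void main(String[] args) {
        double[] z = standardize(new double[] {320, 50}, new double[] {320, 60}, new double[] {10, 10});
        double[] h = oneHot(new String[] {"1005"}, List.of(List.of("1001", "1005", "1010")));
        double[] x = assemble(z, h);
        System.out.println(Arrays.toString(x)); // [0.0, -1.0, 0.0, 1.0, 0.0]
        System.out.println(predictProba(x, new double[] {0.1, 0.2, 0.0, 0.0, 0.0}, 0.2)); // 0.5
    }
}
```

Stages 1 to 3 each carry parameters learned from the data (means and deviations, category vocabularies), which is why they must be refreshed together with the logistic-regression weights.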
The training set comes from a file (path DATA_DIR + "avazu-small.csv"):
CsvSourceBatchOp train_set = new CsvSourceBatchOp()
    .setFilePath(DATA_DIR + "avazu-small.csv")
    .setSchemaStr(SCHEMA_STRING);
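In this section we train a series of PipelineModels by controlling the size of the training set. First, we train on 5% of the training data to obtain a PipelineModel that serves as the initial model for the later prediction steps, and save it to a file (path DATA_DIR + INIT_PIPELINE_MODEL_FILE). The code is as follows:
pipeline
    .fit(train_set.sample(0.05))
    .save(DATA_DIR + INIT_PIPELINE_MODEL_FILE, true); // true: overwrite an existing file
BatchOperator.execute();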
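Next, PipelineModels are trained in a loop. In each iteration, the sampling rate of train_set grows by 5 percentage points (from 10% up to 100%) to form the current training set; the pipeline is fit on it to obtain the current PipelineModel, which is then appended to the model stream through the AppendModelStreamFileSinkBatchOp component.
for (int i = 2; i <= 20; i++) {
    pipeline
        .fit(train_set.sample(0.05 * i))   // sampling rate 0.10, 0.15, ..., 1.00
        .save()                            // export the PipelineModel as a BatchOperator
        .link(
            new AppendModelStreamFileSinkBatchOp()
                .setFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
                .setNumKeepModel(19)       // number of models to keep in the stream folder
        );
    BatchOperator.execute();
    System.out.println("\nTrain " + (i - 1) + " PipelineModels.\n");
    Thread.sleep(2000);
}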
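The loop above trains 19 models on 10%, 15%, ..., 100% of the data, and setNumKeepModel(19) suggests that the stream folder keeps at most 19 models. The plain-Java sketch below simulates that schedule with a keep-latest-N store. BoundedModelStore is a hypothetical stand-in, and the assumption that the oldest model is evicted first is ours; the source does not spell out the eviction order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch of the training schedule and a keep-latest-N retention policy.
public class ModelStreamSchedule {

    // Appending beyond numKeep evicts the oldest entry (an assumption, see the lead-in).
    static final class BoundedModelStore {
        private final int numKeep;
        private final Deque<String> models = new ArrayDeque<>();

        BoundedModelStore(int numKeep) {
            this.numKeep = numKeep;
        }

        void append(String model) {
            models.addLast(model);
            while (models.size() > numKeep) {
                models.removeFirst();
            }
        }

        List<String> snapshot() {
            return new ArrayList<>(models);
        }
    }

    // Sampling fractions used by the loop: 0.05 * i for i = 2..20.
    static List<Double> samplingFractions() {
        List<Double> fractions = new ArrayList<>();
        for (int i = 2; i <= 20; i++) {
            fractions.add(0.05 * i);
        }
        return fractions;
    }

    public static void main(String[] args) {
        BoundedModelStore store = new BoundedModelStore(19);
        List<Double> fractions = samplingFractions();
        for (int k = 0; k < fractions.size(); k++) {
            store.append(String.format(Locale.ROOT, "model-%02d(sample=%.2f)", k + 1, fractions.get(k)));
        }
        System.out.println(fractions.size());         // 19
        System.out.println(store.snapshot().size());  // 19: nothing evicted yet
        System.out.println(store.snapshot().get(18)); // model-19(sample=1.00)
    }
}
```

With numKeep equal to the number of training rounds, every model produced by this loop would stay in the folder.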
The contents of the model-stream folder (path DATA_DIR + PIPELINE_MODEL_STREAM_DIR) are shown in the figure below.
Starting from the usual streaming-prediction workflow with a PipelineModel, we only need to call setModelStreamFilePath to set the path of the model-stream folder, and the PipelineModel is then updated from the model stream. The code is as follows:
StreamOperator<?> predResult = PipelineModel
    .load(DATA_DIR + INIT_PIPELINE_MODEL_FILE)
    .setModelStreamFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
    .transform(
        new CsvSourceStreamOp()
            .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv")
            .setSchemaStr(SCHEMA_STRING)
            .setIgnoreFirstLine(true)
    );

// Print a small sample (0.01%) of the prediction results
predResult
    .sample(0.0001)
    .select("'Pred Sample' AS out_type, *")
    .print();

// Evaluate the predictions in 10-second windows and extract the main metrics
predResult
    .link(
        new EvalBinaryClassStreamOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
            .setTimeInterval(10)
    )
    .link(
        new JsonValueStreamOp()
            .setSelectedCol("Data")
            .setReservedCols(new String[] {"Statistics"})
            .setOutputCols(new String[] {"Accuracy", "AUC", "ConfusionMatrix"})
            .setJsonPath("$.Accuracy", "$.AUC", "$.ConfusionMatrix")
    )
    .select("'Eval Metric' AS out_type, *")
    .print();

StreamOperator.execute();
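The code above is the body of the function Chap29.c_4_2(). Run Chap29.c_4_2() at the same time as the PipelineModel model-stream generator Chap29.c_4_1() introduced in the previous section, so that new PipelineModels keep being produced while the prediction model keeps being updated. The output of Chap29.c_4_2() is shown below. It contains two kinds of output: sampled prediction results (out_type "Pred Sample") and streaming evaluation results (out_type "Eval Metric"), where Statistics = window refers to the latest time window and Statistics = all to all data received so far. To save space, most of the prediction output is omitted. The streaming evaluation metrics improve over time: for example, the cumulative AUC (the "all" rows) rises from about 0.667 to about 0.674.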
out_type|click|pred|pred_info
--------|-----|----|---------
out_type|Statistics|Accuracy|AUC|ConfusionMatrix
--------|----------|--------|---|---------------
Pred Sample|0|0|{"0":"0.8824338740627036","1":"0.11756612593729643"}
......
Pred Sample|0|0|{"0":"0.9444219542671783","1":"0.05557804573282166"}
Eval Metric|all|0.8297297852244261|0.6668952993333711|[[3435,5052],[28118,158203]]
Eval Metric|window|0.8297297852244261|0.6668952993333711|[[3435,5052],[28118,158203]]
Pred Sample|0|0|{"0":"0.9398133307532698","1":"0.06018666924673022"}
......
Pred Sample|0|0|{"0":"0.8557456903577869","1":"0.14425430964221309"}
Eval Metric|window|0.8295145505896383|0.6684136785227608|[[3868,5052],[37263,202020]]
Eval Metric|all|0.8296091970628269|0.6676815710617451|[[7303,10104],[65381,360223]]
Pred Sample|0|0|{"0":"0.9778352680682564","1":"0.02216473193174362"}
......
Pred Sample|0|0|{"0":"0.7777008376593497","1":"0.22229916234065028"}
Eval Metric|window|0.8339849908557735|0.6724791468610775|[[3245,3411],[38709,208347]]
Eval Metric|all|0.8312026443794737|0.6690371151657384|[[10548,13515],[104090,568570]]
Pred Sample|0|0|{"0":"0.878261454337365","1":"0.12173854566263498"}
......
Pred Sample|0|0|{"0":"0.9112290282109129","1":"0.0887709717890871"}
Eval Metric|all|0.8322811498899614|0.6725518118953981|[[14109,17319],[141577,774390]]
Eval Metric|window|0.835278770664454|0.6831684163174138|[[3561,3804],[37487,205820]]
Pred Sample|0|0|{"0":"0.7908054547875323","1":"0.20919454521246772"}
......
Pred Sample|0|0|{"0":"0.9038244759100549","1":"0.09617552408994512"}
Eval Metric|window|0.8333604847521605|0.6792313027625998|[[3957,4390],[37549,205779]]
Eval Metric|all|0.8325076934624334|0.6738985382938575|[[18066,21709],[179126,980169]]
Pred Sample|1|0|{"0":"0.9126242140286693","1":"0.08737578597133067"}
Pred Sample|0|0|{"0":"0.8954749661946126","1":"0.10452503380538736"}
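The printed numbers can be checked by hand: Accuracy is the share of the confusion matrix on its diagonal, and each "all" matrix is the element-wise sum of the "window" matrices printed so far (for example, [[3435,5052],[28118,158203]] + [[3868,5052],[37263,202020]] = [[7303,10104],[65381,360223]]). The small Java program below, StreamEvalCheck (a name chosen for this example), reproduces that arithmetic from the printed values only; it does not show how EvalBinaryClassStreamOp computes AUC.

```java
import java.util.Arrays;

// Worked check of the streaming evaluation output, using only the printed numbers.
public class StreamEvalCheck {

    // Accuracy = diagonal (correct predictions) / total. The result does not depend on
    // whether rows or columns hold the true labels, as long as both use the same order.
    static double accuracy(long[][] cm) {
        long correct = 0;
        long total = 0;
        for (int i = 0; i < cm.length; i++) {
            for (int j = 0; j < cm[i].length; j++) {
                total += cm[i][j];
                if (i == j) {
                    correct += cm[i][j];
                }
            }
        }
        return (double) correct / total;
    }

    // The cumulative ("all") matrix is the element-wise sum of the window matrices.
    static long[][] add(long[][] a, long[][] b) {
        long[][] s = new long[a.length][a[0].length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < a[i].length; j++) {
                s[i][j] = a[i][j] + b[i][j];
            }
        }
        return s;
    }

    public static void main(String[] args) {
        long[][] w1 = {{3435, 5052}, {28118, 158203}};
        long[][] w2 = {{3868, 5052}, {37263, 202020}};
        long[][] all2 = add(w1, w2);
        System.out.println(Arrays.deepToString(all2)); // [[7303, 10104], [65381, 360223]]
        System.out.println(accuracy(w1));   // matches the first printed Accuracy, about 0.8297
        System.out.println(accuracy(all2)); // matches the second "all" Accuracy, about 0.8296
    }
}
```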
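LocalPredictor itself provides no method for connecting to a model stream; the setting has to be made on the PipelineModel it loads. As the code below shows, we load the initial model (model file path DATA_DIR + INIT_PIPELINE_MODEL_FILE) to obtain a PipelineModel, call setModelStreamFilePath to set the folder path of the model stream, and then save this PipelineModel, which now carries the model-stream setting, to a model file (path DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE).
PipelineModel
    .load(DATA_DIR + INIT_PIPELINE_MODEL_FILE)
    .setModelStreamFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
    .save(DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE, true);
BatchOperator.execute();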
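The saved model file only records where the model stream lives; at run time something has to notice when a newer model appears in that folder. The plain-Java sketch below illustrates one way to do that by polling a directory and reporting an entry only when it is newer than the last one seen. ModelStreamPoller is hypothetical, and its assumption that entry names sort in creation order is ours; the source does not describe how Alink detects new models.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.stream.Stream;

// Hypothetical sketch: poll a model-stream folder and report a model only when a newer one appears.
public class ModelStreamPoller {
    private final Path dir;
    private String lastSeen = null; // name of the newest entry already reported

    ModelStreamPoller(Path dir) {
        this.dir = dir;
    }

    // Returns the newest entry if it is newer than the last one reported, else empty.
    // Assumption: entry names sort in creation order (e.g. zero-padded counters).
    Optional<Path> pollNewer() throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            Optional<Path> newest = entries.max(Comparator.comparing((Path p) -> p.getFileName().toString()));
            if (newest.isPresent()) {
                String name = newest.get().getFileName().toString();
                if (lastSeen == null || name.compareTo(lastSeen) > 0) {
                    lastSeen = name;
                    return newest;
                }
            }
            return Optional.empty();
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("model-stream");
        ModelStreamPoller poller = new ModelStreamPoller(dir);
        System.out.println(poller.pollNewer().isPresent()); // false: folder is empty
        Files.createFile(dir.resolve("model-0001"));
        System.out.println(poller.pollNewer().map(p -> p.getFileName().toString()).orElse("none")); // model-0001
        System.out.println(poller.pollNewer().isPresent()); // false: nothing newer yet
    }
}
```

A real watcher would call pollNewer periodically and load the returned model; this sketch only covers the detection step.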
Passing the path of this PipelineModel file with the model-stream setting (DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE) to the LocalPredictor constructor yields a LocalPredictor connected to the PipelineModel model stream. The code is as follows:
LocalPredictor localPredictor =
    new LocalPredictor(DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE, SCHEMA_STRING);
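Conceptually, a predictor connected to a model stream needs a background task that swaps in newer models while predict keeps serving requests, which is also why such a predictor has to be closed when it is no longer needed. The plain-Java sketch below shows that pattern with a ScheduledExecutorService and an AtomicReference. RefreshingPredictor and its latestModel supplier are hypothetical; the source does not describe how LocalPredictor is implemented internally.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: a predictor whose model is refreshed by a background task,
// and whose close() stops that task.
public class RefreshingPredictor<I, O> implements AutoCloseable {
    private final AtomicReference<Function<I, O>> model;
    private final ScheduledExecutorService watcher =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "model-stream-watcher");
            t.setDaemon(true); // do not keep the JVM alive just for the watcher
            return t;
        });

    // latestModel returns a newer model if one is available, otherwise null.
    RefreshingPredictor(Function<I, O> initial, Supplier<Function<I, O>> latestModel, long periodMillis) {
        this.model = new AtomicReference<>(initial);
        watcher.scheduleWithFixedDelay(() -> {
            Function<I, O> next = latestModel.get();
            if (next != null) {
                model.set(next); // swap atomically; in-flight predictions keep the old model
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    O predict(I input) {
        return model.get().apply(input);
    }

    // Stops watching for new models, analogous in spirit to localPredictor.close().
    @Override
    public void close() {
        watcher.shutdownNow();
    }

    boolean isClosed() {
        return watcher.isShutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Function<Integer, String>> published = new AtomicReference<>();
        try (RefreshingPredictor<Integer, String> p =
                 new RefreshingPredictor<>(x -> "v1:" + x, () -> published.getAndSet(null), 50)) {
            System.out.println(p.predict(7)); // v1:7
            published.set(x -> "v2:" + x);    // a newer model is "published"
            for (int tries = 0; tries < 100 && !p.predict(7).startsWith("v2"); tries++) {
                Thread.sleep(20);             // wait for the watcher to pick it up
            }
            System.out.println(p.predict(7)); // v2:7 once the watcher has run
        }
    }
}
```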
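We take a single input record and predict it repeatedly in a loop, once every 2 seconds. When the loop ends, we call the close method of localPredictor to release its resources and stop monitoring the model-stream folder.
// One input record; its fields follow SCHEMA_STRING
Object[] input = new Object[] {
    "10000949271186029916", "1", "14102100", "1005", 0, "1fbe01fe",
    "f3845767", "28905ebd", "ecad2386", "7801e8d9", "07d7df22", "a99f214a",
    "37e8da74", "5db079b5", "1", "2", 15707, 320, 50, 1722, 0, 35, -1, 79};

// Predict the same record 100 times, 2 seconds apart
for (int i = 1; i <= 100; i++) {
    System.out.print(i + "\t");
    System.out.println(ArrayUtils.toString(localPredictor.predict(input)));
    Thread.sleep(2000);
}

// Release resources and stop watching the model-stream folder
localPredictor.close();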
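The code above is the body of the function Chap29.c_4_3(). Run Chap29.c_4_3() at the same time as the PipelineModel model-stream generator Chap29.c_4_1() introduced in Section 29.4.1, so that new PipelineModels keep being produced while localPredictor keeps updating its prediction model. Part of the output of Chap29.c_4_3() is shown below. Because the same record is predicted every time, the result would stay the same if the model did not change. The output shows the prediction changing (at iterations 22, 27, and 36), which confirms that localPredictor updates its prediction model from the PipelineModel model stream.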
18	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
19	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
20	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
21	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
22	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
23	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
24	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
25	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
26	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
27	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
28	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
29	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
30	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
31	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
32	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
33	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
34	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
35	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
36	1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
37	1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
38	1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
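The switch points in this output can also be found programmatically. The sketch below takes the probabilities of class "0" printed for predictions 18 to 38 and reports each index whose value differs from the previous one; ModelSwitchDetector is a name made up for this example. It finds switches at predictions 22, 27, and 36. With predictions 2 seconds apart, the gaps of 5 and 9 predictions correspond to roughly 10 and 18 seconds between model updates, which presumably reflects how long each training round in Chap29.c_4_1() takes plus its 2-second pause.

```java
import java.util.ArrayList;
import java.util.List;

// Finds the prediction indices at which the output changes, i.e. where a new model took effect.
public class ModelSwitchDetector {

    // Returns firstIndex + k for every k whose value differs from the value at k - 1.
    static List<Integer> changePoints(int firstIndex, double[] values) {
        List<Integer> changes = new ArrayList<>();
        for (int k = 1; k < values.length; k++) {
            if (values[k] != values[k - 1]) {
                changes.add(firstIndex + k);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // P(label = "0") for predictions 18..38, copied from the output above.
        double a = 0.7017719823271469, b = 0.7775813140410618;
        double c = 0.7438455030430449, d = 0.7503320064425485;
        double[] p0 = {a, a, a, a, b, b, b, b, b, c, c, c, c, c, c, c, c, c, d, d, d};
        System.out.println(changePoints(18, p0)); // [22, 27, 36]
    }
}
```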