Alink Tutorial (Java Edition)

Section 29.4 Model Streams Built from a PipelineModel


In complex scenarios, it is not only the classification/regression model that keeps being updated; the data-processing and feature-engineering stages in front of it also produce models that change with the data, so several models must be updated at the same time. Treating the multiple PipelineStages as a single unit, the PipelineModel, solves this synchronization problem. A PipelineModel is a "super" batch model: it bundles the latest parameters of all of its stages and can itself be placed into a model stream. The newest PipelineModel obtained from the model stream can then replace the old one as a whole, without stopping the streaming job or the service.
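The "replace the whole model without stopping the service" idea can be sketched independently of Alink. In the minimal sketch below, `Model` is a hypothetical stand-in for a fitted PipelineModel; the serving side reads the model through an `AtomicReference`, so a refresher can swap in a new model in one atomic step and the service never observes a half-updated pipeline.

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch (not Alink API): the serving side always reads the model
// through an AtomicReference, so a refresher can replace the whole model
// object in one atomic step and the service never sees a half-updated pipeline.
public class HotSwapSketch {

    // Stand-in for a fitted PipelineModel; only a version tag here.
    static class Model {
        final int version;
        Model(int version) { this.version = version; }
        String predict(String input) { return "v" + version + ":" + input; }
    }

    public static void main(String[] args) {
        AtomicReference<Model> current = new AtomicReference<>(new Model(1));

        // Serving side: uses whatever model is current at call time.
        System.out.println(current.get().predict("x"));  // v1:x

        // Refresher side: a newer model arriving from the model stream
        // replaces the old one as a whole.
        current.set(new Model(2));
        System.out.println(current.get().predict("x"));  // v2:x
    }
}
```

Because the reference swap is atomic, every prediction call uses either the old pipeline or the new one in full, never a mixture of stages from both.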




29.4.1 Generating a PipelineModel Model Stream


Define a Pipeline with four stages:

      1. Standardize the numerical features, using the StandardScaler component
      2. One-hot encode the categorical features, using the OneHotEncoder component
      3. Assemble the feature columns into a single feature vector, using the VectorAssembler component
      4. A LogisticRegression model

The code is as follows:

Pipeline pipeline = new Pipeline()
	.add(
		new StandardScaler()
			.setSelectedCols(NUMERICAL_COL_NAMES)
	)
	.add(
		new OneHotEncoder()
			.setSelectedCols(CATEGORY_COL_NAMES)
			.setDropLast(false)
			.setEncode(Encode.ASSEMBLED_VECTOR)
			.setOutputCols(VEC_COL_NAME)
	)
	.add(
		new VectorAssembler()
			.setSelectedCols(ArrayUtils.add(NUMERICAL_COL_NAMES, VEC_COL_NAME))
			.setOutputCol(VEC_COL_NAME)
	)
	.add(
		new LogisticRegression()
			.setVectorCol(VEC_COL_NAME)
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
			.setReservedCols(LABEL_COL_NAME)
			.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
	);


The training set comes from a file (path: DATA_DIR + "avazu-small.csv"):

CsvSourceBatchOp train_set = new CsvSourceBatchOp()
	.setFilePath(DATA_DIR + "avazu-small.csv")
	.setSchemaStr(SCHEMA_STRING);


In this section we train a series of PipelineModels by controlling the size of the training set. First, 5% of the training data is used to fit a PipelineModel that will serve as the initial model for the prediction jobs that follow; it is saved to a file (path: DATA_DIR + INIT_PIPELINE_MODEL_FILE). The code is as follows:

pipeline
	.fit(train_set.sample(0.05))
	.save(DATA_DIR + INIT_PIPELINE_MODEL_FILE, true);

BatchOperator.execute();


Next, PipelineModels are trained in a loop. On each iteration the sampling ratio of train_set grows by 5 percentage points to form the current training set; fitting the pipeline yields the current PipelineModel, which is then appended to the model stream through the AppendModelStreamFileSinkBatchOp component.

for (int i = 2; i <= 20; i++) {
	pipeline
		.fit(train_set.sample(0.05 * i))
		.save()
		.link(
			new AppendModelStreamFileSinkBatchOp()
				.setFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
				.setNumKeepModel(19)
		);

	BatchOperator.execute();

	System.out.println("\nTrain " + (i - 1) + " PipelineModels.\n");

	Thread.sleep(2000);
}
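setNumKeepModel(19) caps how many model snapshots remain in the model-stream directory. The retention idea — keep only the N newest snapshots, relying on timestamped names that sort chronologically — can be sketched without Alink (this is an illustration of the concept, not Alink's internal implementation):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch (not Alink internals): given model file names whose lexicographic
// order matches their timestamp order, keep only the numKeepModel newest.
public class KeepNewestSketch {

    static List<String> keepNewest(List<String> modelFiles, int numKeepModel) {
        List<String> sorted = new ArrayList<>(modelFiles);
        Collections.sort(sorted);                        // oldest first
        int drop = Math.max(0, sorted.size() - numKeepModel);
        return sorted.subList(drop, sorted.size());      // newest numKeepModel
    }

    public static void main(String[] args) {
        List<String> files = List.of("202201011200", "202201011000", "202201011400");
        System.out.println(keepNewest(files, 2));  // [202201011200, 202201011400]
    }
}
```

With 19 iterations in the loop above and setNumKeepModel(19), the directory ends up holding every model produced by the loop; a smaller cap would discard the oldest ones as new models arrive.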


Inspect the folder containing the model stream (path: DATA_DIR + PIPELINE_MODEL_STREAM_DIR); its contents are shown in the figure below.
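The same inspection can also be done programmatically. The sketch below lists a directory's entries with java.nio.file; a temporary directory stands in for DATA_DIR + PIPELINE_MODEL_STREAM_DIR, and the file names used are illustrative, not the actual names Alink writes:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: list the contents of a model-stream directory.
// A temp directory stands in for DATA_DIR + PIPELINE_MODEL_STREAM_DIR.
public class ListModelStream {

    static List<String> listModels(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.map(p -> p.getFileName().toString())
                          .sorted()   // timestamped names sort chronologically
                          .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("model_stream");
        Files.createFile(dir.resolve("202201011000"));
        Files.createFile(dir.resolve("202201011200"));
        System.out.println(listModels(dir));  // [202201011000, 202201011200]
    }
}
```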



29.4.2 Using a PipelineModel Model Stream in a Streaming Job

Starting from the usual streaming-prediction workflow with a PipelineModel, calling the setModelStreamFilePath method to set the path of the model-stream folder is all that is needed for the PipelineModel to be updated from the model stream. The code is as follows:

StreamOperator<?> predResult = PipelineModel
	.load(DATA_DIR + INIT_PIPELINE_MODEL_FILE)
	.setModelStreamFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
	.transform(
		new CsvSourceStreamOp()
			.setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/data-files/avazu-ctr-train-8M.csv")
			.setSchemaStr(SCHEMA_STRING)
			.setIgnoreFirstLine(true)
	);

predResult
	.sample(0.0001)
	.select("'Pred Sample' AS out_type, *")
	.print();

predResult
	.link(
		new EvalBinaryClassStreamOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
			.setTimeInterval(10)
	)
	.link(
		new JsonValueStreamOp()
			.setSelectedCol("Data")
			.setReservedCols(new String[] {"Statistics"})
			.setOutputCols(new String[] {"Accuracy", "AUC", "ConfusionMatrix"})
			.setJsonPath("$.Accuracy", "$.AUC", "$.ConfusionMatrix")
	)
	.select("'Eval Metric' AS out_type, *")
	.print();

StreamOperator.execute();


The code above is the body of the function Chap29.c_4_2(). Run Chap29.c_4_2() together with Chap29.c_4_1(), the model-stream generator from the previous section, so that new PipelineModels are produced on one side while the prediction model is continually updated on the other. The output of Chap29.c_4_2() is shown below; it interleaves two kinds of records: sampled prediction results and streaming evaluation metrics. To save space, most of the prediction records are omitted. The streaming evaluation metrics can be seen to improve over time.

out_type|click|pred|pred_info
--------|-----|----|---------
out_type|Statistics|Accuracy|AUC|ConfusionMatrix
--------|----------|--------|---|---------------
Pred Sample|0|0|{"0":"0.8824338740627036","1":"0.11756612593729643"}
......
Pred Sample|0|0|{"0":"0.9444219542671783","1":"0.05557804573282166"}
Eval Metric|all|0.8297297852244261|0.6668952993333711|[[3435,5052],[28118,158203]]
Eval Metric|window|0.8297297852244261|0.6668952993333711|[[3435,5052],[28118,158203]]
Pred Sample|0|0|{"0":"0.9398133307532698","1":"0.06018666924673022"}
......
Pred Sample|0|0|{"0":"0.8557456903577869","1":"0.14425430964221309"}
Eval Metric|window|0.8295145505896383|0.6684136785227608|[[3868,5052],[37263,202020]]
Eval Metric|all|0.8296091970628269|0.6676815710617451|[[7303,10104],[65381,360223]]
Pred Sample|0|0|{"0":"0.9778352680682564","1":"0.02216473193174362"}
......
Pred Sample|0|0|{"0":"0.7777008376593497","1":"0.22229916234065028"}
Eval Metric|window|0.8339849908557735|0.6724791468610775|[[3245,3411],[38709,208347]]
Eval Metric|all|0.8312026443794737|0.6690371151657384|[[10548,13515],[104090,568570]]
Pred Sample|0|0|{"0":"0.878261454337365","1":"0.12173854566263498"}
......
Pred Sample|0|0|{"0":"0.9112290282109129","1":"0.0887709717890871"}
Eval Metric|all|0.8322811498899614|0.6725518118953981|[[14109,17319],[141577,774390]]
Eval Metric|window|0.835278770664454|0.6831684163174138|[[3561,3804],[37487,205820]]
Pred Sample|0|0|{"0":"0.7908054547875323","1":"0.20919454521246772"}
......
Pred Sample|0|0|{"0":"0.9038244759100549","1":"0.09617552408994512"}
Eval Metric|window|0.8333604847521605|0.6792313027625998|[[3957,4390],[37549,205779]]
Eval Metric|all|0.8325076934624334|0.6738985382938575|[[18066,21709],[179126,980169]]
Pred Sample|1|0|{"0":"0.9126242140286693","1":"0.08737578597133067"}
Pred Sample|0|0|{"0":"0.8954749661946126","1":"0.10452503380538736"}



29.4.3 Using a PipelineModel Model Stream with LocalPredictor


LocalPredictor itself provides no method for attaching a model stream; the setting has to be made on the PipelineModel it loads. As the code below shows, we load the initial model (model file path: DATA_DIR + INIT_PIPELINE_MODEL_FILE) to obtain a PipelineModel, call setModelStreamFilePath to set the path of the model-stream folder, and then save this PipelineModel, which now carries the model-stream setting, to a model file (path: DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE).

PipelineModel
	.load(DATA_DIR + INIT_PIPELINE_MODEL_FILE)
	.setModelStreamFilePath(DATA_DIR + PIPELINE_MODEL_STREAM_DIR)
	.save(DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE, true);
BatchOperator.execute();


Passing the path of this model file (DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE) to the LocalPredictor constructor yields a LocalPredictor connected to the PipelineModel model stream. The code is as follows:

LocalPredictor localPredictor
	= new LocalPredictor(DATA_DIR + PIPELINEMODEL_WITH_MODELSTREAM_FILE, SCHEMA_STRING);


We define one input record and then predict it repeatedly, pausing 2 seconds between predictions. After the loop, calling localPredictor's close method releases its resources and stops the monitoring of the model-stream folder.

Object[] input = new Object[] {
	"10000949271186029916", "1", "14102100", "1005", 0, "1fbe01fe", "f3845767", "28905ebd",
	"ecad2386", "7801e8d9", "07d7df22", "a99f214a", "37e8da74", "5db079b5", "1", "2",
	15707, 320, 50, 1722, 0, 35, -1, 79};

for (int i = 1; i <= 100; i++) {
	System.out.print(i + "\t");
	System.out.println(ArrayUtils.toString(localPredictor.predict(input)));
	Thread.sleep(2000);
}

localPredictor.close();


The code above is the body of the function Chap29.c_4_3(). Run Chap29.c_4_3() together with Chap29.c_4_1(), the model-stream generator from Section 29.4.1, so that new PipelineModels are produced on one side while localPredictor's prediction model is continually updated on the other. Part of the output of Chap29.c_4_3() is shown below. Since the same record is predicted every time, the result can only change if the model changes; the changes visible in the output therefore show that localPredictor is updating its prediction model from the PipelineModel model stream.

18	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
19	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
20	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
21	1,0,{"0":"0.7017719823271469","1":"0.2982280176728531"}
22	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
23	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
24	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
25	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
26	1,0,{"0":"0.7775813140410618","1":"0.2224186859589382"}
27	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
28	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
29	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
30	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
31	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
32	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
33	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
34	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
35	1,0,{"0":"0.7438455030430449","1":"0.25615449695695514"}
36	1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
37	1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
38	1,0,{"0":"0.7503320064425485","1":"0.2496679935574515"}
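The pattern in the output above — runs of identical predictions punctuated by a jump whenever a new model arrives — can be summarized mechanically. A small sketch (not Alink code) that compresses consecutive identical predictions into runs, so each model update shows up as a single line:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: compress repeated predictions on the same input into runs,
// so each model update appears as one line instead of many.
public class RunCompressor {

    static List<String> compressRuns(List<String> predictions) {
        List<String> runs = new ArrayList<>();
        String prev = null;
        int count = 0;
        for (String p : predictions) {
            if (p.equals(prev)) {
                count++;                       // same prediction: extend the run
            } else {
                if (prev != null) {
                    runs.add(prev + " x" + count);
                }
                prev = p;                      // new prediction: start a new run
                count = 1;
            }
        }
        if (prev != null) {
            runs.add(prev + " x" + count);     // flush the last run
        }
        return runs;
    }

    public static void main(String[] args) {
        List<String> preds = List.of("0.70", "0.70", "0.78", "0.78", "0.78", "0.74");
        System.out.println(compressRuns(preds));  // [0.70 x2, 0.78 x3, 0.74 x1]
    }
}
```

Applied to the listing above, this would reduce iterations 18–35 to three lines, one per model version, making the update points easy to spot.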