还是以OneHot+GBDT+FM 这个线上推理服务为例,这个服务是有三个推理组件构成,其中OneHot最轻,GBDT次之,FM最重,如果按照之前的并行模型必然导致资源分配不均衡,OneHot分配资源太多,FM分的太少。
加入多线程后我们可以为每一个推理子组件设置并行线程数,我们可以为计算负载较轻的 OneHot 设置2个线程,GBDT次之,设置3个线程,FM最重,设置4个线程,从而让资源分配更加合理,提升推理效率。
与第30.1.1节中实验相同,批式任务还是使用4个并行,代码如下:
BatchOperator.setParallelism(4);
KNN批式预测组件KnnPredictBatchOp,使用setNumThreads方法,设置多线程数为2,具体代码如下:
new KnnPredictBatchOp() .setK(3) .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setNumThreads(2) .linkFrom(getKnnModel(), getTestSet()) .link( new EvalMultiClassBatchOp() .setLabelCol(LABEL_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .lazyPrintMetrics() ); BatchOperator.execute();
运行结果如下,当前方式(4个并行,2线程)用时47.884秒,对比第30.1.1节中,批式推理(4个并行,使用默认单线程)用时58.368秒,时间上有显著减少。
Batch mode with Parallelism=4 and NumThreads=2 -------------------------------- Metrics: -------------------------------- Accuracy:0.9719 Macro F1:0.9718 Micro F1:0.9719 Kappa:0.9688 |Pred\Real| 9| 8| 7|...| 2| 1| 0| |---------|---|---|---|---|---|----|---| | 9|971| 4| 12|...| 1| 0| 0| | 8| 4|924| 0|...| 2| 0| 0| | 7| 8| 4|991|...| 13| 0| 1| | ...|...|...|...|...|...| ...|...| | 2| 1| 3| 4|...|994| 2| 1| | 1| 4| 0| 19|...| 9|1133| 1| | 0| 4| 7| 0|...| 10| 0|974| 47 seconds 884.0 milliseconds.
有的读者会问:“为什么时间没有变为1/2呢?”,因为多线程只是缩短了由模型进行推理的时间,但整个流程中,模型加载、读取数据、评估等阶段并没有变化。
与第30.1.2节中实验相同,流式任务还是使用4个并行,代码如下:
StreamOperator.setParallelism(4);
KNN流式预测组件KnnPredictStreamOp,使用setNumThreads方法,设置多线程数为2,具体代码如下:
getTestStream() .link( new KnnPredictStreamOp(getKnnModel()) .setK(3) .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setNumThreads(2) ) .link( new EvalMultiClassStreamOp() .setLabelCol(LABEL_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) ) .link( new JsonValueStreamOp() .setSelectedCol("Data") .setReservedCols(new String[] {"Statistics"}) .setOutputCols(new String[] {"Accuracy", "Kappa"}) .setJsonPath(new String[] {"$.Accuracy", "$.Kappa"}) ) .print(); StreamOperator.execute();
运行结果如下,【此处需要重新运行】。
Statistics|Accuracy|Kappa ----------|--------|----- all|0.9618441971383148|0.9575622468170278 window|0.9618441971383148|0.9575622468170278 window|0.9551020408163265|0.9499737029359898 all|0.9582111436950147|0.9534851720075838 window|0.9582278481012658|0.9535783985526859 all|0.958217270194986|0.9535357505800242 window|0.9533169533169533|0.9481023255970027 all|0.9568733153638814|0.9520503164721521 window|0.968789013732834|0.9652701901164424 all|0.9594056778986468|0.9548648580320753 all|0.958916083916084|0.9543275914197533 window|0.9566294919454771|0.9516305529820548 window|0.9706666666666667|0.9673687978467362 all|0.96057078482914|0.9561659421998374 all|0.9627487667970743|0.9585870444485952 ......
PipelineModel使用多线程,也是使用setNumThreads方法,设置多线程数。PipelineModel的transform方法,可以处理批式数据,也可以处理流式数据,
在批式场景中,PipelineModel使用4并行,2线程的方式预测批式数据的主要代码如下:
BatchOperator.setParallelism(4); ...... getPipelineModel() .setNumThreads(2) .transform(getTestSet()) ...... BatchOperator.execute();
在流式场景中,PipelineModel使用4并行,2线程的方式预测流式数据的主要代码如下:
StreamOperator.setParallelism(4); ...... getPipelineModel() .setNumThreads(2) .transform(getTestStream()) ...... StreamOperator.execute();