Alink教程(Java版)

第30.2节 多线程(Multi-threads)


还是以OneHot+GBDT+FM 这个线上推理服务为例,这个服务是有三个推理组件构成,其中OneHot最轻,GBDT次之,FM最重,如果按照之前的并行模型必然导致资源分配不均衡,OneHot分配资源太多,FM分的太少。

加入多线程后我们可以为每一个推理子组件设置并行线程数,我们可以为计算负载较轻的 OneHot 设置2个线程,GBDT次之,设置3个线程,FM最重,设置4个线程,从而让资源分配更加合理,提升推理效率。

30.2.1 批式组件使用多线程


与第30.1.1节中实验相同,批式任务还是使用4个并行,代码如下:

BatchOperator.setParallelism(4);


KNN批式预测组件KnnPredictBatchOp,使用setNumThreads方法,设置多线程数为2,具体代码如下:

new KnnPredictBatchOp()
	.setK(3)
	.setVectorCol(VECTOR_COL_NAME)
	.setPredictionCol(PREDICTION_COL_NAME)
	.setNumThreads(2)
	.linkFrom(getKnnModel(), getTestSet())
	.link(
		new EvalMultiClassBatchOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
			.lazyPrintMetrics()
	);
BatchOperator.execute();


运行结果如下,当前方式(4个并行,2线程)用时47.884秒,对比第30.1.1节中,批式推理(4个并行,使用默认单线程)用时58.368秒,时间上有显著减少。

Batch mode with Parallelism=4 and NumThreads=2
-------------------------------- Metrics: --------------------------------
Accuracy:0.9719	Macro F1:0.9718	Micro F1:0.9719	Kappa:0.9688	
|Pred\Real|  9|  8|  7|...|  2|   1|  0|
|---------|---|---|---|---|---|----|---|
|        9|971|  4| 12|...|  1|   0|  0|
|        8|  4|924|  0|...|  2|   0|  0|
|        7|  8|  4|991|...| 13|   0|  1|
|      ...|...|...|...|...|...| ...|...|
|        2|  1|  3|  4|...|994|   2|  1|
|        1|  4|  0| 19|...|  9|1133|  1|
|        0|  4|  7|  0|...| 10|   0|974|

47 seconds  884.0 milliseconds.


有的读者会问:“为什么时间没有变为1/2呢?”,因为多线程只是缩短了由模型进行推理的时间,但整个流程中,模型加载、读取数据、评估等阶段并没有变化。

30.2.2 流式组件使用多线程


与第30.1.2节中实验相同,流式任务还是使用4个并行,代码如下:

StreamOperator.setParallelism(4);


KNN流式预测组件KnnPredictStreamOp,使用setNumThreads方法,设置多线程数为2,具体代码如下:

getTestStream()
	.link(
		new KnnPredictStreamOp(getKnnModel())
			.setK(3)
			.setVectorCol(VECTOR_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
			.setNumThreads(2)
	)
	.link(
		new EvalMultiClassStreamOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
	)
	.link(
		new JsonValueStreamOp()
			.setSelectedCol("Data")
			.setReservedCols(new String[] {"Statistics"})
			.setOutputCols(new String[] {"Accuracy", "Kappa"})
			.setJsonPath(new String[] {"$.Accuracy", "$.Kappa"})
	)
	.print();
StreamOperator.execute();


运行结果如下,【此处需要重新运行】

Statistics|Accuracy|Kappa
----------|--------|-----
all|0.9618441971383148|0.9575622468170278
window|0.9618441971383148|0.9575622468170278
window|0.9551020408163265|0.9499737029359898
all|0.9582111436950147|0.9534851720075838
window|0.9582278481012658|0.9535783985526859
all|0.958217270194986|0.9535357505800242
window|0.9533169533169533|0.9481023255970027
all|0.9568733153638814|0.9520503164721521
window|0.968789013732834|0.9652701901164424
all|0.9594056778986468|0.9548648580320753
all|0.958916083916084|0.9543275914197533
window|0.9566294919454771|0.9516305529820548
window|0.9706666666666667|0.9673687978467362
all|0.96057078482914|0.9561659421998374
all|0.9627487667970743|0.9585870444485952
......



30.2.3 PipelineModel使用多线程


PipelineModel使用多线程,也是使用setNumThreads方法,设置多线程数。PipelineModel的transform方法,可以处理批式数据,也可以处理流式数据,

在批式场景中,PipelineModel使用4并行,2线程的方式预测批式数据的主要代码如下:

BatchOperator.setParallelism(4);

......

getPipelineModel()
	.setNumThreads(2)
	.transform(getTestSet())

......
    
BatchOperator.execute();


在流式场景中,PipelineModel使用4并行,2线程的方式预测流式数据的主要代码如下:

StreamOperator.setParallelism(4);

......
    
getPipelineModel()
	.setNumThreads(2)
	.transform(getTestStream())

......
    
StreamOperator.execute();