Alink教程(Java版)

第30.1节 并行度(Parallelism)

设置当前环境执行的并行度,对于当前任务所涉及的所有批式组件或者流式组件都会按此并行度进行并行执行。

向集群提交分布式任务时,并行度是必填参数,整个任务分布运行在多台机器上。在本地环境执行时,如果不指定并行度,会根据机器硬件配置的不同而被默认指定,并行度的默认值为本地环境的CPU核心/线程数。

可以使用如下方法设置批式任务、流式任务的并行度参数:

BatchOperator.setParallelism(4);

StreamOperator.setParallelism(4);

以OneHot+GBDT+FM 推理服务为例,4个并发度的运行示意图如下,绿色部分为内存保存的模型数据,t0代表使用的一个线程, 推理任务中的所有节点组件都是一样的资源分配。


30.1.1 批式任务的并行度设定


设定并行度的代码如下,对后面的KNN模型批式推理任务及Pipeline模型批式推理任务都起作用。

BatchOperator.setParallelism(4);


使用KNN批式预测组件KnnPredictBatchOp的代码如下,通过getKnnModel方法获取KNN模型,通过getTestSet方法获取批式测试集,最后使用评估组件,并输出评估指标。

new KnnPredictBatchOp()
	.setK(3)
	.setVectorCol(VECTOR_COL_NAME)
	.setPredictionCol(PREDICTION_COL_NAME)
	.linkFrom(getKnnModel(), getTestSet())
	.link(
		new EvalMultiClassBatchOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
			.lazyPrintMetrics()
	);
BatchOperator.execute();


通过getPipelineModel方法获得Pipeline模型,然后使用tranform方法进行推理,通过getTestSet方法获取批式测试集,最后使用评估组件,并输出评估指标。具体代码如下:

getPipelineModel()
	.transform(getTestSet())
	.link(
		new EvalMultiClassBatchOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
			.lazyPrintMetrics()
	);
BatchOperator.execute();


输出的结果如下,根据两次预测结果计算的评估指标是一样的,用时都是58秒多。

Batch mode with parallelism = 4
-------------------------------- Metrics: --------------------------------
Accuracy:0.9719	Macro F1:0.9718	Micro F1:0.9719	Kappa:0.9688	
|Pred\Real|  9|  8|  7|...|  2|   1|  0|
|---------|---|---|---|---|---|----|---|
|        9|971|  4| 12|...|  1|   0|  0|
|        8|  4|924|  0|...|  2|   0|  0|
|        7|  8|  4|991|...| 13|   0|  1|
|      ...|...|...|...|...|...| ...|...|
|        2|  1|  3|  4|...|994|   2|  1|
|        1|  4|  0| 19|...|  9|1133|  1|
|        0|  4|  7|  0|...| 10|   0|974|

58 seconds  368.0 milliseconds.

Pipeline-Batch mode with parallelism = 4
-------------------------------- Metrics: --------------------------------
Accuracy:0.9719	Macro F1:0.9718	Micro F1:0.9719	Kappa:0.9688	
|Pred\Real|  9|  8|  7|...|  2|   1|  0|
|---------|---|---|---|---|---|----|---|
|        9|971|  4| 12|...|  1|   0|  0|
|        8|  4|924|  0|...|  2|   0|  0|
|        7|  8|  4|991|...| 13|   0|  1|
|      ...|...|...|...|...|...| ...|...|
|        2|  1|  3|  4|...|994|   2|  1|
|        1|  4|  0| 19|...|  9|1133|  1|
|        0|  4|  7|  0|...| 10|   0|974|

58 seconds  976.0 milliseconds.



30.1.2 流式任务的并行度设定

设定并行度的代码如下,对后面的KNN模型流式推理任务及Pipeline模型流式推理任务都起作用。

StreamOperator.setParallelism(4);


使用KNN流式预测组件KnnPredictStreamOp的代码如下,通过getKnnModel方法获取KNN模型,通过getTestStream方法获取流式测试集,然后使用流式评估组件,并使用JsonValueStreamOp组件从JSON格式的评估结果中抽取出Accuracy和Kappa指标。

getTestStream()
	.link(
		new KnnPredictStreamOp(getKnnModel())
			.setK(3)
			.setVectorCol(VECTOR_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
	)
	.link(
		new EvalMultiClassStreamOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
	)
	.link(
		new JsonValueStreamOp()
			.setSelectedCol("Data")
			.setReservedCols(new String[] {"Statistics"})
			.setOutputCols(new String[] {"Accuracy", "Kappa"})
			.setJsonPath(new String[] {"$.Accuracy", "$.Kappa"})
	)
	.print();
StreamOperator.execute();


通过getPipelineModel方法获得Pipeline模型,然后使用tranform方法进行推理,通过getTestStream方法获取流式式测试集,然后使用流式评估组件,并使用JsonValueStreamOp组件从JSON格式的评估结果中抽取出Accuracy和Kappa指标。具体代码如下:

getPipelineModel()
	.transform(getTestStream())
	.link(
		new EvalMultiClassStreamOp()
			.setLabelCol(LABEL_COL_NAME)
			.setPredictionCol(PREDICTION_COL_NAME)
	)
	.link(
		new JsonValueStreamOp()
			.setSelectedCol("Data")
			.setReservedCols(new String[] {"Statistics"})
			.setOutputCols(new String[] {"Accuracy", "Kappa"})
			.setJsonPath(new String[] {"$.Accuracy", "$.Kappa"})
	)
	.print();
StreamOperator.execute();


输出的结果如下,

Stream mode with Parallelism=4
Statistics|Accuracy|Kappa
----------|--------|-----
all|0.9725609756097561|0.9693623380936566
window|0.9725609756097561|0.9693623380936566
all|0.9602094240837696|0.9557033872444309
window|0.9537480063795853|0.9485054020758698
all|0.9587242026266416|0.9540765773312335
...........
window|0.9923273657289002|0.9914675395526459
all|0.9716763640355692|0.9685152093386061
window|0.9954853273137697|0.994977779793216
all|0.9727805695142379|0.9697434011327877
1 minutes  5 seconds  116.0 milliseconds.


Pipeline stream mode with Parallelism=4
Statistics|Accuracy|Kappa
----------|--------|-----
all|0.9711815561959655|0.9678247888212005
window|0.9711815561959655|0.9678247888212005
window|0.9579288025889967|0.9531556115309263
all|0.9649390243902439|0.9610094943741828
...........
window|0.9975062344139651|0.9972258542086075
all|0.9710621879255561|0.9678330337676703
window|0.9875930521091811|0.9861981574711463
all|0.9717851329354313|0.9686363979216531
all|0.9727884117951371|0.9697518971205008
window|0.9933333333333333|0.9925873864771966
1 minutes  15 seconds  386.0 milliseconds.