Alink教程(Java版)

第25.5节 运行PyTorch模型


在本章的第2、3节介绍了使用Alink提供的深度学习组件KerasSequentialClassifier和KerasSequentialRegressor进行分类和回归模型的训练、预测。

实际应用中,经常需要使用TensorFlow或着PyTorch训练好的模型,对流式数据、批式数据进行预测。Alink提供了相应的流式、批式和Pipeline组件适配TensorFlow或着PyTorch模型。

本节重点介绍与PyTorch模型相关的操作。


25.5.1 生成PyTorch模型


本节所需的PyTorch模型文件mnist_model_pytorch.pt,已经被放到了OSS上,本节后面的实验会直接从网络读取该模型。https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt

如果读者有兴趣,可以在PyTorch环境,运行下面代码便可生成PyTorch模型,从而被Alink相关组件使用。注意:PyTorch模型执需要打包为".pt"文件,便于Alink相关组件导入模型。建议的打包示例代码在下面代码的最后部分。

import torch
from torchvision import datasets
from torchvision.transforms import ToTensor

train_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

train_loader = torch.utils.data.dataloader.DataLoader(dataset=train_data, batch_size=64, shuffle=True)

class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = torch.nn.Sequential(
            torch.nn.Conv2d(1, 32, 3, 1, 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2))
        self.conv2 = torch.nn.Sequential(
            torch.nn.Conv2d(32, 64, 3, 1, 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2)
        )
        self.conv3 = torch.nn.Sequential(
            torch.nn.Conv2d(64, 64, 3, 1, 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2)
        )
        self.dense = torch.nn.Sequential(
            torch.nn.Linear(64 * 3 * 3, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 10)
        )

    def forward(self, x):
        conv1_out = self.conv1(x)
        conv2_out = self.conv2(conv1_out)
        conv3_out = self.conv3(conv2_out)
        res = conv3_out.view(conv3_out.size(0), -1)
        out = self.dense(res)
        return out

model = Net()
print(model)

optimizer = torch.optim.Adam(model.parameters())
loss_func = torch.nn.CrossEntropyLoss()

for epoch in range(5):
    print('epoch {}'.format(epoch + 1))
    train_loss = 0.
    train_acc = 0.
    for batch_x, batch_y in train_loader:
        batch_x, batch_y = torch.autograd.Variable(batch_x), torch.autograd.Variable(batch_y)
        out = model(batch_x)
        loss = loss_func(out, batch_y)
        train_loss += loss.item()
        pred = torch.max(out, 1)[1]
        train_correct = (pred == batch_y).sum()
        train_acc += train_correct.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print('Train Loss: {:.6f}, Acc: {:.6f}'.format(train_loss / (len(
        train_data)), train_acc / (len(train_data))))

    
traced = torch.jit.trace(model, (torch.rand(1, 1, 28, 28)))
torch.jit.save(traced, "mnist_model_pytorch.pt")

输出模型及训练信息如下:

Net(
  (conv1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (dense): Sequential(
    (0): Linear(in_features=576, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=10, bias=True)
  )
)
epoch 1
Train Loss: 0.003327, Acc: 0.933000
epoch 2
Train Loss: 0.000876, Acc: 0.982667
epoch 3
Train Loss: 0.000592, Acc: 0.987783
epoch 4
Train Loss: 0.000466, Acc: 0.990467
epoch 5
Train Loss: 0.000403, Acc: 0.991733


25.5.2 批式任务中使用PyTorch模型


使用TorchModelPredictBatchOp组件,可以加载PyTorch模型进行批式预测。关于该组件的详细说明参见Alink文档 https://www.yuque.com/pinshu/alink_doc/torchmodelpredictbatchop .

使用PyTorch模型前,还需要将输入数据列的类型转换为Tensor格式,可以使用VectorToTensorBatchOp组件。具体代码如下所示:

new AkSourceBatchOp()
	.setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
	.link(
		new VectorToTensorBatchOp()
			.setTensorDataType("float")
			.setTensorShape(1, 1, 28, 28)
			.setSelectedCol("vec")
			.setOutputCol("tensor")
			.setReservedCols("label")
	)
	.link(
		new TorchModelPredictBatchOp()
			.setModelPath(
				"https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
			.setSelectedCols("tensor")
			.setOutputSchemaStr("output_1 FLOAT_TENSOR")
	)
	.lazyPrint(3)
	.link(
		new UDFBatchOp()
			.setFunc(new GetMaxIndex())
			.setSelectedCols("output_1")
			.setOutputCol("pred")
	)
	.lazyPrint(3)
	.link(
		new EvalMultiClassBatchOp()
			.setLabelCol("label")
			.setPredictionCol("pred")
			.lazyPrintMetrics()
	);

BatchOperator.execute();

这里用到了一个自定义函数,具体定义如下:

public static class GetMaxIndex extends ScalarFunction {

	public int eval(FloatTensor tensor) {
		int k = 0;
		float max = tensor.getFloat(0, 0);
		for (int i = 1; i < 10; i++) {
			if (tensor.getFloat(0, i) > max) {
				k = i;
				max = tensor.getFloat(0, i);
			}
		}
		return k;
	}
}


批式任务的运行结果为:

label|tensor|output_1
-----|------|--------
0|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                       
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[4044.1326 -3060.9893 -34.796455 ... -1278.3772 -2814.0508 -1284.4863]]
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
9|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                  
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-2608.7097 -439.29208 414.57578 ... -343.33176 209.20328 2627.7]]
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
6|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                   
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[700.5958 -2671.2515 -10.615548 ... -3413.485 915.7342 -1291.0757]]
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
label|tensor|output_1|pred
-----|------|--------|----
0|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                       |0
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[4044.1326 -3060.9893 -34.796455 ... -1278.3772 -2814.0508 -1284.4863]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
9|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                  |9
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-2608.7097 -439.29208 414.57578 ... -343.33176 209.20328 2627.7]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
6|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                   |6
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[700.5958 -2671.2515 -10.615548 ... -3413.485 915.7342 -1291.0757]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
-------------------------------- Metrics: --------------------------------
Accuracy:0.9903	Macro F1:0.9902	Micro F1:0.9903	Kappa:0.9892	
|Pred\Real|  9|  8|   7|...|   2|   1|  0|
|---------|---|---|----|---|----|----|---|
|        9|992|  1|   4|...|   0|   0|  0|
|        8|  2|965|   1|...|   0|   1|  1|
|        7|  5|  2|1012|...|   2|   0|  1|
|      ...|...|...| ...|...| ...| ...|...|
|        2|  2|  4|   9|...|1030|   3|  2|
|        1|  0|  0|   2|...|   0|1128|  0|
|        0|  0|  2|   0|...|   0|   0|973|

25.5.3 流式任务中使用PyTorch模型


使用TorchModelPredictStreamOp组件,可以加载PyTorch模型进行批式预测。关于该组件的详细说明参见Alink文档 https://www.yuque.com/pinshu/alink_doc/torchmodelpredictstreamop .

使用PyTorch模型前,还需要将输入数据列的类型转换为Tensor格式,可以使用VectorToTensorStreamOp组件。具体代码如下所示:

new AkSourceStreamOp()
	.setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
	.link(
		new VectorToTensorStreamOp()
			.setTensorDataType("float")
			.setTensorShape(1, 1, 28, 28)
			.setSelectedCol("vec")
			.setOutputCol("tensor")
			.setReservedCols("label")
	)
	.link(
		new TorchModelPredictStreamOp()
			.setModelPath(
				"https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
			.setSelectedCols("tensor")
			.setOutputSchemaStr("output_1 FLOAT_TENSOR")
	)
	.link(
		new UDFStreamOp()
			.setFunc(new GetMaxIndex())
			.setSelectedCols("output_1")
			.setOutputCol("pred")
	)
	.sample(0.001)
	.print();

StreamOperator.execute();

运行结果为:

label|tensor|output_1|pred
-----|------|--------|----
3|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                   |3
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1590.9832 -1487.7247 591.7295 ... -468.07892 671.602 -1350.5359]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
8|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                     |8
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-503.25494 -1483.4431 684.2502 ... -2311.5735 2921.0408 118.283745]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
6|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                      |6
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-375.05377 -1370.912 -769.6774 ... -3442.0344 -582.13983 -2177.5767]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |

......

25.5.4 Pipeline中使用PyTorch模型

学习了如何在批式任务和流式任务中使用PyTorch模型,我们很容易在Pipeline中使用PyTorch模型进行预测,只要将其中的批式/流式组件对应到Pipeline组件即可。具体代码如下:

new PipelineModel(
	new VectorToTensor()
		.setTensorDataType("float")
		.setTensorShape(1, 1, 28, 28)
		.setSelectedCol("vec")
		.setOutputCol("tensor")
		.setReservedCols("label"),
	new TorchModelPredictor()
		.setModelPath(
			"https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
		.setSelectedCols("tensor")
		.setOutputSchemaStr("output_1 FLOAT_TENSOR")
).save(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL, true);
BatchOperator.execute();

PipelineModel
	.load(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL)
	.transform(
		new AkSourceStreamOp()
			.setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
	)
	.link(
		new UDFStreamOp()
			.setFunc(new GetMaxIndex())
			.setSelectedCols("output_1")
			.setOutputCol("pred")
	)
	.sample(0.001)
	.print();
StreamOperator.execute();

运行结果为:

label|tensor|output_1|pred
-----|------|--------|----
4|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                    |4
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-993.6407 -458.57587 1282.9576 ... -2273.686 -2319.7793 -2746.323]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
1|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                     |1
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1004.3513 2189.4736 -777.72845 ... -1088.758 355.25262 -921.88556]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |
2|FloatTensor(1,1,28,28)          |FloatTensor(1,10)                                                     |2
 |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1155.4056 -317.33963 4966.4814 ... -910.1352 -2354.1606 -3428.952]]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 |   ...                          |
 |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
 | ... ...                        |

......

25.5.5 LocalPredictor中使用PyTorch模型


除了通过Alink任务使用PyTorch模型,也可以使用LocalPredictor进行嵌入式预测。示例代码如下,首先从数据集中抽取一行数据,输入数据的SchemaStr为“vec string, label int”;然后通过导入上一节保存的Pipeline模型,并设置输入数据的SchemaStr,得到LocalPredictor类型的实例localPredictor;如果不确定预测结果各列的含义,可以打印输出localPredictor的OutputSchema;使用localPredictor的map方法获得预测结果。

AkSourceBatchOp source = new AkSourceBatchOp()
	.setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE);

System.out.println(source.getSchema());

Row row = source.firstN(1).collect().get(0);

LocalPredictor localPredictor
	= new LocalPredictor(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL, "vec string, label int");

System.out.println(localPredictor.getOutputSchema());

Row r = localPredictor.map(row);
System.out.println(r.getField(0).toString() + " | " + r.getField(2).toString());

运行结果为:

root
 |-- vec: STRING
 |-- label: INT

root
 |-- label: INT
 |-- tensor: LEGACY(GenericType<com.alibaba.alink.common.linalg.tensor.FloatTensor>)
 |-- output_1: LEGACY(GenericType<com.alibaba.alink.common.linalg.tensor.FloatTensor>)

1 | [[-794.75903 2662.567 -658.8216 ... -173.1484 -263.41855 -712.4674]]