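Sections 2 and 3 of this chapter showed how to train classification and regression models, and predict with them, using Alink's deep learning components KerasSequentialClassifier and KerasSequentialRegressor.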
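In practice, we often need to apply a model that has already been trained with TensorFlow or PyTorch to streaming or batch data. Alink provides stream, batch, and Pipeline components that adapt TensorFlow and PyTorch models for this purpose.
This section focuses on working with PyTorch models.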
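The PyTorch model file used in this section, mnist_model_pytorch.pt, has been uploaded to OSS, and the experiments later in this section read the model directly from this URL: https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt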
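Interested readers can generate the PyTorch model themselves by running the code below in a PyTorch environment; the resulting file can then be used by the Alink components. Note: the PyTorch model must be packaged as a ".pt" file so that the Alink components can import it. The last lines of the code show the recommended way to package it: trace the model with torch.jit.trace and save the result with torch.jit.save.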
import torch
from torchvision import datasets
from torchvision.transforms import ToTensor

# MNIST training set; ToTensor() converts each image to a (1, 28, 28) tensor with values in [0, 1]
train_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=64, shuffle=True)


class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Each block: 3x3 conv (stride 1, padding 1) + ReLU + 2x2 max pooling
        self.conv1 = torch.nn.Sequential(
            torch.nn.Conv2d(1, 32, 3, 1, 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2))
        self.conv2 = torch.nn.Sequential(
            torch.nn.Conv2d(32, 64, 3, 1, 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2)
        )
        self.conv3 = torch.nn.Sequential(
            torch.nn.Conv2d(64, 64, 3, 1, 1),
            torch.nn.ReLU(),
            torch.nn.MaxPool2d(2)
        )
        # Three poolings shrink 28x28 to 3x3, so the flattened input has 64 * 3 * 3 features;
        # the last layer outputs 10 raw class scores (no softmax)
        self.dense = torch.nn.Sequential(
            torch.nn.Linear(64 * 3 * 3, 128),
            torch.nn.ReLU(),
            torch.nn.Linear(128, 10)
        )

    def forward(self, x):
        conv1_out = self.conv1(x)
        conv2_out = self.conv2(conv1_out)
        conv3_out = self.conv3(conv2_out)
        res = conv3_out.view(conv3_out.size(0), -1)  # flatten to (batch, 576)
        out = self.dense(res)
        return out


model = Net()
print(model)

optimizer = torch.optim.Adam(model.parameters())
loss_func = torch.nn.CrossEntropyLoss()

for epoch in range(5):
    print('epoch {}'.format(epoch + 1))
    train_loss = 0.
    train_acc = 0.
    for batch_x, batch_y in train_loader:
        out = model(batch_x)
        loss = loss_func(out, batch_y)
        train_loss += loss.item()
        pred = torch.max(out, 1)[1]
        train_correct = (pred == batch_y).sum()
        train_acc += train_correct.item()
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # loss.item() is already a per-batch mean, so dividing the sum by the number of samples
    # reports roughly the average batch loss divided by the batch size (64)
    print('Train Loss: {:.6f}, Acc: {:.6f}'.format(
        train_loss / len(train_data), train_acc / len(train_data)))

# Package the model as a .pt file for Alink: trace it with a dummy input of shape
# (batch, channel, height, width) = (1, 1, 28, 28), then save the traced module
model.eval()
traced = torch.jit.trace(model, torch.rand(1, 1, 28, 28))
torch.jit.save(traced, "mnist_model_pytorch.pt")
The printed model structure and training log are as follows:
Net(
  (conv1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (dense): Sequential(
    (0): Linear(in_features=576, out_features=128, bias=True)
    (1): ReLU()
    (2): Linear(in_features=128, out_features=10, bias=True)
  )
)
epoch 1
Train Loss: 0.003327, Acc: 0.933000
epoch 2
Train Loss: 0.000876, Acc: 0.982667
epoch 3
Train Loss: 0.000592, Acc: 0.987783
epoch 4
Train Loss: 0.000466, Acc: 0.990467
epoch 5
Train Loss: 0.000403, Acc: 0.991733
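The in_features=576 of the first Linear layer follows from the layer settings: a 3x3 convolution with stride 1 and padding 1 keeps the spatial size, and each MaxPool2d(2) halves it, rounding down. The small sketch below reproduces that arithmetic with the output-size formula given in the PyTorch Conv2d/MaxPool2d documentation (ceil_mode=False); the helper name out_size is ours, for illustration only.

```python
def out_size(size, kernel, stride=1, padding=0, dilation=1):
    """Spatial output size of Conv2d / MaxPool2d with ceil_mode=False (PyTorch docs formula)."""
    return (size + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1


size = 28          # MNIST images are 28 x 28
sizes = []
for _ in range(3):  # the conv1, conv2 and conv3 blocks
    size = out_size(size, kernel=3, stride=1, padding=1)  # 3x3 conv, padding 1: size unchanged
    size = out_size(size, kernel=2, stride=2)             # MaxPool2d(2): stride defaults to kernel_size
    sizes.append(size)

in_features = 64 * size * size  # conv3 outputs 64 channels
print(sizes, in_features)       # [14, 7, 3] 576
```

The last pooling turns 7 into 3 because the division rounds down, which is why the dense layer uses 64 * 3 * 3 rather than 64 * 3.5 * 3.5.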
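The TorchModelPredictBatchOp component loads a PyTorch model and uses it for batch prediction. For a detailed description of this component, see the Alink documentation: https://www.yuque.com/pinshu/alink_doc/torchmodelpredictbatchop .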
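Before the PyTorch model can be applied, the input data column must be converted to a tensor, which the VectorToTensorBatchOp component does. Here each 784-dimensional vector in the vec column becomes a float tensor of shape (1, 1, 28, 28), the same input shape used when the model was traced. The code is as follows: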
new AkSourceBatchOp()
    .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
    .link(
        // Convert the "vec" column into a float tensor of shape (1, 1, 28, 28), keeping "label"
        new VectorToTensorBatchOp()
            .setTensorDataType("float")
            .setTensorShape(1, 1, 28, 28)
            .setSelectedCol("vec")
            .setOutputCol("tensor")
            .setReservedCols("label")
    )
    .link(
        // Run the PyTorch model on the "tensor" column; "output_1" holds the 10 class scores
        new TorchModelPredictBatchOp()
            .setModelPath(
                "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
            .setSelectedCols("tensor")
            .setOutputSchemaStr("output_1 FLOAT_TENSOR")
    )
    .lazyPrint(3)
    .link(
        // Take the index of the largest score as the predicted digit
        new UDFBatchOp()
            .setFunc(new GetMaxIndex())
            .setSelectedCols("output_1")
            .setOutputCol("pred")
    )
    .lazyPrint(3)
    .link(
        // Compare predictions with the true labels
        new EvalMultiClassBatchOp()
            .setLabelCol("label")
            .setPredictionCol("pred")
            .lazyPrintMetrics()
    );
BatchOperator.execute();
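Conceptually, the conversion reshapes the 784 values of each vector, in row-major order, into a nested (1, 1, 28, 28) array. The Python sketch below illustrates only that idea and is not Alink's implementation; it assumes the vec column stores dense vectors as space-separated numbers, and parse_dense_vector and reshape are hypothetical helper names.

```python
def parse_dense_vector(s):
    """Parse a dense vector string such as '0.0 0.5 1.0' (assumed space-separated format)."""
    return [float(tok) for tok in s.split()]


def reshape(flat, shape):
    """Row-major (C-order) reshape of a flat list into nested lists of the given shape."""
    total = 1
    for d in shape:
        total *= d
    if total != len(flat):
        raise ValueError(f"cannot reshape {len(flat)} values into {tuple(shape)}")

    def build(offset, dims):
        if len(dims) == 1:
            return flat[offset:offset + dims[0]]
        step = 1  # number of values in one slice along the first remaining dimension
        for d in dims[1:]:
            step *= d
        return [build(offset + i * step, dims[1:]) for i in range(dims[0])]

    return build(0, list(shape))


# Hypothetical input: an all-zero 28 x 28 image serialized as 784 numbers
vec = " ".join(["0.0"] * 784)
tensor = reshape(parse_dense_vector(vec), (1, 1, 28, 28))
print(len(tensor), len(tensor[0]), len(tensor[0][0]), len(tensor[0][0][0]))  # 1 1 28 28
```

The leading 1s are the batch and channel dimensions, matching the dummy input torch.rand(1, 1, 28, 28) used for tracing.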
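The batch job above uses a user-defined function, GetMaxIndex, which returns the index of the largest of the 10 scores in output_1, i.e. the predicted digit. It is defined as follows: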
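// Imports needed in the enclosing file:
// import org.apache.flink.table.functions.ScalarFunction;
// import com.alibaba.alink.common.linalg.tensor.FloatTensor;

// Returns the index of the largest value in a (1, 10) score tensor
public static class GetMaxIndex extends ScalarFunction {
    public int eval(FloatTensor tensor) {
        int k = 0;
        float max = tensor.getFloat(0, 0);
        for (int i = 1; i < 10; i++) {
            if (tensor.getFloat(0, i) > max) {
                k = i;
                max = tensor.getFloat(0, i);
            }
        }
        return k;
    }
}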
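Because the model's last layer is Linear(128, 10) without a softmax (CrossEntropyLoss applies it internally during training), output_1 contains unnormalized scores, which is why the values are not confined to [0, 1]. Softmax is monotonic, so taking the argmax of the raw scores gives the same class as taking it of the probabilities. The Python sketch below mirrors GetMaxIndex and checks this; the function names are ours, and the three scores are the first three values of the label-0 row in this section's batch output, used as a toy 3-class example.

```python
import math


def get_max_index(scores):
    """Index of the largest score; on ties the first index wins, as with the strict '>' in GetMaxIndex."""
    k, best = 0, scores[0]
    for i in range(1, len(scores)):
        if scores[i] > best:
            k, best = i, scores[i]
    return k


def softmax(scores):
    """Numerically stable softmax: subtracting the max keeps math.exp from overflowing."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


scores = [4044.1326, -3060.9893, -34.796455]
print(get_max_index(scores), get_max_index(softmax(scores)))  # 0 0
```

Without subtracting the maximum, math.exp(4044.1326) would raise OverflowError, so the stable form matters for scores of this magnitude.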
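The batch job prints the following. The two tables come from the two lazyPrint(3) calls, and the metrics and confusion matrix come from lazyPrintMetrics():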
label|tensor                          |output_1
-----|--------------------------------|--------
    0|FloatTensor(1,1,28,28)          |FloatTensor(1,10)
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[4044.1326 -3060.9893 -34.796455 ... -1278.3772 -2814.0508 -1284.4863]]
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    9|FloatTensor(1,1,28,28)          |FloatTensor(1,10)
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-2608.7097 -439.29208 414.57578 ... -343.33176 209.20328 2627.7]]
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    6|FloatTensor(1,1,28,28)          |FloatTensor(1,10)
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[700.5958 -2671.2515 -10.615548 ... -3413.485 915.7342 -1291.0757]]
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |

label|tensor                          |output_1|pred
-----|--------------------------------|--------|----
    0|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|0
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[4044.1326 -3060.9893 -34.796455 ... -1278.3772 -2814.0508 -1284.4863]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    9|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|9
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-2608.7097 -439.29208 414.57578 ... -343.33176 209.20328 2627.7]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    6|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|6
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[700.5958 -2671.2515 -10.615548 ... -3413.485 915.7342 -1291.0757]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |

-------------------------------- Metrics: --------------------------------
Accuracy:0.9903  Macro F1:0.9902  Micro F1:0.9903  Kappa:0.9892
|Pred\Real|  9|  8|   7|...|   2|   1|  0|
|---------|---|---|----|---|----|----|---|
|        9|992|  1|   4|...|   0|   0|  0|
|        8|  2|965|   1|...|   0|   1|  1|
|        7|  5|  2|1012|...|   2|   0|  1|
|      ...|...|...| ...|...| ...| ...|...|
|        2|  2|  4|   9|...|1030|   3|  2|
|        1|  0|  0|   2|...|   0|1128|  0|
|        0|  0|  2|   0|...|   0|   0|973|
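The metrics above can all be derived from the confusion matrix. The Python sketch below uses the standard definitions, including Cohen's kappa; we assume Alink uses the same formulas but have not checked its source. It also shows why Micro F1 equals Accuracy (both 0.9903 above): for single-label multiclass data, micro-averaged precision and recall both reduce to correct / total. Because the 10-class matrix above is elided, the example uses a hypothetical 2-class matrix laid out the same way (rows = predicted, columns = real).

```python
def multiclass_metrics(cm):
    """Accuracy, macro F1, micro F1 and Cohen's kappa from cm[pred][real] counts."""
    n = len(cm)
    total = sum(sum(row) for row in cm)
    row_sums = [sum(cm[i]) for i in range(n)]                       # predicted count per class
    col_sums = [sum(cm[i][j] for i in range(n)) for j in range(n)]  # real count per class
    correct = sum(cm[i][i] for i in range(n))

    accuracy = correct / total
    # Micro precision = micro recall = correct / total, hence micro F1 == accuracy
    micro_f1 = accuracy
    # Per-class F1 = 2 * TP / (predicted + real); macro F1 is their unweighted mean
    f1s = [2 * cm[k][k] / (row_sums[k] + col_sums[k]) if row_sums[k] + col_sums[k] else 0.0
           for k in range(n)]
    macro_f1 = sum(f1s) / n
    # Cohen's kappa: observed agreement corrected for the agreement expected by chance
    p_e = sum(row_sums[k] * col_sums[k] for k in range(n)) / (total * total)
    kappa = (accuracy - p_e) / (1 - p_e)
    return accuracy, macro_f1, micro_f1, kappa


acc, macro_f1, micro_f1, kappa = multiclass_metrics([[5, 1], [2, 2]])
print(acc, round(macro_f1, 4), micro_f1, round(kappa, 4))  # 0.7 0.6703 0.7 0.3478
```

Kappa is lower than accuracy because it discounts the agreement a classifier would reach by chance given the class frequencies.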
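The TorchModelPredictStreamOp component loads a PyTorch model and uses it for stream prediction. For a detailed description of this component, see the Alink documentation: https://www.yuque.com/pinshu/alink_doc/torchmodelpredictstreamop .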
As in the batch case, the input data column must first be converted to a tensor, here with the VectorToTensorStreamOp component. The code is as follows:
new AkSourceStreamOp()
    .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
    .link(
        // Convert the "vec" column into a float tensor of shape (1, 1, 28, 28), keeping "label"
        new VectorToTensorStreamOp()
            .setTensorDataType("float")
            .setTensorShape(1, 1, 28, 28)
            .setSelectedCol("vec")
            .setOutputCol("tensor")
            .setReservedCols("label")
    )
    .link(
        // Run the PyTorch model on each incoming record
        new TorchModelPredictStreamOp()
            .setModelPath(
                "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
            .setSelectedCols("tensor")
            .setOutputSchemaStr("output_1 FLOAT_TENSOR")
    )
    .link(
        // Take the index of the largest score as the predicted digit
        new UDFStreamOp()
            .setFunc(new GetMaxIndex())
            .setSelectedCols("output_1")
            .setOutputCol("pred")
    )
    // Print only a small sample (ratio 0.001) of the results
    .sample(0.001)
    .print();
StreamOperator.execute();
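The call sample(0.001) is what keeps the printed stream output short. Assuming it keeps each record independently with probability 0.001 (we have not checked how Alink implements sampling on streams), a stream of 10,000 records, the size of the MNIST test set, yields about 10 printed rows. A Python sketch of such per-record (Bernoulli) sampling; bernoulli_sample is a hypothetical name:

```python
import random


def bernoulli_sample(records, ratio, seed=None):
    """Keep each record independently with probability `ratio`, preserving arrival order."""
    rng = random.Random(seed)
    for record in records:
        if rng.random() < ratio:  # random() is uniform on [0, 1)
            yield record


# Hypothetical stream of 10,000 record ids
kept = list(bernoulli_sample(range(10000), 0.001, seed=42))
# The expected count is 10,000 * 0.001 = 10; the actual count varies with the seed
print(len(kept))
```

Since the decision is made record by record, this kind of sampling needs no buffering, which suits an unbounded stream.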
Running the stream job prints a few sampled rows such as:
label|tensor                          |output_1|pred
-----|--------------------------------|--------|----
    3|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|3
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1590.9832 -1487.7247 591.7295 ... -468.07892 671.602 -1350.5359]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    8|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|8
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-503.25494 -1483.4431 684.2502 ... -2311.5735 2921.0408 118.283745]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    6|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|6
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-375.05377 -1370.912 -769.6774 ... -3442.0344 -582.13983 -2177.5767]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
......
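Having seen how to use a PyTorch model in batch and stream jobs, using it for prediction in a Pipeline is straightforward: replace each batch/stream operator with its Pipeline counterpart, namely VectorToTensor for VectorToTensorBatchOp/VectorToTensorStreamOp and TorchModelPredictor for TorchModelPredictBatchOp/TorchModelPredictStreamOp. The code below builds a PipelineModel from these two stages, saves it, then loads it and applies it to streaming data: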
// Build the Pipeline model from the Pipeline counterparts of the two operators and save it
new PipelineModel(
    new VectorToTensor()
        .setTensorDataType("float")
        .setTensorShape(1, 1, 28, 28)
        .setSelectedCol("vec")
        .setOutputCol("tensor")
        .setReservedCols("label"),
    new TorchModelPredictor()
        .setModelPath(
            "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_pytorch.pt")
        .setSelectedCols("tensor")
        .setOutputSchemaStr("output_1 FLOAT_TENSOR")
).save(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL, true);
BatchOperator.execute();

// Load the saved model and apply it to streaming data
PipelineModel
    .load(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL)
    .transform(
        new AkSourceStreamOp()
            .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE)
    )
    .link(
        new UDFStreamOp()
            .setFunc(new GetMaxIndex())
            .setSelectedCols("output_1")
            .setOutputCol("pred")
    )
    .sample(0.001)
    .print();
StreamOperator.execute();
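A PipelineModel applies its stages in order, each stage consuming the previous stage's output. The Python sketch below follows only the column names through the two stages. The column-retention rules are our assumptions, inferred from setReservedCols("label") and from the output schema that LocalPredictor prints later in this section; vector_to_tensor and torch_model_predictor are hypothetical stand-ins, not Alink classes.

```python
def vector_to_tensor(cols, selected_col="vec", output_col="tensor", reserved_cols=("label",)):
    """Column names after the VectorToTensor stage (assumed retention rule)."""
    if selected_col not in cols:
        raise ValueError(f"missing input column {selected_col!r}")
    # Assumption: only the reserved columns survive, followed by the new output column
    return [c for c in cols if c in reserved_cols] + [output_col]


def torch_model_predictor(cols, selected_cols=("tensor",), output_cols=("output_1",)):
    """Column names after the TorchModelPredictor stage (assumed retention rule)."""
    missing = [c for c in selected_cols if c not in cols]
    if missing:
        raise ValueError(f"missing input columns {missing}")
    # Assumption: with no reserved columns set, all input columns are kept
    return list(cols) + list(output_cols)


def pipeline_output_schema(input_cols, stages):
    """Feed each stage the previous stage's output, in order."""
    cols = list(input_cols)
    for stage in stages:
        cols = stage(cols)
    return cols


print(pipeline_output_schema(["vec", "label"], [vector_to_tensor, torch_model_predictor]))
# ['label', 'tensor', 'output_1']
```

Under these assumptions the "vec" column is dropped by the first stage, so downstream code such as GetMaxIndex sees only label, tensor and output_1.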
Running the saved Pipeline model on the stream prints a few sampled rows such as:
label|tensor                          |output_1|pred
-----|--------------------------------|--------|----
    4|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|4
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-993.6407 -458.57587 1282.9576 ... -2273.686 -2319.7793 -2746.323]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    1|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|1
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1004.3513 2189.4736 -777.72845 ... -1088.758 355.25262 -921.88556]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
    2|FloatTensor(1,1,28,28)          |FloatTensor(1,10)|2
     |[[[[0.0 0.0 0.0 ... 0.0 0.0 0.0]|[[-1155.4056 -317.33963 4966.4814 ... -910.1352 -2354.1606 -3428.952]]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ...                          |
     |   [0.0 0.0 0.0 ... 0.0 0.0 0.0]|
     |   ... ...                      |
......
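Besides running Alink jobs, a PyTorch model can also be used for embedded prediction through LocalPredictor. In the example below, we first take one row from the dataset; the schema string of the input data is "vec string, label int". We then load the Pipeline model saved above, passing that schema string, to obtain the LocalPredictor instance localPredictor. If you are unsure what each column of the prediction result means, print localPredictor's output schema. Finally, call localPredictor's map method to get the prediction.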
AkSourceBatchOp source = new AkSourceBatchOp()
    .setFilePath(Chap13.DATA_DIR + Chap13.DENSE_TEST_FILE);
System.out.println(source.getSchema());

// Take one row of the test data as input
Row row = source.firstN(1).collect().get(0);

// Load the saved Pipeline model, declaring the schema of the input rows
LocalPredictor localPredictor
    = new LocalPredictor(Chap13.DATA_DIR + PIPELINE_PYTORCH_MODEL, "vec string, label int");
System.out.println(localPredictor.getOutputSchema());

// Predict; in the output schema, field 0 is "label" and field 2 is "output_1"
Row r = localPredictor.map(row);
System.out.println(r.getField(0).toString() + " | " + r.getField(2).toString());
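The LocalPredictor example prints the schema of the source data, the output schema of localPredictor, and finally the label and output_1 of the predicted row: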
root
 |-- vec: STRING
 |-- label: INT

root
 |-- label: INT
 |-- tensor: LEGACY(GenericType<com.alibaba.alink.common.linalg.tensor.FloatTensor>)
 |-- output_1: LEGACY(GenericType<com.alibaba.alink.common.linalg.tensor.FloatTensor>)

1 | [[-794.75903 2662.567 -658.8216 ... -173.1484 -263.41855 -712.4674]]
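The indices 0 and 2 passed to getField come from the output schema printed above: label is the first column and output_1 is the third. Rather than hard-coding positions, you can look them up by name. The Python sketch below parses a schema string of the "name type, name type" form used for "vec string, label int" and returns a column's position. parse_schema_str and field_index are hypothetical helpers, not Alink API, and writing the output schema with FLOAT_TENSOR types is our assumption.

```python
def parse_schema_str(schema_str):
    """Split a schema string like 'vec string, label int' into (name, type) pairs.

    Simplification: assumes no type name contains a comma or a space.
    """
    pairs = []
    for field in schema_str.split(","):
        name, type_name = field.split()
        pairs.append((name, type_name))
    return pairs


def field_index(schema_str, name):
    """Position of column `name`, i.e. the index one would pass to Row.getField."""
    return [n for n, _ in parse_schema_str(schema_str)].index(name)


output_schema = "label int, tensor FLOAT_TENSOR, output_1 FLOAT_TENSOR"
print(field_index(output_schema, "label"), field_index(output_schema, "output_1"))  # 0 2
```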