Alink教程(Python版)

第25.4节 运行TensorFlow模型

在本章的第2、3节介绍了使用Alink提供的深度学习组件KerasSequentialClassifier和KerasSequentialRegressor进行分类和回归模型的训练、预测。

实际应用中,经常需要使用TensorFlow或着PyTorch训练好的模型,对流式数据、批式数据进行预测。Alink提供了相应的流式、批式和Pipeline组件适配TensorFlow或着PyTorch模型。

本节重点介绍与TensorFlow模型相关的操作。


25.4.1 生成TensorFlow模型

本节所需的TensorFlow模型压缩文件mnist_model_tf.zip,已经被放到了OSS上,本节后面的实验会直接从网络读取该模型。https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_tf.zip

如果读者有兴趣,可以在TensorFlow环境,运行下面代码便可生成TensorFlow模型,从而被Alink相关组件使用。注意:TensorFlow模型执行完save操作会被保存到一个文件夹,需要将其压缩为zip文件,便于Alink相关组件导入模型。建议的压缩示例代码在下面代码的最后部分。

import tensorflow as tf
from tensorflow import keras

mnist = keras.datasets.mnist

(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

test_images,train_images = test_images.reshape((10000,28,28,1)),train_images.reshape(60000,28,28,1)
test_images,train_images = test_images/255.0,train_images/255.0
model = tf.keras.models.Sequential([
    tf.keras.layers.Conv2D(20,(5,5),padding="SAME",activation="relu"),
    tf.keras.layers.MaxPool2D(2,2,padding="SAME"),
    tf.keras.layers.Conv2D(40,(5,5),padding="SAME",activation="relu"),
    tf.keras.layers.MaxPool2D(2, 2,padding="SAME"),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(512,activation="relu"),
    tf.keras.layers.Dense(10,activation="softmax")
])
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.fit(train_images,train_labels,epochs=5)
test_loss, test_acc = model.evaluate(test_images, test_labels)
print(test_loss)
print(test_acc)

dir_name = "mnist_model_tf"
model.save(dir_name)

import shutil
shutil.make_archive(base_name=dir_name, format='zip', root_dir=dir_name)


该段脚本的执行输出如下,测试集上预测精确率为98.75%.

Train on 60000 samples
Epoch 1/5
60000/60000 [==============================] - 5s 75us/sample - loss: 0.1095 - accuracy: 0.9660
Epoch 2/5
60000/60000 [==============================] - 4s 70us/sample - loss: 0.0376 - accuracy: 0.9883
Epoch 3/5
60000/60000 [==============================] - 4s 70us/sample - loss: 0.0255 - accuracy: 0.9917
Epoch 4/5
60000/60000 [==============================] - 4s 70us/sample - loss: 0.0176 - accuracy: 0.9942
Epoch 5/5
60000/60000 [==============================] - 4s 70us/sample - loss: 0.0140 - accuracy: 0.9951
10000/10000 [==============================] - 1s 59us/sample - loss: 0.0473 - accuracy: 0.9875
0.0473310407480522
0.9875


25.4.2 批式任务中使用TensorFlow模型


使用TFSavedModelPredictBatchOp组件,可以加载TF模型进行批式预测。关于该组件的详细说明参见Alink文档 https://www.yuque.com/pinshu/alink_doc/tfsavedmodelpredictbatchop .

由于TensorFlow模型训练前对每个数据都除以255,所以批式任务也要执行此操作,可以使用VectorFunctionBatchOp组件,设置函数名称(FuncName)为"Scale",系数为1.0 / 255.0。另外,使用TensorFlow模型前,还需要将输入数据列的类型转换为Tensor格式,可以使用VectorToTensorBatchOp组件。具体代码如下所示:

AkSourceBatchOp()\
    .setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)\
    .link(\
        VectorFunctionBatchOp()\
            .setSelectedCol("vec")\
            .setFuncName("Scale")\
            .setWithVariable(1.0 / 255.0)
    )\
    .link(\
        VectorToTensorBatchOp()\
            .setTensorDataType("float")\
            .setTensorShape([1, 28, 28, 1])\
            .setSelectedCol("vec")\
            .setOutputCol("input_1")\
            .setReservedCols(["label"])
    )\
    .link(\
        TFSavedModelPredictBatchOp()\
            .setModelPath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_tf.zip")\
            .setSelectedCols(["input_1"])\
            .setOutputSchemaStr("output_1 FLOAT_TENSOR")\
    )\
    .lazyPrint(3)\
    .link(\
        UDFBatchOp()\
            .setFunc(get_max_index)
            .setSelectedCols(["output_1"])
            .setOutputCol("pred")
    )\
    .lazyPrint(3)\
    .link(\
        EvalMultiClassBatchOp()\
            .setLabelCol("label")\
            .setPredictionCol("pred")\
            .lazyPrintMetrics()
    )

BatchOperator.execute()

这里用到了一个自定义函数,具体定义如下:

import numpy as np

@udf(input_types=[AlinkDataTypes.TENSOR()], result_type=AlinkDataTypes.INT()) 
def get_max_index(tensor: np.ndarray):
    return tensor.argmax().item()


批式任务的运行结果为:

-------------------------------- Metrics: --------------------------------
Accuracy:0.9917	Macro F1:0.9917	Micro F1:0.9917	Kappa:0.9908	
|Pred\Real|  9|  8|   7|...|   2|   1|  0|
|---------|---|---|----|---|----|----|---|
|        9|995|  2|   1|...|   0|   0|  0|
|        8|  4|965|   1|...|   1|   2|  0|
|        7|  2|  0|1019|...|   8|   1|  1|
|      ...|...|...| ...|...| ...| ...|...|
|        2|  0|  2|   3|...|1022|   1|  0|
|        1|  0|  0|   2|...|   1|1127|  0|
|        0|  0|  1|   1|...|   0|   0|976|

25.4.3 流式任务中使用TensorFlow模型


使用TFSavedModelPredictStreamOp组件,可以加载TF模型进行批式预测。关于该组件的详细说明参见Alink文档 https://www.yuque.com/pinshu/alink_doc/tfsavedmodelpredictstreamop .

由于TensorFlow模型训练前对每个数据都除以255,所以流式任务也要执行此操作,可以使用VectorFunctionStreamOp组件,设置函数名称(FuncName)为"Scale",系数为1.0 / 255.0。另外,使用TensorFlow模型前,还需要将输入数据列的类型转换为Tensor格式,可以使用VectorToTensorStreamOp组件。具体代码如下所示:

AkSourceStreamOp()\
    .setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)\
    .link(\
        VectorFunctionStreamOp()\
            .setSelectedCol("vec")\
            .setFuncName("Scale")\
            .setWithVariable(1.0 / 255.0)
    )\
    .link(\
        VectorToTensorStreamOp()\
            .setTensorDataType("float")\
            .setTensorShape([1, 28, 28, 1])\
            .setSelectedCol("vec")\
            .setOutputCol("input_1")\
            .setReservedCols(["label"])
    )\
    .link(\
        TFSavedModelPredictStreamOp()\
            .setModelPath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_tf.zip")\
            .setSelectedCols(["input_1"])\
            .setOutputSchemaStr("output_1 FLOAT_TENSOR")
    )\
    .link(\
        UDFStreamOp()\
            .setFunc(get_max_index)\
            .setSelectedCols(["output_1"])\
            .setOutputCol("pred")
    )\
    .sample(0.001)\
    .print()

StreamOperator.execute()

运行结果为:


25.4.4 Pipeline中使用TensorFlow模型


学习了如何在批式任务和流式任务中使用TensorFlow模型,我们很容易在Pipeline中使用TensorFlow模型进行预测,只要将其中的批式/流式组件对应到Pipeline组件即可。具体代码如下:

PipelineModel(\
    VectorFunction()\
        .setSelectedCol("vec")\
        .setFuncName("Scale")\
        .setWithVariable(1.0 / 255.0),\
    VectorToTensor()\
        .setTensorDataType("float")\
        .setTensorShape([1, 28, 28, 1])\
        .setSelectedCol("vec")\
        .setOutputCol("input_1")\
        .setReservedCols(["label"]),\
    TFSavedModelPredictor()\
        .setModelPath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/mnist_model_tf.zip")\
        .setSelectedCols(["input_1"])\
        .setOutputSchemaStr("output_1 FLOAT_TENSOR")\
).save(Chap13_DATA_DIR + PIPELINE_TF_MODEL, True)
BatchOperator.execute()

PipelineModel\
    .load(Chap13_DATA_DIR + PIPELINE_TF_MODEL)\
    .transform(\
        AkSourceStreamOp()\
            .setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)
    )\
    .link(\
        UDFStreamOp()\
            .setFunc(get_max_index)\
            .setSelectedCols(["output_1"])\
            .setOutputCol("pred")
    )\
    .sample(0.001)\
    .print()
StreamOperator.execute()

运行结果为:


25.4.5 LocalPredictor中使用TensorFlow模型


除了通过Alink任务使用TensorFlow模型,也可以使用LocalPredictor进行嵌入式预测。示例代码如下,首先从数据集中抽取一行数据,输入数据的SchemaStr为“vec string, label int”;然后通过导入上一节保存的Pipeline模型,并设置输入数据的SchemaStr,得到LocalPredictor类型的实例localPredictor;如果不确定预测结果各列的含义,可以打印输出localPredictor的OutputSchema;使用localPredictor的map方法获得预测结果。

source = AkSourceBatchOp().setFilePath(Chap13_DATA_DIR + Chap13_DENSE_TEST_FILE)

print(source.getSchemaStr())

df = source.firstN(1).collectToDataframe()

row = [df.iat[0,0], df.iat[0,1].item()]

localPredictor = LocalPredictor(Chap13_DATA_DIR + PIPELINE_TF_MODEL, "vec string, label int")

print(localPredictor.getOutputSchemaStr())

r = localPredictor.map(row)
print(str(r[0]) + " | " + str(r[2]))

运行结果为:

vec VARCHAR, label INT
label INT, input_1 ANY<com.alibaba.alink.common.linalg.tensor.FloatTensor>, output_1 ANY<com.alibaba.alink.common.linalg.tensor.FloatTensor>
2 | FloatTensor(1,10)
[[1.9372295E-11 4.4042978E-10 1.0 ... 9.506602E-13 3.3328297E-14 3.0861563E-15]]