Alink教程(Python版)

第13章 常用的多分类算法

本章包括下面各节:
13.1 数据准备
13.1.1 读取MNIST数据文件
13.1.2 稠密向量与稀疏向量
13.1.3 标签值的统计信息
13.2 Softmax算法
13.3 二分类器组合
13.4 多层感知器分类器(MLPC)
13.5 决策树与随机森林
13.6 K最近邻算法

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

# Widen pandas cell display so long vector strings print without truncation,
# and turn off MathJax rendering for notebook HTML output.
pd.set_option('display.max_colwidth', 5000)
pd.set_option('display.html.use_mathjax', False)

# Directory holding the MNIST data files; ROOT_DIR is provided by utils.
DATA_DIR = ROOT_DIR + "mnist" + os.sep

# Alink .ak file names for the three on-disk representations of MNIST:
# one dense-vector pair, one sparse-vector pair, one plain-table pair.
DENSE_TRAIN_FILE = "dense_train.ak"
DENSE_TEST_FILE = "dense_test.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"
SPARSE_TEST_FILE = "sparse_test.ak"
TABLE_TRAIN_FILE = "table_train.ak"
TABLE_TEST_FILE = "table_test.ak"

# Column names shared by every pipeline in this chapter.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "id_cluster"

#c_1

import numpy as np
import gzip, struct

def get_df(image_path, label_path):
    """Load a gzipped MNIST IDX image/label file pair into a DataFrame.

    Column 0 holds the label of each sample; the remaining rows*cols
    columns hold the flattened pixel values of the corresponding image.
    """
    # Open in binary mode explicitly, matching the image file below.
    with gzip.open(label_path, 'rb') as flbl:
        # IDX label header: magic number and item count (big-endian uint32).
        _magic, _num = struct.unpack(">II", flbl.read(8))
        label = np.frombuffer(flbl.read(), dtype=np.int8)
        label = label.reshape(len(label), 1)
    with gzip.open(image_path, 'rb') as fimg:
        # IDX image header: magic, count, rows, cols (big-endian uint32).
        _magic, _num, rows, cols = struct.unpack(">IIII", fimg.read(16))
        image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows * cols)
    return pd.DataFrame(np.hstack((label, image)))

# Flink SQL schema for the table form of MNIST: an integer label column
# followed by 784 double pixel columns c_0 .. c_783.  Built with a single
# join instead of repeated string concatenation in a loop.
schema_str = "label int" + "".join(
    ", c_" + str(i) + " double" for i in range(784)
)

# Convert the raw MNIST training files into an .ak table file, only if it
# has not been generated before.
if not os.path.exists(DATA_DIR + TABLE_TRAIN_FILE):
    train_df = get_df(DATA_DIR + 'train-images-idx3-ubyte.gz',
                      DATA_DIR + 'train-labels-idx1-ubyte.gz')
    BatchOperator\
        .fromDataframe(train_df, schema_str)\
        .link(
            AkSinkBatchOp().setFilePath(DATA_DIR + TABLE_TRAIN_FILE)
        )
    BatchOperator.execute()

# Convert the raw MNIST test files into an .ak table file, only if it has
# not been generated before.
if not os.path.exists(DATA_DIR + TABLE_TEST_FILE):
    test_df = get_df(DATA_DIR + 't10k-images-idx3-ubyte.gz',
                     DATA_DIR + 't10k-labels-idx1-ubyte.gz')
    BatchOperator\
        .fromDataframe(test_df, schema_str)\
        .link(
            AkSinkBatchOp().setFilePath(DATA_DIR + TABLE_TEST_FILE)
        )
    BatchOperator.execute()


# NOTE(review): the original source here was whitespace-mangled — dozens of
# statements fused onto two physical lines, which is not valid Python.  The
# code below restores the intended statements with proper formatting and
# factors the duplicated dense/sparse conversion pipelines into helpers.

# Names of the 784 pixel columns in the table representation.
feature_cols = ["c_" + str(i) for i in range(784)]


def _convert_to_dense(table_file, dense_file):
    """Pack the pixel columns of a table .ak file into one dense vector
    column and save the result (skipped if the output already exists)."""
    if not os.path.exists(DATA_DIR + dense_file):
        AkSourceBatchOp()\
            .setFilePath(DATA_DIR + table_file)\
            .lazyPrint(3)\
            .link(
                ColumnsToVectorBatchOp()
                    .setSelectedCols(feature_cols)
                    .setVectorCol(VECTOR_COL_NAME)
                    .setReservedCols([LABEL_COL_NAME])
            )\
            .lazyPrint(3)\
            .link(
                AkSinkBatchOp().setFilePath(DATA_DIR + dense_file)
            )
        BatchOperator.execute()


def _convert_to_sparse(table_file, sparse_file):
    """Convert the pixel columns to (row, col, val) triples, drop zero
    entries, rebuild each row as a size-784 sparse vector, then join the
    labels back on row id and save (skipped if the output exists)."""
    if not os.path.exists(DATA_DIR + sparse_file):
        source = AkSourceBatchOp()\
            .setFilePath(DATA_DIR + table_file)\
            .link(
                AppendIdBatchOp().setIdCol("row_id")
            )
        row_id_label = source\
            .select("row_id AS id, " + LABEL_COL_NAME)\
            .lazyPrint(3, "row_id_label")
        row_id_vec = source\
            .lazyPrint(3)\
            .link(
                ColumnsToTripleBatchOp()
                    .setSelectedCols(feature_cols)
                    .setTripleColumnValueSchemaStr("col string, val double")
                    .setReservedCols(["row_id"])
            )\
            .filter("val<>0")\
            .lazyPrint(3)\
            .select("row_id, val, CAST(SUBSTRING(col FROM 3) AS INT) AS col")\
            .lazyPrint(3)\
            .link(
                TripleToVectorBatchOp()
                    .setTripleRowCol("row_id")
                    .setTripleColumnCol("col")
                    .setTripleValueCol("val")
                    .setVectorCol(VECTOR_COL_NAME)
                    .setVectorSize(784)
            )\
            .lazyPrint(3)
        JoinBatchOp()\
            .setJoinPredicate("row_id = id")\
            .setSelectClause(LABEL_COL_NAME + ", " + VECTOR_COL_NAME)\
            .linkFrom(row_id_vec, row_id_label)\
            .lazyPrint(3)\
            .link(
                AkSinkBatchOp().setFilePath(DATA_DIR + sparse_file)
            )
        BatchOperator.execute()


# Same order as the original script: dense train, dense test, sparse test,
# sparse train.
_convert_to_dense(TABLE_TRAIN_FILE, DENSE_TRAIN_FILE)
_convert_to_dense(TABLE_TEST_FILE, DENSE_TEST_FILE)
_convert_to_sparse(TABLE_TEST_FILE, SPARSE_TEST_FILE)
_convert_to_sparse(TABLE_TRAIN_FILE, SPARSE_TRAIN_FILE)
# Preview one row and print vector summary statistics for both the dense
# and the sparse versions of the training data.
for vector_file in [DENSE_TRAIN_FILE, SPARSE_TRAIN_FILE]:
    AkSourceBatchOp()\
        .setFilePath(DATA_DIR + vector_file)\
        .lazyPrint(1, "MNIST data")\
        .link(
            VectorSummarizerBatchOp()
                .setSelectedCol(VECTOR_COL_NAME)
                .lazyPrintVectorSummary()
        )

# Basic column statistics for the sparse training set, plus the record
# count per label value ordered by count.
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)\
    .lazyPrintStatistics()\
    .groupBy(LABEL_COL_NAME, LABEL_COL_NAME + ", COUNT(*) AS cnt")\
    .orderBy("cnt", 100)\
    .lazyPrint(-1)

BatchOperator.execute()
#c_2
# Train a Softmax multi-class model on the sparse training set and report
# multi-class evaluation metrics on the test set.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

softmax = Softmax()\
    .setVectorCol(VECTOR_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .enableLazyPrintTrainInfo()\
    .enableLazyPrintModelInfo()

predictions = softmax.fit(train_data).transform(test_data)

predictions.link(
    EvalMultiClassBatchOp()
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
        .lazyPrintMetrics("Softmax")
)

BatchOperator.execute()
#c_3
# Build two One-vs-Rest multi-class ensembles for the 10-class problem,
# one per underlying binary classifier, and evaluate each on the test set.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

binary_classifiers = [
    ("LogisticRegression", LogisticRegression()),
    ("LinearSvm", LinearSvm()),
]

for clf_name, clf in binary_classifiers:
    clf.setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)
    OneVsRest()\
        .setClassifier(clf)\
        .setNumClass(10)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics("OneVsRest - " + clf_name)
        )

BatchOperator.execute()
#c_4

useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# Two MLP topologies: input straight to the 10-way output, and two hidden
# layers of 256 and 128 units.  Each is trained and evaluated in turn.
for layer_sizes in ([784, 10], [784, 256, 128, 10]):
    title = "MultilayerPerceptronClassifier {" \
        + ", ".join(str(n) for n in layer_sizes) + "}"
    MultilayerPerceptronClassifier()\
        .setLayers(layer_sizes)\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics(title)
        )
    BatchOperator.execute()
#c_5

useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TEST_FILE)

# Every column except the label is a pixel feature.
featureColNames = train_data.getColNames()
featureColNames.remove(LABEL_COL_NAME)

train_data.lazyPrint(5)

BatchOperator.execute()

stopwatch = Stopwatch()

# Compare decision trees built with each supported split criterion,
# timing each train/evaluate round.
for tree_type in ['GINI', 'INFOGAIN', 'INFOGAINRATIO']:
    stopwatch.reset()
    stopwatch.start()
    DecisionTreeClassifier()\
        .setTreeType(tree_type)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .enableLazyPrintModelInfo()\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics("DecisionTreeClassifier " + tree_type)
        )
    BatchOperator.execute()
    stopwatch.stop()
    print(stopwatch.getElapsedTimeSpan())


# Random forests of increasing size (60% row subsampling), timed one by one.
for num_trees in [2, 4, 8, 16, 32, 64, 128]:
    stopwatch.reset()
    stopwatch.start()
    RandomForestClassifier()\
        .setSubsamplingRatio(0.6)\
        .setNumTreesOfInfoGain(num_trees)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .enableLazyPrintModelInfo()\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics("RandomForestClassifier : " + str(num_trees))
        )
    BatchOperator.execute()
    stopwatch.stop()
    print(stopwatch.getElapsedTimeSpan())
#c_6

useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# KNN with varying neighbor counts and distance metrics.  A distance of
# None leaves the operator at its default (Euclidean), exactly as the
# original pipelines that never called setDistanceType.
for k, distance_type in [(3, None), (3, 'COSINE'), (7, None)]:
    knn = KnnClassifier()
    if distance_type is not None:
        knn.setDistanceType(distance_type)
    title = "KnnClassifier - " + str(k) + " - " + (distance_type or "EUCLIDEAN")
    knn.setK(k)\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics(title)
        )
    BatchOperator.execute()