本章包括下面各节:
13.1 数据准备
13.1.1 读取MNIST数据文件
13.1.2 稠密向量与稀疏向量
13.1.3 标签值的统计信息
13.2 Softmax算法
13.3 二分类器组合
13.4 多层感知器分类器(MLPC)
13.5 决策树与随机森林
13.6 K最近邻算法
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
from pyalink.alink import *

useLocalEnv(1)

from utils import *

import os

import pandas as pd

# Widen pandas display so long vectors print without truncation.
pd.set_option('display.max_colwidth', 5000)
pd.set_option('display.html.use_mathjax', False)

# Directory holding the MNIST data files (ROOT_DIR comes from utils).
DATA_DIR = ROOT_DIR + "mnist" + os.sep

# Ak-format file names for each representation of the data set.
DENSE_TRAIN_FILE = "dense_train.ak"
DENSE_TEST_FILE = "dense_test.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"
SPARSE_TEST_FILE = "sparse_test.ak"
TABLE_TRAIN_FILE = "table_train.ak"
TABLE_TEST_FILE = "table_test.ak"

# Column names shared by all examples in this chapter.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "id_cluster"
#c_1

import gzip
import struct

import numpy as np


def get_df(image_path, label_path):
    """Load one gzipped MNIST IDX image/label file pair into a DataFrame.

    The returned frame has the label in column 0 followed by the
    rows * cols (784) pixel values, one row per sample.
    """
    with gzip.open(label_path) as flbl:
        # IDX label header: magic number and item count (big-endian uint32).
        _magic, _num = struct.unpack(">II", flbl.read(8))
        label = np.frombuffer(flbl.read(), dtype=np.int8)
        label = label.reshape(len(label), 1)
    with gzip.open(image_path, 'rb') as fimg:
        # IDX image header: magic, count, rows, cols.
        _magic, _num, rows, cols = struct.unpack(">IIII", fimg.read(16))
        image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows * cols)
    return pd.DataFrame(np.hstack((label, image)))


# Table schema: integer label plus one double column per pixel (c_0..c_783).
# Built with join instead of repeated += (the loop version is quadratic).
schema_str = "label int" + "".join(", c_" + str(i) + " double" for i in range(784))


def _convert_to_table(image_file, label_file, table_file):
    """Convert one raw MNIST file pair to an Ak table file, unless it
    already exists (the conversion is idempotent and skipped on rerun)."""
    if not os.path.exists(DATA_DIR + table_file):
        BatchOperator\
            .fromDataframe(
                get_df(DATA_DIR + image_file, DATA_DIR + label_file),
                schema_str
            )\
            .link(
                AkSinkBatchOp().setFilePath(DATA_DIR + table_file)
            )
        BatchOperator.execute()


_convert_to_table('train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz',
                  TABLE_TRAIN_FILE)
_convert_to_table('t10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz',
                  TABLE_TEST_FILE)
# Names of the 784 pixel columns produced by the table conversion above.
feature_cols = ["c_" + str(i) for i in range(784)]


def _to_dense(table_file, dense_file):
    """Pack the 784 pixel columns into one dense vector column and write
    the result as an Ak file, unless the target already exists."""
    if not os.path.exists(DATA_DIR + dense_file):
        AkSourceBatchOp()\
            .setFilePath(DATA_DIR + table_file)\
            .lazyPrint(3)\
            .link(
                ColumnsToVectorBatchOp()
                    .setSelectedCols(feature_cols)
                    .setVectorCol(VECTOR_COL_NAME)
                    .setReservedCols([LABEL_COL_NAME])
            )\
            .lazyPrint(3)\
            .link(
                AkSinkBatchOp().setFilePath(DATA_DIR + dense_file)
            )
        BatchOperator.execute()


def _to_sparse(table_file, sparse_file):
    """Convert the pixel columns to sparse vectors (zero entries dropped)
    and re-attach the label via a generated row id, unless the target
    Ak file already exists."""
    if not os.path.exists(DATA_DIR + sparse_file):
        source = AkSourceBatchOp()\
            .setFilePath(DATA_DIR + table_file)\
            .link(
                AppendIdBatchOp().setIdCol("row_id")
            )
        # (row_id, label) pairs, to be joined back after vectorization.
        row_id_label = source\
            .select("row_id AS id, " + LABEL_COL_NAME)\
            .lazyPrint(3, "row_id_label")
        # Pivot pixels to (row_id, col, val) triples, drop zeros, then
        # assemble sparse vectors of fixed size 784.
        row_id_vec = source\
            .lazyPrint(3)\
            .link(
                ColumnsToTripleBatchOp()
                    .setSelectedCols(feature_cols)
                    .setTripleColumnValueSchemaStr("col string, val double")
                    .setReservedCols(["row_id"])
            )\
            .filter("val<>0")\
            .lazyPrint(3)\
            .select("row_id, val, CAST(SUBSTRING(col FROM 3) AS INT) AS col")\
            .lazyPrint(3)\
            .link(
                TripleToVectorBatchOp()
                    .setTripleRowCol("row_id")
                    .setTripleColumnCol("col")
                    .setTripleValueCol("val")
                    .setVectorCol(VECTOR_COL_NAME)
                    .setVectorSize(784)
            )\
            .lazyPrint(3)
        JoinBatchOp()\
            .setJoinPredicate("row_id = id")\
            .setSelectClause(LABEL_COL_NAME + ", " + VECTOR_COL_NAME)\
            .linkFrom(row_id_vec, row_id_label)\
            .lazyPrint(3)\
            .link(
                AkSinkBatchOp().setFilePath(DATA_DIR + sparse_file)
            )
        BatchOperator.execute()


# Same order as the original cell: dense train, dense test, sparse test,
# sparse train.
_to_dense(TABLE_TRAIN_FILE, DENSE_TRAIN_FILE)
_to_dense(TABLE_TEST_FILE, DENSE_TEST_FILE)
_to_sparse(TABLE_TEST_FILE, SPARSE_TEST_FILE)
_to_sparse(TABLE_TRAIN_FILE, SPARSE_TRAIN_FILE)
# Summarize the dense and sparse vector columns, then print per-label
# counts for the (sparse) training set — all in a single batch run.
dense_train = AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE)
dense_train\
    .lazyPrint(1, "MNIST data")\
    .link(
        VectorSummarizerBatchOp()
            .setSelectedCol(VECTOR_COL_NAME)
            .lazyPrintVectorSummary()
    )

sparse_train = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
sparse_train\
    .lazyPrint(1, "MNIST data")\
    .link(
        VectorSummarizerBatchOp()
            .setSelectedCol(VECTOR_COL_NAME)
            .lazyPrintVectorSummary()
    )

label_counts = AkSourceBatchOp()\
    .setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)\
    .lazyPrintStatistics()\
    .groupBy(LABEL_COL_NAME, LABEL_COL_NAME + ", COUNT(*) AS cnt")\
    .orderBy("cnt", 100)
label_counts.lazyPrint(-1)

BatchOperator.execute()
#c_2

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# Multinomial logistic regression (Softmax) on the sparse vectors, with
# lazy printing of training and model information.
softmax = Softmax()\
    .setVectorCol(VECTOR_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .enableLazyPrintTrainInfo()\
    .enableLazyPrintModelInfo()

predictions = softmax.fit(train_data).transform(test_data)
predictions.link(
    EvalMultiClassBatchOp()
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
        .lazyPrintMetrics("Softmax")
)

BatchOperator.execute()
#c_3

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# Build a 10-class classifier from each binary base learner via
# one-vs-rest; both evaluations run in the single execute() below.
for make_base, title in [
    (LogisticRegression, "OneVsRest - LogisticRegression"),
    (LinearSvm, "OneVsRest - LinearSvm"),
]:
    base_classifier = make_base()\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)
    OneVsRest()\
        .setClassifier(base_classifier)\
        .setNumClass(10)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics(title)
        )

BatchOperator.execute()
#c_4

useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# Train MLPs of increasing depth; each configuration is trained and
# evaluated in its own batch run.
for layers, title in [
    ([784, 10], "MultilayerPerceptronClassifier {784, 10}"),
    ([784, 256, 128, 10], "MultilayerPerceptronClassifier {784, 256, 128, 10}"),
]:
    MultilayerPerceptronClassifier()\
        .setLayers(layers)\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics(title)
        )
    BatchOperator.execute()
#c_5

useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TEST_FILE)

# Every column except the label is a pixel feature.
featureColNames = train_data.getColNames()
featureColNames.remove(LABEL_COL_NAME)

train_data.lazyPrint(5)
BatchOperator.execute()

sw = Stopwatch()

# Single decision trees with different split criteria, timed per run.
for tree_type in ['GINI', 'INFOGAIN', 'INFOGAINRATIO']:
    sw.reset()
    sw.start()
    DecisionTreeClassifier()\
        .setTreeType(tree_type)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .enableLazyPrintModelInfo()\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics("DecisionTreeClassifier " + tree_type)
        )
    BatchOperator.execute()
    sw.stop()
    print(sw.getElapsedTimeSpan())

# Random forests of growing size, timed per run.
for num_trees in [2, 4, 8, 16, 32, 64, 128]:
    sw.reset()
    sw.start()
    RandomForestClassifier()\
        .setSubsamplingRatio(0.6)\
        .setNumTreesOfInfoGain(num_trees)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .enableLazyPrintModelInfo()\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics("RandomForestClassifier : " + str(num_trees))
        )
    BatchOperator.execute()
    sw.stop()
    print(sw.getElapsedTimeSpan())
#c_6

useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)


def _run_knn(classifier, title):
    """Fit one configured KnnClassifier, score the test set, and print
    multiclass metrics under the given title."""
    classifier\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
                .setLabelCol(LABEL_COL_NAME)
                .setPredictionCol(PREDICTION_COL_NAME)
                .lazyPrintMetrics(title)
        )
    BatchOperator.execute()


# k=3 with the default (Euclidean) distance, k=3 with cosine, then k=7.
_run_knn(KnnClassifier().setK(3), "KnnClassifier - 3 - EUCLIDEAN")
_run_knn(KnnClassifier().setDistanceType('COSINE').setK(3),
         "KnnClassifier - 3 - COSINE")
_run_knn(KnnClassifier().setK(7), "KnnClassifier - 7 - EUCLIDEAN")