本章包括下面各节:
13.1 数据准备
13.1.1 读取MNIST数据文件
13.1.2 稠密向量与稀疏向量
13.1.3 标签值的统计信息
13.2 Softmax算法
13.3 二分类器组合
13.4 多层感知器分类器(MLPC)
13.5 决策树与随机森林
13.6 K最近邻算法
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
from pyalink.alink import *

useLocalEnv(1)

from utils import *

import os

import pandas as pd

# Show wide cells in full and skip MathJax rendering in notebook output.
pd.set_option('display.max_colwidth', 5000)
pd.set_option('display.html.use_mathjax', False)

# Directory holding the raw MNIST gzip files and the derived .ak files.
# NOTE(review): ROOT_DIR comes from utils — assumed to end with a separator.
DATA_DIR = ROOT_DIR + "mnist" + os.sep

# Derived data sets in Alink's .ak format.
DENSE_TRAIN_FILE = "dense_train.ak"
DENSE_TEST_FILE = "dense_test.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"
SPARSE_TEST_FILE = "sparse_test.ak"
TABLE_TRAIN_FILE = "table_train.ak"
TABLE_TEST_FILE = "table_test.ak"

# Column names used by every pipeline in this chapter.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "id_cluster"
#c_1
import numpy as np
import gzip, struct
def get_df(image_path, label_path):
    """Load one MNIST image/label gzip pair into a single DataFrame.

    The files use the IDX format: the label file starts with a big-endian
    (magic, count) header, the image file with (magic, count, rows, cols).

    Parameters:
        image_path: path to a gzipped IDX3 image file (magic 2051).
        label_path: path to a gzipped IDX1 label file (magic 2049).

    Returns:
        A DataFrame with one row per sample; column 0 is the label,
        the remaining rows*cols columns are the flattened pixel values.

    Raises:
        ValueError: if either file does not carry the expected IDX magic
            number (e.g. image/label paths were swapped).
    """
    with gzip.open(label_path, 'rb') as flbl:
        magic, num = struct.unpack(">II", flbl.read(8))
        if magic != 2049:
            raise ValueError("not an MNIST label file: " + str(label_path))
        label = np.frombuffer(flbl.read(), dtype=np.int8)
        # Reshape to a column vector so it can be hstacked with the pixels.
        label = label.reshape(len(label), 1)
    with gzip.open(image_path, 'rb') as fimg:
        magic, num, rows, cols = struct.unpack(">IIII", fimg.read(16))
        if magic != 2051:
            raise ValueError("not an MNIST image file: " + str(image_path))
        # One flattened row of rows*cols pixels per sample.
        image = np.frombuffer(fimg.read(), dtype=np.uint8).reshape(len(label), rows * cols)
    return pd.DataFrame(np.hstack((label, image)))
# Schema string for the table format: the label column followed by the
# 784 pixel columns c_0 .. c_783, all doubles.
schema_str = ", ".join(
    ["label int"] + ["c_" + str(i) + " double" for i in range(0, 784)]
)
def _import_mnist_table(image_file, label_file, dest_file):
    """Read one MNIST gzip pair and persist it as an .ak table under DATA_DIR."""
    BatchOperator\
        .fromDataframe(
            get_df(DATA_DIR + image_file, DATA_DIR + label_file),
            schema_str
        )\
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + dest_file))
    BatchOperator.execute()


# Import each split only once; skip when the .ak file already exists.
if not os.path.exists(DATA_DIR + TABLE_TRAIN_FILE):
    _import_mnist_table('train-images-idx3-ubyte.gz',
                        'train-labels-idx1-ubyte.gz',
                        TABLE_TRAIN_FILE)
if not os.path.exists(DATA_DIR + TABLE_TEST_FILE):
    _import_mnist_table('t10k-images-idx3-ubyte.gz',
                        't10k-labels-idx1-ubyte.gz',
                        TABLE_TEST_FILE)
# Reconstructed from a collapsed one-line notebook export.
# Build dense- and sparse-vector versions of the MNIST tables.

# Names of the 784 pixel columns: c_0 .. c_783.
feature_cols = ["c_" + str(i) for i in range(0, 784)]


def _make_dense_file(src_file, dest_file):
    """Pack the 784 pixel columns of src_file into one dense vector column."""
    AkSourceBatchOp()\
        .setFilePath(DATA_DIR + src_file)\
        .lazyPrint(3)\
        .link(
            ColumnsToVectorBatchOp()
            .setSelectedCols(feature_cols)
            .setVectorCol(VECTOR_COL_NAME)
            .setReservedCols([LABEL_COL_NAME])
        )\
        .lazyPrint(3)\
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + dest_file))
    BatchOperator.execute()


def _make_sparse_file(src_file, dest_file):
    """Convert src_file to sparse vectors, keeping only the non-zero pixels."""
    # Attach a row id so the sparse vectors can be joined back to the labels.
    source = AkSourceBatchOp()\
        .setFilePath(DATA_DIR + src_file)\
        .link(AppendIdBatchOp().setIdCol("row_id"))
    row_id_label = source\
        .select("row_id AS id, " + LABEL_COL_NAME)\
        .lazyPrint(3, "row_id_label")
    # (row, col, val) triples; drop zeros, strip the "c_" prefix to get the
    # numeric column index, then re-assemble size-784 sparse vectors.
    row_id_vec = source\
        .lazyPrint(3)\
        .link(
            ColumnsToTripleBatchOp()
            .setSelectedCols(feature_cols)
            .setTripleColumnValueSchemaStr("col string, val double")
            .setReservedCols(["row_id"])
        )\
        .filter("val<>0")\
        .lazyPrint(3)\
        .select("row_id, val, CAST(SUBSTRING(col FROM 3) AS INT) AS col")\
        .lazyPrint(3)\
        .link(
            TripleToVectorBatchOp()
            .setTripleRowCol("row_id")
            .setTripleColumnCol("col")
            .setTripleValueCol("val")
            .setVectorCol(VECTOR_COL_NAME)
            .setVectorSize(784)
        )\
        .lazyPrint(3)
    JoinBatchOp()\
        .setJoinPredicate("row_id = id")\
        .setSelectClause(LABEL_COL_NAME + ", " + VECTOR_COL_NAME)\
        .linkFrom(row_id_vec, row_id_label)\
        .lazyPrint(3)\
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + dest_file))
    BatchOperator.execute()


# Same order as the original script: dense train/test, then sparse test/train.
if not os.path.exists(DATA_DIR + DENSE_TRAIN_FILE):
    _make_dense_file(TABLE_TRAIN_FILE, DENSE_TRAIN_FILE)
if not os.path.exists(DATA_DIR + DENSE_TEST_FILE):
    _make_dense_file(TABLE_TEST_FILE, DENSE_TEST_FILE)
if not os.path.exists(DATA_DIR + SPARSE_TEST_FILE):
    _make_sparse_file(TABLE_TEST_FILE, SPARSE_TEST_FILE)
if not os.path.exists(DATA_DIR + SPARSE_TRAIN_FILE):
    _make_sparse_file(TABLE_TRAIN_FILE, SPARSE_TRAIN_FILE)
# 13.1.2/13.1.3: identical summaries for the dense and the sparse data.
for summary_file in [DENSE_TRAIN_FILE, SPARSE_TRAIN_FILE]:
    AkSourceBatchOp()\
        .setFilePath(DATA_DIR + summary_file)\
        .lazyPrint(1, "MNIST data")\
        .link(
            VectorSummarizerBatchOp()
            .setSelectedCol(VECTOR_COL_NAME)
            .lazyPrintVectorSummary()
        )

# Per-label sample counts, largest first.
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)\
    .lazyPrintStatistics()\
    .groupBy(LABEL_COL_NAME, LABEL_COL_NAME + ", COUNT(*) AS cnt")\
    .orderBy("cnt", 100)\
    .lazyPrint(-1)
BatchOperator.execute()
#c_2
# 13.2: softmax (multinomial logistic regression) on the sparse vectors.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

softmax = Softmax()\
    .setVectorCol(VECTOR_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .enableLazyPrintTrainInfo()\
    .enableLazyPrintModelInfo()

softmax.fit(train_data)\
    .transform(test_data)\
    .link(
        EvalMultiClassBatchOp()
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
        .lazyPrintMetrics("Softmax")
    )
BatchOperator.execute()
#c_3
# 13.3: one-vs-rest over two binary base classifiers.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

for base_classifier, title in [
    (LogisticRegression(), "OneVsRest - LogisticRegression"),
    (LinearSvm(), "OneVsRest - LinearSvm"),
]:
    base_classifier\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)
    OneVsRest()\
        .setClassifier(base_classifier)\
        .setNumClass(10)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .lazyPrintMetrics(title)
        )
# A single execute evaluates both configurations, as in the original.
BatchOperator.execute()
#c_4
# 13.4: multilayer perceptron classifiers with two different topologies.
useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

for layers, title in [
    ([784, 10], "MultilayerPerceptronClassifier {784, 10}"),
    ([784, 256, 128, 10], "MultilayerPerceptronClassifier {784, 256, 128, 10}"),
]:
    MultilayerPerceptronClassifier()\
        .setLayers(layers)\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .lazyPrintMetrics(title)
        )
    BatchOperator.execute()
#c_5
# 13.5: tree models work on the original 785-column table format.
useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TABLE_TEST_FILE)

# Every column except the label is a pixel feature.
featureColNames = [c for c in train_data.getColNames() if c != LABEL_COL_NAME]

train_data.lazyPrint(5)
BatchOperator.execute()
sw = Stopwatch()

# Compare the three split criteria, timing each training run.
for criterion in ['GINI', 'INFOGAIN', 'INFOGAINRATIO']:
    sw.reset()
    sw.start()
    DecisionTreeClassifier()\
        .setTreeType(criterion)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .enableLazyPrintModelInfo()\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .lazyPrintMetrics("DecisionTreeClassifier " + criterion)
        )
    BatchOperator.execute()
    sw.stop()
    print(sw.getElapsedTimeSpan())
# Random forests of increasing size; time each configuration.
for forest_size in [2, 4, 8, 16, 32, 64, 128]:
    sw.reset()
    sw.start()
    RandomForestClassifier()\
        .setSubsamplingRatio(0.6)\
        .setNumTreesOfInfoGain(forest_size)\
        .setFeatureCols(featureColNames)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .enableLazyPrintModelInfo()\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .lazyPrintMetrics("RandomForestClassifier : " + str(forest_size))
        )
    BatchOperator.execute()
    sw.stop()
    print(sw.getElapsedTimeSpan())
#c_6
# 13.6: k-nearest-neighbour classification on the sparse vectors.
useLocalEnv(4)

train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# (k, distance, report title); distance None keeps the library default.
for k, distance, title in [
    (3, None, "KnnClassifier - 3 - EUCLIDEAN"),
    (3, 'COSINE', "KnnClassifier - 3 - COSINE"),
    (7, None, "KnnClassifier - 7 - EUCLIDEAN"),
]:
    knn = KnnClassifier()\
        .setK(k)\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)
    if distance is not None:
        knn.setDistanceType(distance)
    knn.fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .lazyPrintMetrics(title)
        )
    BatchOperator.execute()
# (removed a stray Markdown code fence left over from the notebook export;
# the raw ``` lines were a Python syntax error)