本章包括下面各节:
19.1 主成分的含义
19.2 两种计算方式
19.3 在聚类方面的应用
19.4 在分类方面的应用
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
# Environment setup and shared constants for this chapter's examples.
# (The original collapsed all of these statements onto a single line,
# which is not valid Python; they are restored to one statement per line.)
from pyalink.alink import *

useLocalEnv(1)

from utils import *  # presumably defines ROOT_DIR — confirm against utils.py
import os
import pandas as pd

# MNIST data directory and the .ak files used below.
DATA_DIR = ROOT_DIR + "mnist" + os.sep
DENSE_TRAIN_FILE = "dense_train.ak"
DENSE_TEST_FILE = "dense_test.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"
SPARSE_TEST_FILE = "sparse_test.ak"
PCA_MODEL_FILE = "pca_model.ak"

# Column names shared by all examples in this chapter.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "pred"
#c_1
# Per-state crime figures, one row per US state.  The rows carry no column
# labels here; the column meanings (murder, rape, robbery, assault, burglary,
# larceny, auto) are supplied by the schema string applied downstream.
crime_rows = [
    ("ALABAMA", 14.2, 25.2, 96.8, 278.3, 1135.5, 1881.9, 280.7),
    ("ALASKA", 10.8, 51.6, 96.8, 284.0, 1331.7, 3369.8, 753.3),
    ("ARIZONA", 9.5, 34.2, 138.2, 312.3, 2346.1, 4467.4, 439.5),
    ("ARKANSAS", 8.8, 27.6, 83.2, 203.4, 972.6, 1862.1, 183.4),
    ("CALIFORNIA", 11.5, 49.4, 287.0, 358.0, 2139.4, 3499.8, 663.5),
    ("COLORADO", 6.3, 42.0, 170.7, 292.9, 1935.2, 3903.2, 477.1),
    ("CONNECTICUT", 4.2, 16.8, 129.5, 131.8, 1346.0, 2620.7, 593.2),
    ("DELAWARE", 6.0, 24.9, 157.0, 194.2, 1682.6, 3678.4, 467.0),
    ("FLORIDA", 10.2, 39.6, 187.9, 449.1, 1859.9, 3840.5, 351.4),
    ("GEORGIA", 11.7, 31.1, 140.5, 256.5, 1351.1, 2170.2, 297.9),
    ("HAWAII", 7.2, 25.5, 128.0, 64.1, 1911.5, 3920.4, 489.4),
    ("IDAHO", 5.5, 19.4, 39.6, 172.5, 1050.8, 2599.6, 237.6),
    ("ILLINOIS", 9.9, 21.8, 211.3, 209.0, 1085.0, 2828.5, 528.6),
    ("INDIANA", 7.4, 26.5, 123.2, 153.5, 1086.2, 2498.7, 377.4),
    ("IOWA", 2.3, 10.6, 41.2, 89.8, 812.5, 2685.1, 219.9),
    ("KANSAS", 6.6, 22.0, 100.7, 180.5, 1270.4, 2739.3, 244.3),
    ("KENTUCKY", 10.1, 19.1, 81.1, 123.3, 872.2, 1662.1, 245.4),
    ("LOUISIANA", 15.5, 30.9, 142.9, 335.5, 1165.5, 2469.9, 337.7),
    ("MAINE", 2.4, 13.5, 38.7, 170.0, 1253.1, 2350.7, 246.9),
    ("MARYLAND", 8.0, 34.8, 292.1, 358.9, 1400.0, 3177.7, 428.5),
    ("MASSACHUSETTS", 3.1, 20.8, 169.1, 231.6, 1532.2, 2311.3, 1140.1),
    ("MICHIGAN", 9.3, 38.9, 261.9, 274.6, 1522.7, 3159.0, 545.5),
    ("MINNESOTA", 2.7, 19.5, 85.9, 85.8, 1134.7, 2559.3, 343.1),
    ("MISSISSIPPI", 14.3, 19.6, 65.7, 189.1, 915.6, 1239.9, 144.4),
    ("MISSOURI", 9.6, 28.3, 189.0, 233.5, 1318.3, 2424.2, 378.4),
    ("MONTANA", 5.4, 16.7, 39.2, 156.8, 804.9, 2773.2, 309.2),
    ("NEBRASKA", 3.9, 18.1, 64.7, 112.7, 760.0, 2316.1, 249.1),
    ("NEVADA", 15.8, 49.1, 323.1, 355.0, 2453.1, 4212.6, 559.2),
    ("NEW HAMPSHIRE", 3.2, 10.7, 23.2, 76.0, 1041.7, 2343.9, 293.4),
    ("NEW JERSEY", 5.6, 21.0, 180.4, 185.1, 1435.8, 2774.5, 511.5),
    ("NEW MEXICO", 8.8, 39.1, 109.6, 343.4, 1418.7, 3008.6, 259.5),
    ("NEW YORK", 10.7, 29.4, 472.6, 319.1, 1728.0, 2782.0, 745.8),
    ("NORTH CAROLINA", 10.6, 17.0, 61.3, 318.3, 1154.1, 2037.8, 192.1),
    ("NORTH DAKOTA", 0.9, 9.0, 13.3, 43.8, 446.1, 1843.0, 144.7),
    ("OHIO", 7.8, 27.3, 190.5, 181.1, 1216.0, 2696.8, 400.4),
    ("OKLAHOMA", 8.6, 29.2, 73.8, 205.0, 1288.2, 2228.1, 326.8),
    ("OREGON", 4.9, 39.9, 124.1, 286.9, 1636.4, 3506.1, 388.9),
    ("PENNSYLVANIA", 5.6, 19.0, 130.3, 128.0, 877.5, 1624.1, 333.2),
    ("RHODE ISLAND", 3.6, 10.5, 86.5, 201.0, 1489.5, 2844.1, 791.4),
    ("SOUTH CAROLINA", 11.9, 33.0, 105.9, 485.3, 1613.6, 2342.4, 245.1),
    ("SOUTH DAKOTA", 2.0, 13.5, 17.9, 155.7, 570.5, 1704.4, 147.5),
    ("TENNESSEE", 10.1, 29.7, 145.8, 203.9, 1259.7, 1776.5, 314.0),
    ("TEXAS", 13.3, 33.8, 152.4, 208.2, 1603.1, 2988.7, 397.6),
    ("UTAH", 3.5, 20.3, 68.8, 147.3, 1171.6, 3004.6, 334.5),
    ("VERMONT", 1.4, 15.9, 30.8, 101.2, 1348.2, 2201.0, 265.2),
    ("VIRGINIA", 9.0, 23.3, 92.1, 165.7, 986.2, 2521.2, 226.7),
    ("WASHINGTON", 4.3, 39.6, 106.2, 224.8, 1605.6, 3386.9, 360.3),
    ("WEST VIRGINIA", 6.0, 13.2, 42.2, 90.9, 597.4, 1341.7, 163.3),
    ("WISCONSIN", 2.8, 12.9, 52.2, 63.7, 846.9, 2614.2, 220.7),
    ("WYOMING", 5.4, 21.9, 39.7, 173.9, 811.6, 2772.2, 282.0),
]
df = pd.DataFrame(crime_rows)
# Schema for the crime data: the state name plus seven numeric crime columns.
feature_cols = ["murder", "rape", "robbery", "assault",
                "burglary", "larceny", "auto"]
schema_str = "state string, " + ", ".join(col + " double" for col in feature_cols)
source = BatchOperator.fromDataframe(df, schema_str)
source.lazyPrint(10, "Origin data")
# Fit a 4-component PCA on the seven crime columns, project every state into
# the component space, then expand the projection vector into columns
# prin1..prin4 (keeping only the state name for readability).
# Fix: the printed titles said "principle components"; the correct term is
# "principal components".
pca_result = PCA()\
    .setK(4)\
    .setSelectedCols(["murder", "rape", "robbery", "assault",
                      "burglary", "larceny", "auto"])\
    .setPredictionCol(VECTOR_COL_NAME)\
    .enableLazyPrintModelInfo()\
    .fit(source)\
    .transform(source)\
    .link(
        VectorToColumnsBatchOp()\
            .setVectorCol(VECTOR_COL_NAME)\
            .setSchemaStr("prin1 double, prin2 double, prin3 double, prin4 double")\
            .setReservedCols(["state"])
    )\
    .lazyPrint(10, "state with principal components")
# Rank all states by the first and second principal components.
pca_result\
    .select("state, prin1")\
    .orderBy("prin1", limit=100, order='desc')\
    .lazyPrint(-1, "Order by prin1")
pca_result\
    .select("state, prin2")\
    .orderBy("prin2", limit=100, order='desc')\
    .lazyPrint(-1, "Order by prin2")
BatchOperator.execute()
#c_2
# Second calculation mode: standardize the features first, then run PCA with
# calculation type 'COV' on the scaled data — PCA on standardized features is
# driven by correlations rather than raw variances.
# Fix: the printed title said "principle components"; the correct term is
# "principal components".
std_pca = Pipeline()\
    .add(
        StandardScaler()\
            .setSelectedCols(["murder", "rape", "robbery", "assault",
                              "burglary", "larceny", "auto"])
    )\
    .add(
        PCA()\
            .setCalculationType('COV')\
            .setK(4)\
            .setSelectedCols(["murder", "rape", "robbery", "assault",
                              "burglary", "larceny", "auto"])\
            .setPredictionCol(VECTOR_COL_NAME)\
            .enableLazyPrintModelInfo()
    )
std_pca\
    .fit(source)\
    .transform(source)\
    .link(
        VectorToColumnsBatchOp()\
            .setVectorCol(VECTOR_COL_NAME)\
            .setSchemaStr("prin1 double, prin2 double, "
                          + "prin3 double, prin4 double")\
            .setReservedCols(["state"])
    )\
    .lazyPrint(10, "state with principal components")
BatchOperator.execute()
#c_3
# Train a 39-component PCA model on the sparse MNIST vectors and persist the
# model to disk for reuse by the later examples.
source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
pca_trainer = PcaTrainBatchOp()\
    .setK(39)\
    .setCalculationType('COV')\
    .setVectorCol(VECTOR_COL_NAME)\
    .lazyPrintModelInfo()
model_sink = AkSinkBatchOp()\
    .setFilePath(DATA_DIR + PCA_MODEL_FILE)\
    .setOverwriteSink(True)
source.link(pca_trainer).link(model_sink)
BatchOperator.execute()
sw = Stopwatch()
# Baseline: K-means (10 clusters) on the original sparse vectors, with cluster
# evaluation against the true digit labels, timed end to end.
kmeans = KMeans()\
    .setK(10)\
    .setVectorCol(VECTOR_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)
sw.reset()
sw.start()
clustered = kmeans.fit(source).transform(source)
clustered.link(
    EvalClusterBatchOp()\
        .setVectorCol(VECTOR_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .lazyPrintMetrics("KMeans")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())
# Project the data into the 39-dim PCA space using the saved model, then run
# the same K-means + evaluation on the reduced vectors for comparison.
saved_pca_model = AkSourceBatchOp().setFilePath(DATA_DIR + PCA_MODEL_FILE)
pca_result = PcaPredictBatchOp()\
    .setVectorCol(VECTOR_COL_NAME)\
    .setPredictionCol(VECTOR_COL_NAME)\
    .linkFrom(saved_pca_model, source)
sw.reset()
sw.start()
kmeans.fit(pca_result).transform(pca_result).link(
    EvalClusterBatchOp()\
        .setVectorCol(VECTOR_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .lazyPrintMetrics("KMeans + PCA")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())
#c_4
useLocalEnv(4)


def _mnist_source(file_name):
    """Batch source reading one .ak file from the MNIST data directory."""
    return AkSourceBatchOp().setFilePath(DATA_DIR + file_name)


dense_train_data = _mnist_source(DENSE_TRAIN_FILE)
dense_test_data = _mnist_source(DENSE_TEST_FILE)
sparse_train_data = _mnist_source(SPARSE_TRAIN_FILE)
sparse_test_data = _mnist_source(SPARSE_TEST_FILE)
# Six KNN experiments (dense/sparse, with/without PCA dimension reduction).
# The original repeated the same fit/evaluate/time scaffolding six times;
# it is factored here into small builders plus one timed-evaluation helper.
sw = Stopwatch()


def _knn_classifier():
    """A fresh 3-nearest-neighbor classifier on the image vector column."""
    return KnnClassifier()\
        .setK(3)\
        .setVectorCol(VECTOR_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setPredictionCol(PREDICTION_COL_NAME)


def _pca_knn_pipeline():
    """Pipeline that trains a 39-component PCA, then KNN on the projections."""
    return Pipeline()\
        .add(
            PCA()\
                .setK(39)\
                .setCalculationType('COV')\
                .setVectorCol(VECTOR_COL_NAME)\
                .setPredictionCol(VECTOR_COL_NAME)
        )\
        .add(_knn_classifier())


def _saved_pca_knn_pipeline():
    """Pipeline that reuses the PCA model saved on disk instead of retraining."""
    return Pipeline()\
        .add(
            PCAModel()\
                .setVectorCol(VECTOR_COL_NAME)\
                .setPredictionCol(VECTOR_COL_NAME)\
                .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + PCA_MODEL_FILE))
        )\
        .add(_knn_classifier())


def _fit_eval_and_time(estimator, train_data, test_data, title):
    """Fit estimator on train_data, score test_data, print metrics and runtime.

    estimator  : any Alink estimator/pipeline exposing fit()/transform()
    title      : label shown in the printed multi-class metrics report
    """
    sw.reset()
    sw.start()
    estimator\
        .fit(train_data)\
        .transform(test_data)\
        .link(
            EvalMultiClassBatchOp()\
                .setLabelCol(LABEL_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)\
                .lazyPrintMetrics(title)
        )
    BatchOperator.execute()
    sw.stop()
    print(sw.getElapsedTimeSpan())


_fit_eval_and_time(_knn_classifier(), dense_train_data, dense_test_data,
                   "KnnClassifier Dense")
_fit_eval_and_time(_knn_classifier(), sparse_train_data, sparse_test_data,
                   "KnnClassifier Sparse")
_fit_eval_and_time(_pca_knn_pipeline(), dense_train_data, dense_test_data,
                   "Knn with PCA Dense")
_fit_eval_and_time(_pca_knn_pipeline(), sparse_train_data, sparse_test_data,
                   "Knn with PCA Sparse")
_fit_eval_and_time(_saved_pca_knn_pipeline(), dense_train_data, dense_test_data,
                   "Knn PCAModel Dense")
_fit_eval_and_time(_saved_pca_knn_pipeline(), sparse_train_data, sparse_test_data,
                   "Knn PCAModel Sparse")