本章包括下面各节:
19.1 主成分的含义
19.2 两种计算方式
19.3 在聚类方面的应用
19.4 在分类方面的应用
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
# Environment setup and shared constants for this chapter's PCA examples.
# The import order is kept as-is: later star imports may shadow earlier names.
from pyalink.alink import *

# Start a local PyAlink environment; the argument is the parallelism.
useLocalEnv(1)

from utils import *
import os
import pandas as pd

# Directory with the MNIST .ak files.
# ROOT_DIR is not defined here; presumably it comes from `utils` -- confirm.
DATA_DIR = ROOT_DIR + "mnist" + os.sep

# File names, always used as DATA_DIR + <name>.
DENSE_TRAIN_FILE = "dense_train.ak"
DENSE_TEST_FILE = "dense_test.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"
SPARSE_TEST_FILE = "sparse_test.ak"
PCA_MODEL_FILE = "pca_model.ak"

# Column names shared by the clustering and classification examples.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "pred"
#c_1
# Per-state crime table. Column order matches schema_str below:
# state, murder, rape, robbery, assault, burglary, larceny, auto.
df = pd.DataFrame(
    [
        ["ALABAMA", 14.2, 25.2, 96.8, 278.3, 1135.5, 1881.9, 280.7],
        ["ALASKA", 10.8, 51.6, 96.8, 284.0, 1331.7, 3369.8, 753.3],
        ["ARIZONA", 9.5, 34.2, 138.2, 312.3, 2346.1, 4467.4, 439.5],
        ["ARKANSAS", 8.8, 27.6, 83.2, 203.4, 972.6, 1862.1, 183.4],
        ["CALIFORNIA", 11.5, 49.4, 287.0, 358.0, 2139.4, 3499.8, 663.5],
        ["COLORADO", 6.3, 42.0, 170.7, 292.9, 1935.2, 3903.2, 477.1],
        ["CONNECTICUT", 4.2, 16.8, 129.5, 131.8, 1346.0, 2620.7, 593.2],
        ["DELAWARE", 6.0, 24.9, 157.0, 194.2, 1682.6, 3678.4, 467.0],
        ["FLORIDA", 10.2, 39.6, 187.9, 449.1, 1859.9, 3840.5, 351.4],
        ["GEORGIA", 11.7, 31.1, 140.5, 256.5, 1351.1, 2170.2, 297.9],
        ["HAWAII", 7.2, 25.5, 128.0, 64.1, 1911.5, 3920.4, 489.4],
        ["IDAHO", 5.5, 19.4, 39.6, 172.5, 1050.8, 2599.6, 237.6],
        ["ILLINOIS", 9.9, 21.8, 211.3, 209.0, 1085.0, 2828.5, 528.6],
        ["INDIANA", 7.4, 26.5, 123.2, 153.5, 1086.2, 2498.7, 377.4],
        ["IOWA", 2.3, 10.6, 41.2, 89.8, 812.5, 2685.1, 219.9],
        ["KANSAS", 6.6, 22.0, 100.7, 180.5, 1270.4, 2739.3, 244.3],
        ["KENTUCKY", 10.1, 19.1, 81.1, 123.3, 872.2, 1662.1, 245.4],
        ["LOUISIANA", 15.5, 30.9, 142.9, 335.5, 1165.5, 2469.9, 337.7],
        ["MAINE", 2.4, 13.5, 38.7, 170.0, 1253.1, 2350.7, 246.9],
        ["MARYLAND", 8.0, 34.8, 292.1, 358.9, 1400.0, 3177.7, 428.5],
        ["MASSACHUSETTS", 3.1, 20.8, 169.1, 231.6, 1532.2, 2311.3, 1140.1],
        ["MICHIGAN", 9.3, 38.9, 261.9, 274.6, 1522.7, 3159.0, 545.5],
        ["MINNESOTA", 2.7, 19.5, 85.9, 85.8, 1134.7, 2559.3, 343.1],
        ["MISSISSIPPI", 14.3, 19.6, 65.7, 189.1, 915.6, 1239.9, 144.4],
        ["MISSOURI", 9.6, 28.3, 189.0, 233.5, 1318.3, 2424.2, 378.4],
        ["MONTANA", 5.4, 16.7, 39.2, 156.8, 804.9, 2773.2, 309.2],
        ["NEBRASKA", 3.9, 18.1, 64.7, 112.7, 760.0, 2316.1, 249.1],
        ["NEVADA", 15.8, 49.1, 323.1, 355.0, 2453.1, 4212.6, 559.2],
        ["NEW HAMPSHIRE", 3.2, 10.7, 23.2, 76.0, 1041.7, 2343.9, 293.4],
        ["NEW JERSEY", 5.6, 21.0, 180.4, 185.1, 1435.8, 2774.5, 511.5],
        ["NEW MEXICO", 8.8, 39.1, 109.6, 343.4, 1418.7, 3008.6, 259.5],
        ["NEW YORK", 10.7, 29.4, 472.6, 319.1, 1728.0, 2782.0, 745.8],
        ["NORTH CAROLINA", 10.6, 17.0, 61.3, 318.3, 1154.1, 2037.8, 192.1],
        ["NORTH DAKOTA", 0.9, 9.0, 13.3, 43.8, 446.1, 1843.0, 144.7],
        ["OHIO", 7.8, 27.3, 190.5, 181.1, 1216.0, 2696.8, 400.4],
        ["OKLAHOMA", 8.6, 29.2, 73.8, 205.0, 1288.2, 2228.1, 326.8],
        ["OREGON", 4.9, 39.9, 124.1, 286.9, 1636.4, 3506.1, 388.9],
        ["PENNSYLVANIA", 5.6, 19.0, 130.3, 128.0, 877.5, 1624.1, 333.2],
        ["RHODE ISLAND", 3.6, 10.5, 86.5, 201.0, 1489.5, 2844.1, 791.4],
        ["SOUTH CAROLINA", 11.9, 33.0, 105.9, 485.3, 1613.6, 2342.4, 245.1],
        ["SOUTH DAKOTA", 2.0, 13.5, 17.9, 155.7, 570.5, 1704.4, 147.5],
        ["TENNESSEE", 10.1, 29.7, 145.8, 203.9, 1259.7, 1776.5, 314.0],
        ["TEXAS", 13.3, 33.8, 152.4, 208.2, 1603.1, 2988.7, 397.6],
        ["UTAH", 3.5, 20.3, 68.8, 147.3, 1171.6, 3004.6, 334.5],
        ["VERMONT", 1.4, 15.9, 30.8, 101.2, 1348.2, 2201.0, 265.2],
        ["VIRGINIA", 9.0, 23.3, 92.1, 165.7, 986.2, 2521.2, 226.7],
        ["WASHINGTON", 4.3, 39.6, 106.2, 224.8, 1605.6, 3386.9, 360.3],
        ["WEST VIRGINIA", 6.0, 13.2, 42.2, 90.9, 597.4, 1341.7, 163.3],
        ["WISCONSIN", 2.8, 12.9, 52.2, 63.7, 846.9, 2614.2, 220.7],
        ["WYOMING", 5.4, 21.9, 39.7, 173.9, 811.6, 2772.2, 282.0]
    ]
)

# Alink schema string: one string column followed by seven double columns.
schema_str = (
    "state string, murder double, rape double, robbery double, "
    "assault double, burglary double, larceny double, auto double"
)

source = BatchOperator.fromDataframe(df, schema_str)
source.lazyPrint(10, "Origin data")

# Fit a 4-component PCA on the seven numeric columns and apply it to the same
# data; the result is written as a vector into VECTOR_COL_NAME.
pca_estimator = (
    PCA()
    .setK(4)
    .setSelectedCols(["murder", "rape", "robbery", "assault", "burglary", "larceny", "auto"])
    .setPredictionCol(VECTOR_COL_NAME)
    .enableLazyPrintModelInfo()
)
projected = pca_estimator.fit(source).transform(source)

# Expand the result vector into four named double columns, keeping only
# "state" from the original columns.
pca_result = projected.link(
    VectorToColumnsBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setSchemaStr("prin1 double, prin2 double, prin3 double, prin4 double")
    .setReservedCols(["state"])
).lazyPrint(10, "state with principle components")

# Rank the states by their first, then second, principal component.
by_prin1 = pca_result.select("state, prin1")
by_prin1.orderBy("prin1", limit=100, order='desc').lazyPrint(-1, "Order by prin1")

by_prin2 = pca_result.select("state, prin2")
by_prin2.orderBy("prin2", limit=100, order='desc').lazyPrint(-1, "Order by prin2")

# Run the job so that all lazy prints registered above are emitted.
BatchOperator.execute()
#c_2
# Same crime data as before (`source`), but the columns are standardized
# before a PCA with calculation type 'COV' is applied.
scaler_stage = StandardScaler().setSelectedCols(
    ["murder", "rape", "robbery", "assault", "burglary", "larceny", "auto"]
)

pca_stage = (
    PCA()
    .setCalculationType('COV')
    .setK(4)
    .setSelectedCols(["murder", "rape", "robbery", "assault", "burglary", "larceny", "auto"])
    .setPredictionCol(VECTOR_COL_NAME)
    .enableLazyPrintModelInfo()
)

std_pca = Pipeline().add(scaler_stage).add(pca_stage)

# Fit and apply the pipeline on the same table, then expand the result vector
# into four double columns, keeping only "state" from the input.
std_pca_output = std_pca.fit(source).transform(source)
std_pca_output.link(
    VectorToColumnsBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setSchemaStr("prin1 double, prin2 double, prin3 double, prin4 double")
    .setReservedCols(["state"])
).lazyPrint(10, "state with principle components")

BatchOperator.execute()
#c_3
# Clustering example: train a PCA model on the sparse training set, save it,
# then compare KMeans on the raw vectors with KMeans on the PCA output.
source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)

# Train a 39-component PCA on the vector column and write the model to
# DATA_DIR + PCA_MODEL_FILE, overwriting any existing file.
pca_trainer = (
    PcaTrainBatchOp()
    .setK(39)
    .setCalculationType('COV')
    .setVectorCol(VECTOR_COL_NAME)
    .lazyPrintModelInfo()
)
trained_model = source.link(pca_trainer)
trained_model.link(
    AkSinkBatchOp()
    .setFilePath(DATA_DIR + PCA_MODEL_FILE)
    .setOverwriteSink(True)
)
BatchOperator.execute()

# Stopwatch is not defined in this cell; presumably it comes from `utils`.
sw = Stopwatch()

# One KMeans configuration (10 clusters) reused for both runs below.
kmeans = (
    KMeans()
    .setK(10)
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
)

# Run 1: KMeans on the original vectors.
sw.reset()
sw.start()
raw_clusters = kmeans.fit(source).transform(source)
raw_clusters.link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("KMeans")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())

# Apply the saved PCA model; the prediction column is given the same name as
# the input vector column.
pca_result = (
    PcaPredictBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(VECTOR_COL_NAME)
    .linkFrom(
        AkSourceBatchOp().setFilePath(DATA_DIR + PCA_MODEL_FILE),
        source
    )
)

# Run 2: the same KMeans configuration on the PCA output.
sw.reset()
sw.start()
pca_clusters = kmeans.fit(pca_result).transform(pca_result)
pca_clusters.link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("KMeans + PCA")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())
#c_4
# Classification example: six timed KNN runs (k=3) on the dense and sparse
# data sets -- plain KNN, KNN after a freshly trained PCA, and KNN after the
# PCA model loaded from DATA_DIR + PCA_MODEL_FILE.
useLocalEnv(4)

dense_train_data = AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE)
dense_test_data = AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TEST_FILE)
sparse_train_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
sparse_test_data = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TEST_FILE)

# Stopwatch is not defined in this cell; presumably it comes from `utils`.
sw = Stopwatch()

# --- 1. Plain KNN, dense vectors -------------------------------------------
sw.reset()
sw.start()
classifier = (
    KnnClassifier()
    .setK(3)
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
)
predictions = classifier.fit(dense_train_data).transform(dense_test_data)
predictions.link(
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("KnnClassifier Dense")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())

# --- 2. Plain KNN, sparse vectors ------------------------------------------
sw.reset()
sw.start()
classifier = (
    KnnClassifier()
    .setK(3)
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
)
predictions = classifier.fit(sparse_train_data).transform(sparse_test_data)
predictions.link(
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("KnnClassifier Sparse")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())

# --- 3. PCA trained in the pipeline + KNN, dense vectors -------------------
# The PCA stage writes its output back into the input vector column.
sw.reset()
sw.start()
pipeline = (
    Pipeline()
    .add(
        PCA()
        .setK(39)
        .setCalculationType('COV')
        .setVectorCol(VECTOR_COL_NAME)
        .setPredictionCol(VECTOR_COL_NAME)
    )
    .add(
        KnnClassifier()
        .setK(3)
        .setVectorCol(VECTOR_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
    )
)
predictions = pipeline.fit(dense_train_data).transform(dense_test_data)
predictions.link(
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("Knn with PCA Dense")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())

# --- 4. PCA trained in the pipeline + KNN, sparse vectors ------------------
sw.reset()
sw.start()
pipeline = (
    Pipeline()
    .add(
        PCA()
        .setK(39)
        .setCalculationType('COV')
        .setVectorCol(VECTOR_COL_NAME)
        .setPredictionCol(VECTOR_COL_NAME)
    )
    .add(
        KnnClassifier()
        .setK(3)
        .setVectorCol(VECTOR_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
    )
)
predictions = pipeline.fit(sparse_train_data).transform(sparse_test_data)
predictions.link(
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("Knn with PCA Sparse")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())

# --- 5. Saved PCA model + KNN, dense vectors -------------------------------
# PCAModel takes its model data from the file at DATA_DIR + PCA_MODEL_FILE.
sw.reset()
sw.start()
pipeline = (
    Pipeline()
    .add(
        PCAModel()
        .setVectorCol(VECTOR_COL_NAME)
        .setPredictionCol(VECTOR_COL_NAME)
        .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + PCA_MODEL_FILE))
    )
    .add(
        KnnClassifier()
        .setK(3)
        .setVectorCol(VECTOR_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
    )
)
predictions = pipeline.fit(dense_train_data).transform(dense_test_data)
predictions.link(
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("Knn PCAModel Dense")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())

# --- 6. Saved PCA model + KNN, sparse vectors ------------------------------
sw.reset()
sw.start()
pipeline = (
    Pipeline()
    .add(
        PCAModel()
        .setVectorCol(VECTOR_COL_NAME)
        .setPredictionCol(VECTOR_COL_NAME)
        .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + PCA_MODEL_FILE))
    )
    .add(
        KnnClassifier()
        .setK(3)
        .setVectorCol(VECTOR_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
    )
)
predictions = pipeline.fit(sparse_train_data).transform(sparse_test_data)
predictions.link(
    EvalMultiClassBatchOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("Knn PCAModel Sparse")
)
BatchOperator.execute()
sw.stop()
print(sw.getElapsedTimeSpan())