Alink教程(Python版)

第17章 常用的聚类算法

本章包括下面各节:
17.1 聚类评估指标
17.1.1 基本评估指标
17.1.2 基于标签值的评估指标
17.2 K-Means聚类算法
17.2.1 算法简介
17.2.2 K-Means实例
17.3 高斯混合模型算法
17.3.1 算法介绍
17.3.2 GMM实例
17.4 二分K-Means聚类算法
17.5 基于经纬度的聚类

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

# PyAlink API. The star import presumably supplies the operator classes used
# throughout this script (CsvSourceBatchOp, KMeans, EvalClusterBatchOp, ...).
from pyalink.alink import *
# Set up Alink's local execution environment before any operator is built.
# NOTE(review): the argument looks like the parallelism — confirm in the PyAlink docs.
useLocalEnv(1)

# Tutorial helper module; ROOT_DIR (used below to build DATA_DIR) is not defined
# in this file and is presumably provided by this star import.
from utils import *
import os
import pandas as pd

# Let pandas display up to 200 rows when a DataFrame is rendered.
pd.set_option("display.max_rows", 200)

# Folder holding the iris files. ROOT_DIR is not defined in this file; the
# concatenation relies on it already ending with a path separator.
DATA_DIR = f"{ROOT_DIR}iris{os.sep}"

# File names inside DATA_DIR: the raw CSV and the derived vector file.
ORIGIN_FILE = "iris.data"
VECTOR_FILE = "iris_vec.ak"

# Column layout of the raw CSV file.
SCHEMA_STRING = (
    "sepal_length double, sepal_width double, "
    "petal_length double, petal_width double, "
    "category string"
)

# Numeric columns that are assembled into a single vector column.
FEATURE_COL_NAMES = [
    "sepal_length",
    "sepal_width",
    "petal_length",
    "petal_width",
]

# Column names shared by every clustering example below.
LABEL_COL_NAME = "category"
VECTOR_COL_NAME = "vec"
PREDICTION_COL_NAME = "cluster_id"

# c_1_2
# One-off preparation: when the vector file is not on disk yet, read the raw
# CSV, assemble the four feature columns into one vector column (keeping only
# the label column alongside it) and write the result as an .ak file.
vector_file_path = DATA_DIR + VECTOR_FILE
if not os.path.exists(vector_file_path):
    raw_iris = (
        CsvSourceBatchOp()
        .setFilePath(DATA_DIR + ORIGIN_FILE)
        .setSchemaStr(SCHEMA_STRING)
    )
    iris_vectors = raw_iris.link(
        VectorAssemblerBatchOp()
        .setSelectedCols(FEATURE_COL_NAMES)
        .setOutputCol(VECTOR_COL_NAME)
        .setReservedCols(LABEL_COL_NAME)
    )
    iris_vectors.link(AkSinkBatchOp().setFilePath(vector_file_path))
    # The sink only writes once the batch job is executed.
    BatchOperator.execute()

# Load the prepared vector data and queue a 5-row preview.
source = AkSourceBatchOp().setFilePath(DATA_DIR + VECTOR_FILE)
source.lazyPrint(5)

# K-Means with K=2 on the vector column, using separate train / predict
# batch operators.
kmeans_model = KMeansTrainBatchOp().setK(2).setVectorCol(VECTOR_COL_NAME)
kmeans_pred = KMeansPredictBatchOp().setPredictionCol(PREDICTION_COL_NAME)

# Wire the graph: data -> trainer, then (model, data) -> predictor.
source.link(kmeans_model)
kmeans_pred.linkFrom(kmeans_model, source)

# Queue the model summary and a 5-row preview of the predictions.
kmeans_model.lazyPrintModelInfo()
kmeans_pred.lazyPrint(5)

# Queue clustering metrics for the predictions, compared with the true labels.
kmeans_evaluator = (
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .lazyPrintMetrics("KMeans EUCLIDEAN")
)
kmeans_pred.link(kmeans_evaluator)

# Queue a full listing sorted by cluster id and then by label.
ordered_pred = kmeans_pred.orderBy(
    f"{PREDICTION_COL_NAME}, {LABEL_COL_NAME}", 200
)
ordered_pred.lazyPrint(-1, "all data")

# Run the job; the lazy outputs queued above are produced here.
BatchOperator.execute()

# K-Means (K=2) again, this time through the pipeline-style estimator and with
# cosine distance. The evaluator is told to use cosine distance as well, so its
# distance-based metrics are computed in the same geometry the clusters were
# built in; this matches the Bisecting K-Means COSINE example later in this
# script.
KMeans()\
    .setK(2)\
    .setDistanceType('COSINE')\
    .setVectorCol(VECTOR_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .enableLazyPrintModelInfo()\
    .fit(source)\
    .transform(source)\
    .link(
        EvalClusterBatchOp()\
            .setDistanceType("COSINE")\
            .setVectorCol(VECTOR_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .lazyPrintMetrics("KMeans COSINE")
    )
# Run the job; the model info and metrics queued above are produced here.
BatchOperator.execute()
# c_2_2
# Gaussian mixture model with 2 components on the iris vectors.
source = AkSourceBatchOp().setFilePath(DATA_DIR + VECTOR_FILE)

gmm = (
    GaussianMixture()
    .setK(2)
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .enableLazyPrintModelInfo()
)
# Fit on the data set and predict on that same data set.
gmm_pred = gmm.fit(source).transform(source)

# Queue clustering metrics for the predictions, compared with the true labels.
gmm_evaluator = (
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("GaussianMixture 2")
)
gmm_pred.link(gmm_evaluator)

# Run the job; the lazy outputs queued above are produced here.
BatchOperator.execute()

# c_3
# Bisecting K-Means with K=3 and the default distance setting.
source = AkSourceBatchOp().setFilePath(DATA_DIR + VECTOR_FILE)

bisecting_euclidean = (
    BisectingKMeans()
    .setK(3)
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .enableLazyPrintModelInfo("BiSecting KMeans EUCLIDEAN")
)
# Fit on the data set and predict on that same data set.
bisecting_euclidean_pred = bisecting_euclidean.fit(source).transform(source)

# Queue clustering metrics for the predictions, compared with the true labels.
bisecting_euclidean_eval = (
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("Bisecting KMeans EUCLIDEAN")
)
bisecting_euclidean_pred.link(bisecting_euclidean_eval)

# Run the job; the lazy outputs queued above are produced here.
BatchOperator.execute()

# Bisecting K-Means with K=3 using cosine distance; the evaluator is set to
# cosine distance too.
bisecting_cosine = (
    BisectingKMeans()
    .setDistanceType('COSINE')
    .setK(3)
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .enableLazyPrintModelInfo("BiSecting KMeans COSINE")
)
# Fit on the data set and predict on that same data set.
bisecting_cosine_pred = bisecting_cosine.fit(source).transform(source)

# Queue clustering metrics for the predictions, compared with the true labels.
bisecting_cosine_eval = (
    EvalClusterBatchOp()
    .setDistanceType("COSINE")
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("Bisecting KMeans COSINE")
)
bisecting_cosine_pred.link(bisecting_cosine_eval)

# Run the job; the lazy outputs queued above are produced here.
BatchOperator.execute()

# c_4
# One record per US state: (name, region, division, longitude, latitude).
# The frame is built without column names; names are attached later through
# the schema string handed to Alink.
_state_rows = [
    ("Alabama", "South", "East South Central", -86.7509, 32.5901),
    ("Alaska", "West", "Pacific", -127.25, 49.25),
    ("Arizona", "West", "Mountain", -111.625, 34.2192),
    ("Arkansas", "South", "West South Central", -92.2992, 34.7336),
    ("California", "West", "Pacific", -119.773, 36.5341),
    ("Colorado", "West", "Mountain", -105.513, 38.6777),
    ("Connecticut", "Northeast", "New England", -72.3573, 41.5928),
    ("Delaware", "South", "South Atlantic", -74.9841, 38.6777),
    ("Florida", "South", "South Atlantic", -81.685, 27.8744),
    ("Georgia", "South", "South Atlantic", -83.3736, 32.3329),
    ("Hawaii", "West", "Pacific", -126.25, 31.75),
    ("Idaho", "West", "Mountain", -113.93, 43.5648),
    ("Illinois", "North Central", "East North Central", -89.3776, 40.0495),
    ("Indiana", "North Central", "East North Central", -86.0808, 40.0495),
    ("Iowa", "North Central", "West North Central", -93.3714, 41.9358),
    ("Kansas", "North Central", "West North Central", -98.1156, 38.4204),
    ("Kentucky", "South", "East South Central", -84.7674, 37.3915),
    ("Louisiana", "South", "West South Central", -92.2724, 30.6181),
    ("Maine", "Northeast", "New England", -68.9801, 45.6226),
    ("Maryland", "South", "South Atlantic", -76.6459, 39.2778),
    ("Massachusetts", "Northeast", "New England", -71.58, 42.3645),
    ("Michigan", "North Central", "East North Central", -84.687, 43.1361),
    ("Minnesota", "North Central", "West North Central", -94.6043, 46.3943),
    ("Mississippi", "South", "East South Central", -89.8065, 32.6758),
    ("Missouri", "North Central", "West North Central", -92.5137, 38.3347),
    ("Montana", "West", "Mountain", -109.32, 46.823),
    ("Nebraska", "North Central", "West North Central", -99.5898, 41.3356),
    ("Nevada", "West", "Mountain", -116.851, 39.1063),
    ("New Hampshire", "Northeast", "New England", -71.3924, 43.3934),
    ("New Jersey", "Northeast", "Middle Atlantic", -74.2336, 39.9637),
    ("New Mexico", "West", "Mountain", -105.942, 34.4764),
    ("New York", "Northeast", "Middle Atlantic", -75.1449, 43.1361),
    ("North Carolina", "South", "South Atlantic", -78.4686, 35.4195),
    ("North Dakota", "North Central", "West North Central", -100.099, 47.2517),
    ("Ohio", "North Central", "East North Central", -82.5963, 40.221),
    ("Oklahoma", "South", "West South Central", -97.1239, 35.5053),
    ("Oregon", "West", "Pacific", -120.068, 43.9078),
    ("Pennsylvania", "Northeast", "Middle Atlantic", -77.45, 40.9069),
    ("Rhode Island", "Northeast", "New England", -71.1244, 41.5928),
    ("South Carolina", "South", "South Atlantic", -80.5056, 33.619),
    ("South Dakota", "North Central", "West North Central", -99.7238, 44.3365),
    ("Tennessee", "South", "East South Central", -86.456, 35.6767),
    ("Texas", "South", "West South Central", -98.7857, 31.3897),
    ("Utah", "West", "Mountain", -111.33, 39.1063),
    ("Vermont", "Northeast", "New England", -72.545, 44.2508),
    ("Virginia", "South", "South Atlantic", -78.2005, 37.563),
    ("Washington", "West", "Pacific", -119.746, 47.4231),
    ("West Virginia", "South", "South Atlantic", -80.6665, 38.4204),
    ("Wisconsin", "North Central", "East North Central", -89.9941, 44.5937),
    ("Wyoming", "West", "Mountain", -107.256, 43.0504),
]

df = pd.DataFrame(_state_rows)
# Column names and types Alink should use for the positional pandas columns.
schema_str = (
    "State string, Region string, Division string, "
    "longitude double, latitude double"
)
source = BatchOperator.fromDataframe(df, schema_str)

# Queue a 5-row preview of the table.
source.lazyPrint(5)

# Queue the distinct values of each grouping column (Region, then Division).
for grouping_col in ("Region", "Division"):
    source.select(grouping_col).distinct().lazyPrint(-1)

# Queue the number of states per (Region, Division) pair, sorted by both keys.
states_per_division = source.groupBy(
    "Region, Division",
    "Region, Division, COUNT(*) AS numStates",
)
states_per_division.orderBy("Region, Division", 100).lazyPrint(-1)

# Cluster the states by longitude/latitude with GeoKMeans for K=2 and K=4, and
# evaluate each clustering against the Region and the Division labels.
for n_clusters in (2, 4):
    geo_kmeans = (
        GeoKMeans()
        .setLongitudeCol("longitude")
        .setLatitudeCol("latitude")
        .setPredictionCol(PREDICTION_COL_NAME)
        .setK(n_clusters)
    )
    # Fit on the data set and predict on that same data set.
    pred = geo_kmeans.fit(source).transform(source)

    # One evaluator per label column; each queues its metrics under a title
    # that names both K and the label column.
    for label_col in ("Region", "Division"):
        evaluator = (
            EvalClusterBatchOp()
            .setPredictionCol(PREDICTION_COL_NAME)
            .setLabelCol(label_col)
            .lazyPrintMetrics(f"{n_clusters} with {label_col}")
        )
        pred.link(evaluator)

    # Run the job for this K; the lazy outputs queued so far are produced here.
    BatchOperator.execute()