Alink Tutorial (Python Version)

Chapter 18: Batch and Streaming Clustering

This chapter includes the following sections:
18.1 Dense and Sparse Vectors
18.2 Predicting on Streaming Data with a Clustering Model
18.3 Streaming Clustering

For the full discussion, please read the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; this file contains the example code for the chapter.

from pyalink.alink import *
useLocalEnv(4)

from utils import *
import os
import pandas as pd

DATA_DIR = ROOT_DIR + "mnist" + os.sep

DENSE_TRAIN_FILE = "dense_train.ak";
SPARSE_TRAIN_FILE = "sparse_train.ak";

INIT_MODEL_FILE = "init_model.ak";
TEMP_STREAM_FILE = "temp_stream.ak";

VECTOR_COL_NAME = "vec";
LABEL_COL_NAME = "label";
PREDICTION_COL_NAME = "cluster_id";

#c_1
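# Section 18.1: compare KMeans (Euclidean and cosine distance) and BisectingKMeans
# on the MNIST data, once with dense vectors and once with sparse vectors.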
dense_source = AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE);
sparse_source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);
sw = Stopwatch();

pipelineList = [
    ["KMeans EUCLIDEAN",
     Pipeline()\
        .add(
            KMeans()\
                .setK(10)\
                .setVectorCol(VECTOR_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)
        )
    ],
    ["KMeans COSINE",
     Pipeline()\
        .add(
            KMeans()\
                .setDistanceType('COSINE')\
                .setK(10)\
                .setVectorCol(VECTOR_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)
        )
    ],
    ["BisectingKMeans",
     Pipeline()\
        .add(
            BisectingKMeans()\
                .setK(10)\
                .setVectorCol(VECTOR_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)
        )
    ]
]

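# fit each pipeline on the dense and then on the sparse data,
# evaluating the resulting clusters and timing each run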
for pipelineTuple2 in pipelineList:
    sw.reset();
    sw.start();
    pipelineTuple2[1]\
    .fit(dense_source)\
    .transform(dense_source)\
    .link(
        EvalClusterBatchOp()\
            .setVectorCol(VECTOR_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .lazyPrintMetrics(pipelineTuple2[0] + " DENSE")
    );
    BatchOperator.execute();
    sw.stop();
    print(sw.getElapsedTimeSpan());

    sw.reset();
    sw.start();
    pipelineTuple2[1]\
        .fit(sparse_source)\
        .transform(sparse_source)\
        .link(
            EvalClusterBatchOp()\
                .setVectorCol(VECTOR_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)\
                .setLabelCol(LABEL_COL_NAME)\
                .lazyPrintMetrics(pipelineTuple2[0] + " SPARSE")
        );
    BatchOperator.execute();
    sw.stop();
    print(sw.getElapsedTimeSpan());

#c_2
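# Section 18.2: train an initial KMeans model on a small sample,
# then use it to predict on batch data and on streaming data.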
batch_source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);
stream_source = AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);

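# if no initial model has been saved yet, train KMeans on a 100-record sample and save it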
if not os.path.exists(DATA_DIR + INIT_MODEL_FILE):
    batch_source\
        .sampleWithSize(100)\
        .link(
            KMeansTrainBatchOp()\
                .setVectorCol(VECTOR_COL_NAME)\
                .setK(10)
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(DATA_DIR + INIT_MODEL_FILE)
        );
    BatchOperator.execute();


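# load the initial model and evaluate its predictions on the full batch data set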
init_model = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE);

KMeansPredictBatchOp()\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .linkFrom(init_model, batch_source)\
    .link(
        EvalClusterBatchOp()\
            .setVectorCol(VECTOR_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .lazyPrintMetrics("Batch Prediction")
    );
BatchOperator.execute();

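# score the stream with the initial model and write the predictions to a temporary file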
stream_source\
    .link(
        KMeansPredictStreamOp(init_model)\
            .setPredictionCol(PREDICTION_COL_NAME)
    )\
    .link(
        AkSinkStreamOp()\
            .setFilePath(DATA_DIR + TEMP_STREAM_FILE)\
            .setOverwriteSink(True)
    );
StreamOperator.execute();

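# read the streamed predictions back as batch data and compute the cluster metrics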
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TEMP_STREAM_FILE)\
    .link(
        EvalClusterBatchOp()\
            .setVectorCol(VECTOR_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .lazyPrintMetrics("Stream Prediction")
    );
BatchOperator.execute();
#c_3
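# Section 18.3: streaming KMeans, which starts from the initial model and keeps
# updating the cluster centers as new data arrives.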

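# disable MathJax rendering of pandas HTML output, so strings containing '$'
# (such as sparse vector representations) are displayed as plain text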
pd.set_option('display.html.use_mathjax', False)

stream_source = AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);

init_model = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE);

stream_pred = stream_source\
    .link(
        StreamingKMeansStreamOp(init_model)\
            .setTimeInterval(1)\
            .setHalfLife(1)\
            .setPredictionCol(PREDICTION_COL_NAME)
    )\
    .select(PREDICTION_COL_NAME + ", " + LABEL_COL_NAME + ", " + VECTOR_COL_NAME);

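# print a small random sample of the streaming predictions to the console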
stream_pred.sample(0.001).print();

stream_pred\
    .link(
        AkSinkStreamOp()\
            .setFilePath(DATA_DIR + TEMP_STREAM_FILE)\
            .setOverwriteSink(True)
    );
StreamOperator.execute();

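# read the streaming KMeans predictions back and compute the cluster metrics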
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TEMP_STREAM_FILE)\
    .link(
        EvalClusterBatchOp()\
            .setVectorCol(VECTOR_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .lazyPrintMetrics("StreamingKMeans")
    );
BatchOperator.execute();