本章包括下面各节:
18.1 稠密向量与稀疏向量
18.2 使用聚类模型预测流式数据
18.3 流式聚类
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
# Environment setup and shared constants for this chapter's examples.
from pyalink.alink import *
useLocalEnv(4)

from utils import *
import os
import pandas as pd

# Directory holding the MNIST data files (ROOT_DIR is provided by utils).
DATA_DIR = ROOT_DIR + "mnist" + os.sep

# Training data in dense and sparse vector formats.
DENSE_TRAIN_FILE = "dense_train.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"

# Intermediate artifacts: the initial clustering model and the
# temporary file the prediction stream is dumped to.
INIT_MODEL_FILE = "init_model.ak"
TEMP_STREAM_FILE = "temp_stream.ak"

# Column names used by every operator in this chapter.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "cluster_id"
#c_1
# Compare three clustering setups (Euclidean KMeans, cosine KMeans,
# BisectingKMeans) on dense vs. sparse vectors, timing each run.
dense_source = AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE)
sparse_source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)

sw = Stopwatch()

pipelineList = [
    [
        "KMeans EUCLIDEAN",
        Pipeline().add(
            KMeans()
            .setK(10)
            .setVectorCol(VECTOR_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
        ),
    ],
    [
        "KMeans COSINE",
        Pipeline().add(
            KMeans()
            .setDistanceType('COSINE')
            .setK(10)
            .setVectorCol(VECTOR_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
        ),
    ],
    [
        "BisectingKMeans",
        Pipeline().add(
            BisectingKMeans()
            .setK(10)
            .setVectorCol(VECTOR_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
        ),
    ],
]


def _timed_eval(title, pipeline, source, suffix):
    # Fit on `source`, cluster the same data, print evaluation metrics
    # under "<title> <suffix>", and report the elapsed wall-clock time.
    sw.reset()
    sw.start()
    pipeline.fit(source).transform(source).link(
        EvalClusterBatchOp()
        .setVectorCol(VECTOR_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .lazyPrintMetrics(title + " " + suffix)
    )
    BatchOperator.execute()
    sw.stop()
    print(sw.getElapsedTimeSpan())


for title, pipeline in pipelineList:
    _timed_eval(title, pipeline, dense_source, "DENSE")
    _timed_eval(title, pipeline, sparse_source, "SPARSE")
#c_2
# Train an initial KMeans model on a small sample (only if no saved model
# exists), then use it for batch and stream prediction and evaluate both.
batch_source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
stream_source = AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)

if not os.path.exists(DATA_DIR + INIT_MODEL_FILE):
    # Fit KMeans on 100 sampled rows and persist the model.
    batch_source.sampleWithSize(100).link(
        KMeansTrainBatchOp()
        .setVectorCol(VECTOR_COL_NAME)
        .setK(10)
    ).link(
        AkSinkBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)
    )
    BatchOperator.execute()

init_model = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

# Batch prediction with the saved model, followed by cluster evaluation.
KMeansPredictBatchOp().setPredictionCol(PREDICTION_COL_NAME).linkFrom(
    init_model, batch_source
).link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("Batch Prediction")
)
BatchOperator.execute()

# Stream prediction with the same model; dump the results to a temp file.
stream_source.link(
    KMeansPredictStreamOp(init_model).setPredictionCol(PREDICTION_COL_NAME)
).link(
    AkSinkStreamOp()
    .setFilePath(DATA_DIR + TEMP_STREAM_FILE)
    .setOverwriteSink(True)
)
StreamOperator.execute()

# Evaluate the streamed predictions in batch mode.
AkSourceBatchOp().setFilePath(DATA_DIR + TEMP_STREAM_FILE).link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("Stream Prediction")
)
BatchOperator.execute()
#c_3
# StreamingKMeans: continuously update the model on the stream, sample a
# few live predictions for display, and evaluate the full output in batch.
pd.set_option('display.html.use_mathjax', False)

stream_source = AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
init_model = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

stream_pred = stream_source.link(
    StreamingKMeansStreamOp(init_model)
    .setTimeInterval(1)
    .setHalfLife(1)
    .setPredictionCol(PREDICTION_COL_NAME)
).select(
    PREDICTION_COL_NAME + ", " + LABEL_COL_NAME + ", " + VECTOR_COL_NAME
)

# Show a 0.1% sample of the live predictions.
stream_pred.sample(0.001).print()

# Persist the full prediction stream for later batch evaluation.
stream_pred.link(
    AkSinkStreamOp()
    .setFilePath(DATA_DIR + TEMP_STREAM_FILE)
    .setOverwriteSink(True)
)
StreamOperator.execute()

AkSourceBatchOp().setFilePath(DATA_DIR + TEMP_STREAM_FILE).link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("StreamingKMeans")
)
BatchOperator.execute()