本章包括下面各节:
18.1 稠密向量与稀疏向量
18.2 使用聚类模型预测流式数据
18.3 流式聚类
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
# Environment setup and shared constants for the chapter examples.
from pyalink.alink import *

useLocalEnv(4)

from utils import *
import os
import pandas as pd

# All data files for this chapter live under the MNIST directory.
DATA_DIR = ROOT_DIR + "mnist" + os.sep

# Input data and intermediate files (Alink .ak format).
DENSE_TRAIN_FILE = "dense_train.ak"
SPARSE_TRAIN_FILE = "sparse_train.ak"
INIT_MODEL_FILE = "init_model.ak"
TEMP_STREAM_FILE = "temp_stream.ak"

# Column names shared by every pipeline below.
VECTOR_COL_NAME = "vec"
LABEL_COL_NAME = "label"
PREDICTION_COL_NAME = "cluster_id"
#c_1
# Fit three clustering pipelines (KMeans with Euclidean distance, KMeans
# with cosine distance, BisectingKMeans) on both the dense and the sparse
# representation of the data, evaluate each run, and time it.
dense_source = AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE)
sparse_source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
sw = Stopwatch()

pipelineList = [
    ("KMeans EUCLIDEAN",
     Pipeline().add(
         KMeans()
         .setK(10)
         .setVectorCol(VECTOR_COL_NAME)
         .setPredictionCol(PREDICTION_COL_NAME)
     )),
    ("KMeans COSINE",
     Pipeline().add(
         KMeans()
         .setDistanceType('COSINE')
         .setK(10)
         .setVectorCol(VECTOR_COL_NAME)
         .setPredictionCol(PREDICTION_COL_NAME)
     )),
    ("BisectingKMeans",
     Pipeline().add(
         BisectingKMeans()
         .setK(10)
         .setVectorCol(VECTOR_COL_NAME)
         .setPredictionCol(PREDICTION_COL_NAME)
     )),
]

for name, pipeline in pipelineList:
    # Run the same pipeline first on dense, then on sparse vectors.
    for tag, source in (("DENSE", dense_source), ("SPARSE", sparse_source)):
        sw.reset()
        sw.start()
        pipeline.fit(source).transform(source).link(
            EvalClusterBatchOp()
            .setVectorCol(VECTOR_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .setLabelCol(LABEL_COL_NAME)
            .lazyPrintMetrics(name + " " + tag)
        )
        BatchOperator.execute()
        sw.stop()
        print(sw.getElapsedTimeSpan())
#c_2
# Train an initial KMeans model on a small sample (only if the model file
# is not already cached on disk), then use it for batch prediction and for
# stream prediction, evaluating the clustering quality of both.
batch_source = AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
stream_source = AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)

if not os.path.exists(DATA_DIR + INIT_MODEL_FILE):
    # Train on a 100-row sample and persist the model for later runs.
    batch_source.sampleWithSize(100).link(
        KMeansTrainBatchOp()
        .setVectorCol(VECTOR_COL_NAME)
        .setK(10)
    ).link(
        AkSinkBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)
    )
    BatchOperator.execute()

init_model = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

# Batch prediction with the initial model, evaluated against the labels.
KMeansPredictBatchOp()\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .linkFrom(init_model, batch_source)\
    .link(
        EvalClusterBatchOp()
        .setVectorCol(VECTOR_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .lazyPrintMetrics("Batch Prediction")
    )
BatchOperator.execute()

# Stream prediction with the same model; results go to a temp file.
stream_source.link(
    KMeansPredictStreamOp(init_model)
    .setPredictionCol(PREDICTION_COL_NAME)
).link(
    AkSinkStreamOp()
    .setFilePath(DATA_DIR + TEMP_STREAM_FILE)
    .setOverwriteSink(True)
)
StreamOperator.execute()

# Read the streamed predictions back and evaluate them in batch mode.
AkSourceBatchOp().setFilePath(DATA_DIR + TEMP_STREAM_FILE).link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("Stream Prediction")
)
BatchOperator.execute()
#c_3
# Streaming KMeans: continuously update the cluster centers from the
# stream, print a small sample of predictions, persist all predictions,
# then evaluate the persisted results in batch mode.
pd.set_option('display.html.use_mathjax', False)

stream_source = AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE)
init_model = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

stream_pred = stream_source.link(
    StreamingKMeansStreamOp(init_model)
    .setTimeInterval(1)
    .setHalfLife(1)
    .setPredictionCol(PREDICTION_COL_NAME)
).select(PREDICTION_COL_NAME + ", " + LABEL_COL_NAME + ", " + VECTOR_COL_NAME)

# Show a 0.1% sample of the streaming predictions on the console.
stream_pred.sample(0.001).print()

stream_pred.link(
    AkSinkStreamOp()
    .setFilePath(DATA_DIR + TEMP_STREAM_FILE)
    .setOverwriteSink(True)
)
StreamOperator.execute()

# Evaluate the full set of streamed predictions.
AkSourceBatchOp().setFilePath(DATA_DIR + TEMP_STREAM_FILE).link(
    EvalClusterBatchOp()
    .setVectorCol(VECTOR_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .lazyPrintMetrics("StreamingKMeans")
)
BatchOperator.execute()