本章包括以下各节：
18.1 稠密向量与稀疏向量
18.2 使用聚类模型预测流式数据
18.3 流式聚类
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Java）》，以下为本章对应的示例代码。
package com.alibaba.alink; import org.apache.flink.api.java.tuple.Tuple2; import com.alibaba.alink.common.utils.Stopwatch; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.clustering.KMeansPredictBatchOp; import com.alibaba.alink.operator.batch.clustering.KMeansTrainBatchOp; import com.alibaba.alink.operator.batch.evaluation.EvalClusterBatchOp; import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp; import com.alibaba.alink.operator.batch.source.AkSourceBatchOp; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.clustering.KMeansPredictStreamOp; import com.alibaba.alink.operator.stream.clustering.StreamingKMeansStreamOp; import com.alibaba.alink.operator.stream.sink.AkSinkStreamOp; import com.alibaba.alink.operator.stream.source.AkSourceStreamOp; import com.alibaba.alink.params.shared.clustering.HasKMeansDistanceType.DistanceType; import com.alibaba.alink.pipeline.Pipeline; import com.alibaba.alink.pipeline.clustering.BisectingKMeans; import com.alibaba.alink.pipeline.clustering.KMeans; import java.io.File; import java.util.ArrayList; public class Chap18 { static final String DATA_DIR = Utils.ROOT_DIR + "mnist" + File.separator; static final String DENSE_TRAIN_FILE = "dense_train.ak"; static final String SPARSE_TRAIN_FILE = "sparse_train.ak"; static final String INIT_MODEL_FILE = "init_model.ak"; static final String TEMP_STREAM_FILE = "temp_stream.ak"; static final String VECTOR_COL_NAME = "vec"; static final String LABEL_COL_NAME = "label"; static final String PREDICTION_COL_NAME = "cluster_id"; public static void main(String[] args) throws Exception { BatchOperator.setParallelism(4); c_1(); c_2(); c_3(); } static void c_1() throws Exception { AkSourceBatchOp dense_source = new AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE); AkSourceBatchOp sparse_source = new AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE); Stopwatch sw = new Stopwatch(); 
ArrayList <Tuple2 <String, Pipeline>> pipelineList = new ArrayList <>(); pipelineList.add(new Tuple2 <>("KMeans EUCLIDEAN", new Pipeline() .add( new KMeans() .setK(10) .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) ) )); pipelineList.add(new Tuple2 <>("KMeans COSINE", new Pipeline() .add( new KMeans() .setDistanceType(DistanceType.COSINE) .setK(10) .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) ) )); pipelineList.add(new Tuple2 <>("BisectingKMeans", new Pipeline() .add( new BisectingKMeans() .setK(10) .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) ) )); for (Tuple2 <String, Pipeline> pipelineTuple2 : pipelineList) { sw.reset(); sw.start(); pipelineTuple2.f1 .fit(dense_source) .transform(dense_source) .link( new EvalClusterBatchOp() .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setLabelCol(LABEL_COL_NAME) .lazyPrintMetrics(pipelineTuple2.f0 + " DENSE") ); BatchOperator.execute(); sw.stop(); System.out.println(sw.getElapsedTimeSpan()); sw.reset(); sw.start(); pipelineTuple2.f1 .fit(sparse_source) .transform(sparse_source) .link( new EvalClusterBatchOp() .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setLabelCol(LABEL_COL_NAME) .lazyPrintMetrics(pipelineTuple2.f0 + " SPARSE") ); BatchOperator.execute(); sw.stop(); System.out.println(sw.getElapsedTimeSpan()); } } static void c_2() throws Exception { AkSourceBatchOp batch_source = new AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE); AkSourceStreamOp stream_source = new AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE); if (!new File(DATA_DIR + INIT_MODEL_FILE).exists()) { batch_source .sampleWithSize(100) .link( new KMeansTrainBatchOp() .setVectorCol(VECTOR_COL_NAME) .setK(10) ) .link( new AkSinkBatchOp() .setFilePath(DATA_DIR + INIT_MODEL_FILE) ); BatchOperator.execute(); } AkSourceBatchOp init_model = new AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE); new 
KMeansPredictBatchOp() .setPredictionCol(PREDICTION_COL_NAME) .linkFrom(init_model, batch_source) .link( new EvalClusterBatchOp() .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setLabelCol(LABEL_COL_NAME) .lazyPrintMetrics("Batch Prediction") ); BatchOperator.execute(); stream_source .link( new KMeansPredictStreamOp(init_model) .setPredictionCol(PREDICTION_COL_NAME) ) .link( new AkSinkStreamOp() .setFilePath(DATA_DIR + TEMP_STREAM_FILE) .setOverwriteSink(true) ); StreamOperator.execute(); new AkSourceBatchOp() .setFilePath(DATA_DIR + TEMP_STREAM_FILE) .link( new EvalClusterBatchOp() .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setLabelCol(LABEL_COL_NAME) .lazyPrintMetrics("Stream Prediction") ); BatchOperator.execute(); } static void c_3() throws Exception { AkSourceStreamOp stream_source = new AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE); AkSourceBatchOp init_model = new AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE); StreamOperator<?> stream_pred = stream_source .link( new StreamingKMeansStreamOp(init_model) .setTimeInterval(1L) .setHalfLife(1) .setPredictionCol(PREDICTION_COL_NAME) ) .select(PREDICTION_COL_NAME + ", " + LABEL_COL_NAME +", " + VECTOR_COL_NAME); stream_pred.sample(0.001).print(); stream_pred .link( new AkSinkStreamOp() .setFilePath(DATA_DIR + TEMP_STREAM_FILE) .setOverwriteSink(true) ); StreamOperator.execute(); new AkSourceBatchOp() .setFilePath(DATA_DIR + TEMP_STREAM_FILE) .link( new EvalClusterBatchOp() .setVectorCol(VECTOR_COL_NAME) .setPredictionCol(PREDICTION_COL_NAME) .setLabelCol(LABEL_COL_NAME) .lazyPrintMetrics("StreamingKMeans") ); BatchOperator.execute(); } }