Alink教程(Java版)

第18章 批式与流式聚类

本章包括下面各节:
18.1 稠密向量与稀疏向量
18.2 使用聚类模型预测流式数据
18.3 流式聚类

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Java)》,这里为本章对应的示例代码。

package com.alibaba.alink;

import org.apache.flink.api.java.tuple.Tuple2;

import com.alibaba.alink.common.utils.Stopwatch;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.clustering.KMeansPredictBatchOp;
import com.alibaba.alink.operator.batch.clustering.KMeansTrainBatchOp;
import com.alibaba.alink.operator.batch.evaluation.EvalClusterBatchOp;
import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp;
import com.alibaba.alink.operator.batch.source.AkSourceBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.clustering.KMeansPredictStreamOp;
import com.alibaba.alink.operator.stream.clustering.StreamingKMeansStreamOp;
import com.alibaba.alink.operator.stream.sink.AkSinkStreamOp;
import com.alibaba.alink.operator.stream.source.AkSourceStreamOp;
import com.alibaba.alink.params.shared.clustering.HasKMeansDistanceType.DistanceType;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.clustering.BisectingKMeans;
import com.alibaba.alink.pipeline.clustering.KMeans;

import java.io.File;
import java.util.ArrayList;

public class Chap18 {

	static final String DATA_DIR = Utils.ROOT_DIR + "mnist" + File.separator;

	static final String DENSE_TRAIN_FILE = "dense_train.ak";
	static final String SPARSE_TRAIN_FILE = "sparse_train.ak";

	static final String INIT_MODEL_FILE = "init_model.ak";
	static final String TEMP_STREAM_FILE = "temp_stream.ak";

	static final String VECTOR_COL_NAME = "vec";
	static final String LABEL_COL_NAME = "label";
	static final String PREDICTION_COL_NAME = "cluster_id";

	public static void main(String[] args) throws Exception {

		BatchOperator.setParallelism(4);

		c_1();

		c_2();

		c_3();

	}

	static void c_1() throws Exception {

		AkSourceBatchOp dense_source = new AkSourceBatchOp().setFilePath(DATA_DIR + DENSE_TRAIN_FILE);
		AkSourceBatchOp sparse_source = new AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);
		Stopwatch sw = new Stopwatch();

		ArrayList <Tuple2 <String, Pipeline>> pipelineList = new ArrayList <>();
		pipelineList.add(new Tuple2 <>("KMeans EUCLIDEAN",
			new Pipeline()
				.add(
					new KMeans()
						.setK(10)
						.setVectorCol(VECTOR_COL_NAME)
						.setPredictionCol(PREDICTION_COL_NAME)
				)
		));
		pipelineList.add(new Tuple2 <>("KMeans COSINE",
			new Pipeline()
				.add(
					new KMeans()
						.setDistanceType(DistanceType.COSINE)
						.setK(10)
						.setVectorCol(VECTOR_COL_NAME)
						.setPredictionCol(PREDICTION_COL_NAME)
				)
		));
		pipelineList.add(new Tuple2 <>("BisectingKMeans",
			new Pipeline()
				.add(
					new BisectingKMeans()
						.setK(10)
						.setVectorCol(VECTOR_COL_NAME)
						.setPredictionCol(PREDICTION_COL_NAME)
				)
		));

		for (Tuple2 <String, Pipeline> pipelineTuple2 : pipelineList) {
			sw.reset();
			sw.start();
			pipelineTuple2.f1
				.fit(dense_source)
				.transform(dense_source)
				.link(
					new EvalClusterBatchOp()
						.setVectorCol(VECTOR_COL_NAME)
						.setPredictionCol(PREDICTION_COL_NAME)
						.setLabelCol(LABEL_COL_NAME)
						.lazyPrintMetrics(pipelineTuple2.f0 + " DENSE")
				);
			BatchOperator.execute();
			sw.stop();
			System.out.println(sw.getElapsedTimeSpan());

			sw.reset();
			sw.start();
			pipelineTuple2.f1
				.fit(sparse_source)
				.transform(sparse_source)
				.link(
					new EvalClusterBatchOp()
						.setVectorCol(VECTOR_COL_NAME)
						.setPredictionCol(PREDICTION_COL_NAME)
						.setLabelCol(LABEL_COL_NAME)
						.lazyPrintMetrics(pipelineTuple2.f0 + " SPARSE")
				);
			BatchOperator.execute();
			sw.stop();
			System.out.println(sw.getElapsedTimeSpan());
		}

	}

	static void c_2() throws Exception {
		AkSourceBatchOp batch_source = new AkSourceBatchOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);
		AkSourceStreamOp stream_source = new AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);

		if (!new File(DATA_DIR + INIT_MODEL_FILE).exists()) {
			batch_source
				.sampleWithSize(100)
				.link(
					new KMeansTrainBatchOp()
						.setVectorCol(VECTOR_COL_NAME)
						.setK(10)
				)
				.link(
					new AkSinkBatchOp()
						.setFilePath(DATA_DIR + INIT_MODEL_FILE)
				);
			BatchOperator.execute();
		}

		AkSourceBatchOp init_model = new AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE);

		new KMeansPredictBatchOp()
			.setPredictionCol(PREDICTION_COL_NAME)
			.linkFrom(init_model, batch_source)
			.link(
				new EvalClusterBatchOp()
					.setVectorCol(VECTOR_COL_NAME)
					.setPredictionCol(PREDICTION_COL_NAME)
					.setLabelCol(LABEL_COL_NAME)
					.lazyPrintMetrics("Batch Prediction")
			);
		BatchOperator.execute();

		stream_source
			.link(
				new KMeansPredictStreamOp(init_model)
					.setPredictionCol(PREDICTION_COL_NAME)
			)
			.link(
				new AkSinkStreamOp()
					.setFilePath(DATA_DIR + TEMP_STREAM_FILE)
					.setOverwriteSink(true)
			);
		StreamOperator.execute();

		new AkSourceBatchOp()
			.setFilePath(DATA_DIR + TEMP_STREAM_FILE)
			.link(
				new EvalClusterBatchOp()
					.setVectorCol(VECTOR_COL_NAME)
					.setPredictionCol(PREDICTION_COL_NAME)
					.setLabelCol(LABEL_COL_NAME)
					.lazyPrintMetrics("Stream Prediction")
			);
		BatchOperator.execute();
	}

	static void c_3() throws Exception {
		AkSourceStreamOp stream_source
			= new AkSourceStreamOp().setFilePath(DATA_DIR + SPARSE_TRAIN_FILE);

		AkSourceBatchOp init_model
			= new AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE);

		StreamOperator<?> stream_pred =  stream_source
			.link(
				new StreamingKMeansStreamOp(init_model)
					.setTimeInterval(1L)
					.setHalfLife(1)
					.setPredictionCol(PREDICTION_COL_NAME)
			)
			.select(PREDICTION_COL_NAME + ", " + LABEL_COL_NAME +", " + VECTOR_COL_NAME);

		stream_pred.sample(0.001).print();

		stream_pred
			.link(
				new AkSinkStreamOp()
					.setFilePath(DATA_DIR + TEMP_STREAM_FILE)
					.setOverwriteSink(true)
			);
		StreamOperator.execute();

		new AkSourceBatchOp()
			.setFilePath(DATA_DIR + TEMP_STREAM_FILE)
			.link(
				new EvalClusterBatchOp()
					.setVectorCol(VECTOR_COL_NAME)
					.setPredictionCol(PREDICTION_COL_NAME)
					.setLabelCol(LABEL_COL_NAME)
					.lazyPrintMetrics("StreamingKMeans")
			);
		BatchOperator.execute();
	}

}