This chapter covers the following sections:
7.1 Sampling
7.1.1 Taking the First _N_ Records
7.1.2 Random Sampling
7.1.3 Weighted Sampling
7.1.4 Stratified Sampling
7.2 Data Splitting
7.3 Numerical Scaling
7.3.1 Standardization
7.3.2 MinMaxScale
7.3.3 MaxAbsScale
7.4 Vector Scaling
7.4.1 StandardScale, MinMaxScale, MaxAbsScale
7.4.2 Normalization
7.5 Missing-Value Imputation
For detailed explanations, please read the print book 《Alink权威指南:基于Flink的机器学习实例入门(Java)》; below is the companion example code for this chapter.
```java
package com.alibaba.alink;

import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.dataproc.FirstNBatchOp;
import com.alibaba.alink.operator.batch.dataproc.SplitBatchOp;
import com.alibaba.alink.operator.batch.dataproc.StratifiedSampleBatchOp;
import com.alibaba.alink.operator.batch.dataproc.WeightSampleBatchOp;
import com.alibaba.alink.operator.batch.dataproc.vector.VectorAssemblerBatchOp;
import com.alibaba.alink.operator.batch.dataproc.vector.VectorNormalizeBatchOp;
import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.batch.statistics.VectorSummarizerBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.dataproc.StratifiedSampleStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.params.dataproc.HasStrategy.Strategy;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.dataproc.Imputer;
import com.alibaba.alink.pipeline.dataproc.MaxAbsScaler;
import com.alibaba.alink.pipeline.dataproc.MinMaxScaler;
import com.alibaba.alink.pipeline.dataproc.StandardScaler;
import com.alibaba.alink.pipeline.dataproc.vector.VectorMaxAbsScaler;
import com.alibaba.alink.pipeline.dataproc.vector.VectorMinMaxScaler;
import com.alibaba.alink.pipeline.dataproc.vector.VectorStandardScaler;

import java.io.File;

public class Chap07 {

    static final String DATA_DIR = Utils.ROOT_DIR + "iris" + File.separator;

    static final String ORIGIN_FILE = "iris.data";
    static final String TRAIN_FILE = "train.ak";
    static final String TEST_FILE = "test.ak";

    static final String SCHEMA_STRING
        = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

    static final String[] FEATURE_COL_NAMES
        = new String[] {"sepal_length", "sepal_width", "petal_length", "petal_width"};

    static final String LABEL_COL_NAME = "category";
    static final String VECTOR_COL_NAME = "vec";
    static final String PREDICTION_COL_NAME = "pred";

    public static void main(String[] args) throws Exception {
        BatchOperator.setParallelism(1);

        c_1_1();
        c_1_2();
        c_1_3();
        c_1_4();
        c_2();
        c_3_1();
        c_3_2();
        c_3_3();
        c_4_1();
        c_4_2();
        c_5();
    }

    // 7.1.1 Taking the first N records: the FirstNBatchOp operator and the firstN shortcut.
    static void c_1_1() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source
            .link(
                new FirstNBatchOp()
                    .setSize(5)
            )
            .print();

        source.firstN(5).print();
    }

    // 7.1.2 Random sampling, by fixed size and by ratio;
    // the boolean argument enables sampling with replacement.
    static void c_1_2() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source
            .sampleWithSize(50)
            .lazyPrintStatistics("< after sample with size 50 >")
            .sample(0.1)
            .print();

        source
            .lazyPrintStatistics("< origin data >")
            .sampleWithSize(150, true)
            .lazyPrintStatistics("< after sample with size 150 >")
            .sample(0.03, true)
            .print();

        // Ratio sampling also works on streams.
        CsvSourceStreamOp source_stream = new CsvSourceStreamOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source_stream.sample(0.1).print();

        StreamOperator.execute();
    }

    // 7.1.3 Weighted sampling: a row's chance of being selected is
    // proportional to the value in its weight column.
    static void c_1_3() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source
            .select("*, CASE category WHEN 'Iris-versicolor' THEN 1 "
                + "WHEN 'Iris-setosa' THEN 2 ELSE 4 END AS weight")
            .link(
                new WeightSampleBatchOp()
                    .setRatio(0.4)
                    .setWeightCol("weight")
            )
            .groupBy("category", "category, COUNT(*) AS cnt")
            .print();
    }

    // 7.1.4 Stratified sampling: a separate sampling ratio per stratum,
    // in both batch and stream mode.
    static void c_1_4() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source
            .link(
                new StratifiedSampleBatchOp()
                    .setStrataCol("category")
                    .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
            )
            .groupBy("category", "category, COUNT(*) AS cnt")
            .print();

        CsvSourceStreamOp source_stream = new CsvSourceStreamOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source_stream
            .link(
                new StratifiedSampleStreamOp()
                    .setStrataCol("category")
                    .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
            )
            .print();

        StreamOperator.execute();
    }

    // 7.2 Data splitting: 90% of the rows go to the main output (train set),
    // the remaining 10% to side output 0 (test set); both are saved as .ak files.
    static void c_2() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        System.out.println("schema of source:");
        System.out.println(source.getSchema());

        SplitBatchOp spliter = new SplitBatchOp().setFraction(0.9);

        source.link(spliter);

        System.out.println("schema of spliter's main output:");
        System.out.println(spliter.getSchema());

        System.out.println("count of spliter's side outputs:");
        System.out.println(spliter.getSideOutputCount());

        System.out.println("schema of spliter's side output:");
        System.out.println(spliter.getSideOutput(0).getSchema());

        spliter
            .lazyPrintStatistics("< Main Output >")
            .link(
                new AkSinkBatchOp()
                    .setFilePath(DATA_DIR + TRAIN_FILE)
                    .setOverwriteSink(true)
            );

        spliter.getSideOutput(0)
            .lazyPrintStatistics("< Side Output >")
            .link(
                new AkSinkBatchOp()
                    .setFilePath(DATA_DIR + TEST_FILE)
                    .setOverwriteSink(true)
            );

        BatchOperator.execute();
    }

    // 7.3.1 Standardization: rescale each feature column to zero mean and unit variance.
    static void c_3_1() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source.lazyPrintStatistics("< Origin data >");

        StandardScaler scaler = new StandardScaler().setSelectedCols(FEATURE_COL_NAMES);

        scaler
            .fit(source)
            .transform(source)
            .lazyPrintStatistics("< after Standard Scale >");

        BatchOperator.execute();
    }

    // 7.3.2 MinMaxScale: rescale each feature column into the range [0, 1].
    static void c_3_2() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source.lazyPrintStatistics("< Origin data >");

        MinMaxScaler scaler = new MinMaxScaler().setSelectedCols(FEATURE_COL_NAMES);

        scaler
            .fit(source)
            .transform(source)
            .lazyPrintStatistics("< after MinMax Scale >");

        BatchOperator.execute();
    }

    // 7.3.3 MaxAbsScale: divide each feature column by its maximum absolute value,
    // mapping values into [-1, 1].
    static void c_3_3() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING);

        source.lazyPrintStatistics("< Origin data >");

        MaxAbsScaler scaler = new MaxAbsScaler().setSelectedCols(FEATURE_COL_NAMES);

        scaler
            .fit(source)
            .transform(source)
            .lazyPrintStatistics("< after MaxAbs Scale >");

        BatchOperator.execute();
    }

    // 7.4.1 Vector scaling: assemble the feature columns into one vector column,
    // then apply the vector versions of the three scalers.
    static void c_4_1() throws Exception {
        BatchOperator<?> source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING)
            .link(
                new VectorAssemblerBatchOp()
                    .setSelectedCols(FEATURE_COL_NAMES)
                    .setOutputCol(VECTOR_COL_NAME)
                    .setReservedCols(LABEL_COL_NAME)
            );

        source.link(
            new VectorSummarizerBatchOp()
                .setSelectedCol(VECTOR_COL_NAME)
                .lazyPrintVectorSummary("< Origin data >")
        );

        new VectorStandardScaler()
            .setSelectedCol(VECTOR_COL_NAME)
            .fit(source)
            .transform(source)
            .link(
                new VectorSummarizerBatchOp()
                    .setSelectedCol(VECTOR_COL_NAME)
                    .lazyPrintVectorSummary("< after Vector Standard Scale >")
            );

        new VectorMinMaxScaler()
            .setSelectedCol(VECTOR_COL_NAME)
            .fit(source)
            .transform(source)
            .link(
                new VectorSummarizerBatchOp()
                    .setSelectedCol(VECTOR_COL_NAME)
                    .lazyPrintVectorSummary("< after Vector MinMax Scale >")
            );

        new VectorMaxAbsScaler()
            .setSelectedCol(VECTOR_COL_NAME)
            .fit(source)
            .transform(source)
            .link(
                new VectorSummarizerBatchOp()
                    .setSelectedCol(VECTOR_COL_NAME)
                    .lazyPrintVectorSummary("< after Vector MaxAbs Scale >")
            );

        BatchOperator.execute();
    }

    // 7.4.2 Normalization: rescale each vector to unit p-norm; here p = 1.
    static void c_4_2() throws Exception {
        BatchOperator<?> source = new CsvSourceBatchOp()
            .setFilePath(DATA_DIR + ORIGIN_FILE)
            .setSchemaStr(SCHEMA_STRING)
            .link(
                new VectorAssemblerBatchOp()
                    .setSelectedCols(FEATURE_COL_NAMES)
                    .setOutputCol(VECTOR_COL_NAME)
                    .setReservedCols(LABEL_COL_NAME)
            );

        source
            .link(
                new VectorNormalizeBatchOp()
                    .setSelectedCol(VECTOR_COL_NAME)
                    .setP(1.0)
            )
            .firstN(5)
            .print();
    }

    // 7.5 Missing-value imputation: fill col1 with a constant value
    // and col2/col3 with their column means.
    static void c_5() throws Exception {
        Row[] rows = new Row[] {
            Row.of("a", 10.0, 100),
            Row.of("b", -2.5, 9),
            Row.of("c", 100.2, 1),
            Row.of("d", -99.9, 100),
            Row.of(null, null, null)
        };

        MemSourceBatchOp source
            = new MemSourceBatchOp(rows, new String[] {"col1", "col2", "col3"});

        source.lazyPrint(-1, "< origin data >");

        Pipeline pipeline = new Pipeline()
            .add(
                new Imputer()
                    .setSelectedCols("col1")
                    .setStrategy(Strategy.VALUE)
                    .setFillValue("e")
            )
            .add(
                new Imputer()
                    .setSelectedCols("col2", "col3")
                    .setStrategy(Strategy.MEAN)
            );

        pipeline.fit(source)
            .transform(source)
            .print();

        // Check the mean used to fill col3: (100 + 9 + 1 + 100) / 4,
        // computed with integer division, prints 52.
        System.out.println(210 / 4);
    }
}
```
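For quick reference, the transformations exercised in sections 7.3 and 7.4 follow the standard definitions below; these are textbook formulas rather than a statement about Alink's internal implementation:

$$
\begin{aligned}
\text{standardization:}&\quad z = \frac{x - \mu}{\sigma} \\
\text{MinMaxScale:}&\quad x' = \frac{x - x_{\min}}{x_{\max} - x_{\min}} \\
\text{MaxAbsScale:}&\quad x' = \frac{x}{\max_i |x_i|} \\
\text{normalization (p-norm):}&\quad \mathbf{x}' = \frac{\mathbf{x}}{\lVert \mathbf{x} \rVert_p}
\end{aligned}
$$

In `c_4_2`, `setP(1.0)` selects the L1 norm, so the absolute values of each output vector's components sum to 1; `setP(2.0)` would instead rescale each row to unit Euclidean length.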