This chapter covers the following sections:
7.1 Sampling
7.1.1 Taking the "First" _N_ Records
7.1.2 Random Sampling
7.1.3 Weighted Sampling
7.1.4 Stratified Sampling
7.2 Data Splitting
7.3 Scaling Numeric Values
7.3.1 Standardization
7.3.2 MinMaxScale
7.3.3 MaxAbsScale
7.4 Scaling Vectors
7.4.1 StandardScale, MinMaxScale, MaxAbsScale
7.4.2 Normalization
7.5 Missing-Value Imputation
For the detailed discussion, please refer to the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Java)》; below is the companion example code for this chapter.
package com.alibaba.alink;
import org.apache.flink.types.Row;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.dataproc.FirstNBatchOp;
import com.alibaba.alink.operator.batch.dataproc.SplitBatchOp;
import com.alibaba.alink.operator.batch.dataproc.StratifiedSampleBatchOp;
import com.alibaba.alink.operator.batch.dataproc.WeightSampleBatchOp;
import com.alibaba.alink.operator.batch.dataproc.vector.VectorAssemblerBatchOp;
import com.alibaba.alink.operator.batch.dataproc.vector.VectorNormalizeBatchOp;
import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.batch.statistics.VectorSummarizerBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.dataproc.StratifiedSampleStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.params.dataproc.HasStrategy.Strategy;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.dataproc.Imputer;
import com.alibaba.alink.pipeline.dataproc.MaxAbsScaler;
import com.alibaba.alink.pipeline.dataproc.MinMaxScaler;
import com.alibaba.alink.pipeline.dataproc.StandardScaler;
import com.alibaba.alink.pipeline.dataproc.vector.VectorMaxAbsScaler;
import com.alibaba.alink.pipeline.dataproc.vector.VectorMinMaxScaler;
import com.alibaba.alink.pipeline.dataproc.vector.VectorStandardScaler;
import java.io.File;
public class Chap07 {
static final String DATA_DIR = Utils.ROOT_DIR + "iris" + File.separator;
static final String ORIGIN_FILE = "iris.data";
static final String TRAIN_FILE = "train.ak";
static final String TEST_FILE = "test.ak";
static final String SCHEMA_STRING
= "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
static final String[] FEATURE_COL_NAMES
= new String[] {"sepal_length", "sepal_width", "petal_length", "petal_width"};
static final String LABEL_COL_NAME = "category";
static final String VECTOR_COL_NAME = "vec";
static final String PREDICTION_COL_NAME = "pred";
public static void main(String[] args) throws Exception {
BatchOperator.setParallelism(1);
c_1_1();
c_1_2();
c_1_3();
c_1_4();
c_2();
c_3_1();
c_3_2();
c_3_3();
c_4_1();
c_4_2();
c_5();
}
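// 7.1.1 Take the "first" N records: either link a FirstNBatchOp configured with
// setSize(5), or call the firstN(5) shortcut on the source; both keep the first 5 rows.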
static void c_1_1() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source
.link(
new FirstNBatchOp()
.setSize(5)
)
.print();
source.firstN(5).print();
}
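// 7.1.2 Random sampling: sampleWithSize() draws a fixed number of rows and
// sample() draws by ratio; the boolean argument enables sampling with replacement.
// The streaming source at the end shows that ratio-based sampling also works on streams.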
static void c_1_2() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source
.sampleWithSize(50)
.lazyPrintStatistics("< after sample with size 50 >")
.sample(0.1)
.print();
source
.lazyPrintStatistics("< origin data >")
.sampleWithSize(150, true)
.lazyPrintStatistics("< after sample with size 150 >")
.sample(0.03, true)
.print();
CsvSourceStreamOp source_stream =
new CsvSourceStreamOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source_stream.sample(0.1).print();
StreamOperator.execute();
}
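// 7.1.3 Weighted sampling: a SQL CASE expression derives a weight column
// (versicolor 1, setosa 2, virginica 4), then WeightSampleBatchOp draws 40% of
// the rows with probability proportional to that weight; the groupBy counts how
// many rows of each category were kept.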
static void c_1_3() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source
.select("*, CASE category WHEN 'Iris-versicolor' THEN 1 "
+ "WHEN 'Iris-setosa' THEN 2 ELSE 4 END AS weight")
.link(
new WeightSampleBatchOp()
.setRatio(0.4)
.setWeightCol("weight")
)
.groupBy("category", "category, COUNT(*) AS cnt")
.print();
}
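// 7.1.4 Stratified sampling: StratifiedSampleBatchOp samples each stratum of the
// "category" column at its own ratio; StratifiedSampleStreamOp applies the same
// ratios to streaming data.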
static void c_1_4() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source
.link(
new StratifiedSampleBatchOp()
.setStrataCol("category")
.setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
)
.groupBy("category", "category, COUNT(*) AS cnt")
.print();
CsvSourceStreamOp source_stream =
new CsvSourceStreamOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source_stream
.link(
new StratifiedSampleStreamOp()
.setStrataCol("category")
.setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
)
.print();
StreamOperator.execute();
}
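// 7.2 Data splitting: SplitBatchOp randomly splits the input by setFraction(0.9);
// the main output (about 90% of the rows) is written to the train file and the
// side output (the remaining rows) to the test file, both in Alink's Ak format.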
static void c_2() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
System.out.println("schema of source:");
System.out.println(source.getSchema());
SplitBatchOp splitter = new SplitBatchOp().setFraction(0.9);
source.link(splitter);
System.out.println("schema of splitter's main output:");
System.out.println(splitter.getSchema());
System.out.println("count of splitter's side outputs:");
System.out.println(splitter.getSideOutputCount());
System.out.println("schema of splitter's side output:");
System.out.println(splitter.getSideOutput(0).getSchema());
splitter
.lazyPrintStatistics("< Main Output >")
.link(
new AkSinkBatchOp()
.setFilePath(DATA_DIR + TRAIN_FILE)
.setOverwriteSink(true)
);
splitter.getSideOutput(0)
.lazyPrintStatistics("< Side Output >")
.link(
new AkSinkBatchOp()
.setFilePath(DATA_DIR + TEST_FILE)
.setOverwriteSink(true)
);
BatchOperator.execute();
}
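// 7.3.1 Standardization: StandardScaler transforms each selected column to
// (x - mean) / standardDeviation, giving it zero mean and unit variance.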
static void c_3_1() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source.lazyPrintStatistics("< Origin data >");
StandardScaler scaler = new StandardScaler().setSelectedCols(FEATURE_COL_NAMES);
scaler
.fit(source)
.transform(source)
.lazyPrintStatistics("< after Standard Scale >");
BatchOperator.execute();
}
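// 7.3.2 MinMaxScale: MinMaxScaler linearly maps each selected column into
// [0, 1] via (x - min) / (max - min).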
static void c_3_2() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source.lazyPrintStatistics("< Origin data >");
MinMaxScaler scaler = new MinMaxScaler().setSelectedCols(FEATURE_COL_NAMES);
scaler
.fit(source)
.transform(source)
.lazyPrintStatistics("< after MinMax Scale >");
BatchOperator.execute();
}
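// 7.3.3 MaxAbsScale: MaxAbsScaler divides each selected column by its maximum
// absolute value, mapping the data into [-1, 1] without shifting zero.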
static void c_3_3() throws Exception {
CsvSourceBatchOp source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING);
source.lazyPrintStatistics("< Origin data >");
MaxAbsScaler scaler = new MaxAbsScaler().setSelectedCols(FEATURE_COL_NAMES);
scaler
.fit(source)
.transform(source)
.lazyPrintStatistics("< after MaxAbs Scale >");
BatchOperator.execute();
}
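// 7.4.1 Scaling vector columns: the four feature columns are first assembled into
// a single vector column, then VectorStandardScaler, VectorMinMaxScaler and
// VectorMaxAbsScaler apply the same three scalings component-wise;
// VectorSummarizerBatchOp prints summary statistics before and after each scaling.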
static void c_4_1() throws Exception {
BatchOperator <?> source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING)
.link(
new VectorAssemblerBatchOp()
.setSelectedCols(FEATURE_COL_NAMES)
.setOutputCol(VECTOR_COL_NAME)
.setReservedCols(LABEL_COL_NAME)
);
source.link(
new VectorSummarizerBatchOp()
.setSelectedCol(VECTOR_COL_NAME)
.lazyPrintVectorSummary("< Origin data >")
);
new VectorStandardScaler()
.setSelectedCol(VECTOR_COL_NAME)
.fit(source)
.transform(source)
.link(
new VectorSummarizerBatchOp()
.setSelectedCol(VECTOR_COL_NAME)
.lazyPrintVectorSummary("< after Vector Standard Scale >")
);
new VectorMinMaxScaler()
.setSelectedCol(VECTOR_COL_NAME)
.fit(source)
.transform(source)
.link(
new VectorSummarizerBatchOp()
.setSelectedCol(VECTOR_COL_NAME)
.lazyPrintVectorSummary("< after Vector MinMax Scale >")
);
new VectorMaxAbsScaler()
.setSelectedCol(VECTOR_COL_NAME)
.fit(source)
.transform(source)
.link(
new VectorSummarizerBatchOp()
.setSelectedCol(VECTOR_COL_NAME)
.lazyPrintVectorSummary("< after Vector MaxAbs Scale >")
);
BatchOperator.execute();
}
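// 7.4.2 Normalization: VectorNormalizeBatchOp rescales each row's vector to unit
// p-norm; with setP(1.0), the absolute values of each vector's components sum to 1.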
static void c_4_2() throws Exception {
BatchOperator <?> source =
new CsvSourceBatchOp()
.setFilePath(DATA_DIR + ORIGIN_FILE)
.setSchemaStr(SCHEMA_STRING)
.link(
new VectorAssemblerBatchOp()
.setSelectedCols(FEATURE_COL_NAMES)
.setOutputCol(VECTOR_COL_NAME)
.setReservedCols(LABEL_COL_NAME)
);
source
.link(
new VectorNormalizeBatchOp()
.setSelectedCol(VECTOR_COL_NAME)
.setP(1.0)
)
.firstN(5)
.print();
}
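// 7.5 Missing-value imputation: the first Imputer fills the null in the string
// column col1 with the constant "e" (Strategy.VALUE); the second fills the nulls
// in the numeric columns col2 and col3 with the respective column means (Strategy.MEAN).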
static void c_5() throws Exception {
Row[] rows = new Row[] {
Row.of("a", 10.0, 100),
Row.of("b", -2.5, 9),
Row.of("c", 100.2, 1),
Row.of("d", -99.9, 100),
Row.of(null, null, null)
};
MemSourceBatchOp source
= new MemSourceBatchOp(rows, new String[] {"col1", "col2", "col3"});
source.lazyPrint(-1, "< origin data >");
Pipeline pipeline = new Pipeline()
.add(
new Imputer()
.setSelectedCols("col1")
.setStrategy(Strategy.VALUE)
.setFillValue("e")
)
.add(
new Imputer()
.setSelectedCols("col2", "col3")
.setStrategy(Strategy.MEAN)
);
pipeline.fit(source)
.transform(source)
.print();
// The mean of col3 is (100 + 9 + 1 + 100) / 4; computed here with Java integer
// division, it truncates to 52, illustrating the value imputed for col3 above.
System.out.println(210 / 4);
}
}