This chapter covers the following sections:
7.1 Sampling
7.1.1 Taking the First _N_ Records
7.1.2 Random Sampling
7.1.3 Weighted Sampling
7.1.4 Stratified Sampling
7.2 Data Splitting
7.3 Scaling Numerical Columns
7.3.1 Standardization
7.3.2 MinMaxScale
7.3.3 MaxAbsScale
7.4 Scaling Vectors
7.4.1 StandardScale, MinMaxScale, MaxAbsScale
7.4.2 Normalization
7.5 Missing Value Imputation
For the detailed discussion, please read the print book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; what follows is the sample code for this chapter.
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.max_rows', 200)

# ROOT_DIR is expected to be provided by the book's utils module
DATA_DIR = ROOT_DIR + "iris" + os.sep

ORIGIN_FILE = "iris.data"
TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"

SCHEMA_STRING = "sepal_length double, sepal_width double, "\
    + "petal_length double, petal_width double, category string"

FEATURE_COL_NAMES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
LABEL_COL_NAME = "category"

VECTOR_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
#c_1_1: take the "first" N records (Section 7.1.1)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# keep only the first 5 rows via the FirstNBatchOp operator
source\
    .link(
        FirstNBatchOp().setSize(5)
    )\
    .print()

# the firstN() shortcut does the same thing
source.firstN(5).print()
#c_1_2: random sampling (Section 7.1.2)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# sample a fixed number of rows, then sample by ratio
source\
    .sampleWithSize(50)\
    .lazyPrintStatistics("< after sample with size 50 >")\
    .sample(0.1)\
    .print()

source\
    .lazyPrintStatistics("< origin data >")\
    .sampleWithSize(150, True)\
    .lazyPrintStatistics("< after sample with size 150 >")\
    .sample(0.03, True)\
    .print()

# random sampling also works on a stream source
source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source_stream.sample(0.1).print()
StreamOperator.execute()
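Note: in the calls above, sample() takes a sampling ratio while sampleWithSize() takes a target row count; the boolean second argument in sampleWithSize(150, True) and sample(0.03, True) should correspond to sampling with replacement, which is why drawn rows may repeat even when as many rows as the 150-row source are requested.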
# c_1_3: weighted sampling (Section 7.1.3)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# assign a weight to each category, sample by weight, then count what remains per category
source\
    .select("*, CASE category WHEN 'Iris-versicolor' THEN 1 "
            + "WHEN 'Iris-setosa' THEN 2 ELSE 4 END AS weight")\
    .link(
        WeightSampleBatchOp()\
            .setRatio(0.4)\
            .setWeightCol("weight")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print()
#c_1_4: stratified sampling (Section 7.1.4)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# sample each category with its own ratio, then count what remains per category
source\
    .link(
        StratifiedSampleBatchOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print()

# the stream version of stratified sampling
source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source_stream\
    .link(
        StratifiedSampleStreamOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .print()

StreamOperator.execute()
#c_2: split the data into a training set and a test set (Section 7.2)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

print("schema of source:")
print(source.getColNames())

spliter = SplitBatchOp().setFraction(0.9)
source.link(spliter)

print("schema of spliter's main output:")
print(spliter.getColNames())
print("count of spliter's side outputs:")
print(spliter.getSideOutputCount())
print("schema of spliter's side output :")
print(spliter.getSideOutput(0).getColNames())

# main output: about 90% of the rows, saved as the training set
spliter\
    .lazyPrintStatistics("< Main Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TRAIN_FILE)\
            .setOverwriteSink(True)
    )

# side output: the remaining rows, saved as the test set
spliter.getSideOutput(0)\
    .lazyPrintStatistics("< Side Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TEST_FILE)\
            .setOverwriteSink(True)
    )

BatchOperator.execute()
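To verify the split, one option is to read the two .ak files back and count rows. This is a quick sketch, not part of the original example; it assumes AkSourceBatchOp (the reader counterpart of AkSinkBatchOp) is used to load the files just written.

# count the rows written to each file; with setFraction(0.9), roughly
# 90% of the 150 iris rows should land in the training file
train_cnt = len(AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE).collectToDataframe())
test_cnt = len(AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE).collectToDataframe())
print(train_cnt, test_cnt)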
#c_3_1: standardization (Section 7.3.1)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

scaler = StandardScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after Standard Scale >")

BatchOperator.execute()
#c_3_2: MinMaxScale (Section 7.3.2)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

scaler = MinMaxScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MinMax Scale >")

BatchOperator.execute()
#c_3_3: MaxAbsScale (Section 7.3.3)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

scaler = MaxAbsScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MaxAbs Scale >")

BatchOperator.execute()
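The three scalers above differ only in the per-column formula they apply. As a rough cross-check (a pandas sketch for intuition, not the Alink implementation, and ignoring details such as the exact variance estimator), the same transforms can be written directly on the collected data:

# pandas sketch of the three column-wise scalings
df_feats = source.collectToDataframe()[FEATURE_COL_NAMES]

standard = (df_feats - df_feats.mean()) / df_feats.std()                   # mean 0, unit variance
min_max = (df_feats - df_feats.min()) / (df_feats.max() - df_feats.min())  # mapped into [0, 1]
max_abs = df_feats / df_feats.abs().max()                                  # mapped into [-1, 1]

print(standard.describe())
print(min_max.describe())
print(max_abs.describe())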
#c_4_1: scaling a vector column (Section 7.4.1)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    )

# summary statistics of the assembled vector column before scaling
source.link(
    VectorSummarizerBatchOp()\
        .setSelectedCol(VECTOR_COL_NAME)\
        .lazyPrintVectorSummary("< Origin data >")
)

VectorStandardScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector Standard Scale >")
    )

VectorMinMaxScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MinMax Scale >")
    )

VectorMaxAbsScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MaxAbs Scale >")
    )

BatchOperator.execute()
#c_4_2: normalization of a vector column (Section 7.4.2)
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    )

# normalize each vector by its L1 norm (p = 1.0)
source\
    .link(
        VectorNormalizeBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .setP(1.0)
    )\
    .firstN(5)\
    .print()
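For a single row, setP(1.0) means each component is divided by the vector's L1 norm (the sum of absolute values), so the components of every output vector sum to 1 in absolute value. A minimal sketch of that arithmetic in plain Python, on a made-up example row (not the Alink operator itself):

# L1 normalization of one example vector
row = [5.1, 3.5, 1.4, 0.2]
l1_norm = sum(abs(x) for x in row)
normalized = [x / l1_norm for x in row]
print(normalized)       # the rescaled components
print(sum(normalized))  # 1.0 for non-negative inputs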
#c_5: missing value imputation (Section 7.5)
df = pd.DataFrame(
    [
        ["a", 10.0, 100],
        ["b", -2.5, 9],
        ["c", 100.2, 1],
        ["d", -99.9, 100],
        [None, None, None]
    ]
)

source = BatchOperator\
    .fromDataframe(df, schemaStr='col1 string, col2 double, col3 double')\
    .select("col1, col2, CAST(col3 AS INTEGER) AS col3")

source.lazyPrint(-1, "< origin data >")

# fill the string column with a constant value and the numeric columns with their means
pipeline = Pipeline()\
    .add(
        Imputer()\
            .setSelectedCols(["col1"])\
            .setStrategy('VALUE')\
            .setFillValue("e")
    )\
    .add(
        Imputer()\
            .setSelectedCols(["col2", "col3"])\
            .setStrategy('MEAN')
    )

pipeline.fit(source)\
    .transform(source)\
    .print()
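As a sanity check on the MEAN strategy, the fill values for col2 and col3 can be computed by hand over the non-missing rows (whether the integer column keeps the fractional part depends on its type):

# hand computation of the column means the MEAN strategy should use
col2_mean = (10.0 - 2.5 + 100.2 - 99.9) / 4   # 1.95
col3_mean = (100 + 9 + 1 + 100) / 4           # 52.5
print(col2_mean, col3_mean)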
#c_6: building a pandas DataFrame and converting between DataFrame and BatchOperator
dict_arr = {'name': ['Alice', 'Bob', 'Cindy'], 'value': [1, 2, 3]}
pd.DataFrame(dict_arr)
arr_2D = [
    ['Alice', 1],
    ['Bob', 2],
    ['Cindy', 3]
]
pd.DataFrame(arr_2D, columns=['name', 'value'])
# DataFrame <> BatchOperator
source = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
source.firstN(5).print()
df_iris = source.collectToDataframe()
df_iris.head()
iris = BatchOperator\
    .fromDataframe(
        df_iris,
        "sepal_length double, sepal_width double, petal_length double, "
        + "petal_width double, category string"
    )
iris.firstN(5).print()