This chapter covers the following sections:
7.1 Sampling
7.1.1 Taking the First _N_ Rows
7.1.2 Random Sampling
7.1.3 Weighted Sampling
7.1.4 Stratified Sampling
7.2 Data Splitting
7.3 Scaling Numerical Columns
7.3.1 Standardization
7.3.2 MinMaxScale
7.3.3 MaxAbsScale
7.4 Scaling Vectors
7.4.1 StandardScale, MinMaxScale, MaxAbsScale
7.4.2 Normalization
7.5 Missing-Value Imputation
7.6 Converting Between Python Arrays, DataFrames, and Alink Batch Data
7.6.1 Converting Between Python Arrays and DataFrames
7.6.2 Converting Alink Batch Data to a DataFrame
7.6.3 Converting a DataFrame to Alink Batch Data
For the full discussion, please read the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; what follows is the example code for this chapter.
# Common setup: start a local Alink environment and define the constants used throughout this chapter.
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.max_rows', 200)

DATA_DIR = ROOT_DIR + "iris" + os.sep

ORIGIN_FILE = "iris.data"
TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"

SCHEMA_STRING = "sepal_length double, sepal_width double, " \
    + "petal_length double, petal_width double, category string"

FEATURE_COL_NAMES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]
LABEL_COL_NAME = "category"
VECTOR_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
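The setup cell above imports ROOT_DIR from the book's accompanying utils module. If you run the examples without that module, a minimal stand-in might look like the sketch below; the file name and directory are assumptions, so point ROOT_DIR at wherever your copy of the iris data actually lives.

# utils.py -- hypothetical minimal stand-in for the book's helper module (not the original file)
import os

# Root directory that holds the sample data sets; adjust to your own location.
ROOT_DIR = os.path.join(os.path.expanduser("~"), "alink_data") + os.sep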
#c_1_1
# 7.1.1 Take the first N rows: FirstNBatchOp and the firstN shortcut both return the first 5 rows.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source\
    .link(
        FirstNBatchOp().setSize(5)
    )\
    .print()

source.firstN(5).print()
#c_1_2
# 7.1.2 Random sampling: sampleWithSize draws a fixed number of rows, sample draws by ratio;
# passing True as the second argument enables sampling with replacement.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source\
    .sampleWithSize(50)\
    .lazyPrintStatistics("< after sample with size 50 >")\
    .sample(0.1)\
    .print()

source\
    .lazyPrintStatistics("< origin data >")\
    .sampleWithSize(150, True)\
    .lazyPrintStatistics("< after sample with size 150 >")\
    .sample(0.03, True)\
    .print()

# Ratio-based sampling also works on streaming data.
source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source_stream.sample(0.1).print()

StreamOperator.execute()
#c_1_3
# 7.1.3 Weighted sampling: derive a weight column from the category, sample 40% by weight,
# then count how many rows of each category remain.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source\
    .select("*, CASE category WHEN 'Iris-versicolor' THEN 1 "
            + "WHEN 'Iris-setosa' THEN 2 ELSE 4 END AS weight")\
    .link(
        WeightSampleBatchOp()\
            .setRatio(0.4)\
            .setWeightCol("weight")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print()
#c_1_4
# 7.1.4 Stratified sampling: draw a different ratio from each category, in batch and in stream mode.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source\
    .link(
        StratifiedSampleBatchOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print()

source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source_stream\
    .link(
        StratifiedSampleStreamOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .print()

StreamOperator.execute()
#c_2
# 7.2 Data splitting: SplitBatchOp keeps 90% of the rows as the main output (training set);
# the remaining 10% go to side output 0 (test set). Both are saved as .ak files.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

print("schema of source:")
print(source.getColNames())

spliter = SplitBatchOp().setFraction(0.9)
source.link(spliter)

print("schema of spliter's main output:")
print(spliter.getColNames())
print("count of spliter's side outputs:")
print(spliter.getSideOutputCount())
print("schema of spliter's side output:")
print(spliter.getSideOutput(0).getColNames())

spliter\
    .lazyPrintStatistics("< Main Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TRAIN_FILE)\
            .setOverwriteSink(True)
    )

spliter.getSideOutput(0)\
    .lazyPrintStatistics("< Side Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TEST_FILE)\
            .setOverwriteSink(True)
    )

BatchOperator.execute()
#c_3_1
# 7.3.1 Standardization: StandardScaler rescales each feature column to zero mean and unit variance.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

scaler = StandardScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after Standard Scale >")

BatchOperator.execute()
#c_3_2
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

scaler = MinMaxScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MinMax Scale >")

BatchOperator.execute()
#c_3_3
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

scaler = MaxAbsScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MaxAbs Scale >")

BatchOperator.execute()
#c_4_1
# 7.4.1 Vector scaling: assemble the feature columns into one vector column,
# then apply VectorStandardScaler, VectorMinMaxScaler and VectorMaxAbsScaler in turn.
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    )

source.link(
    VectorSummarizerBatchOp()\
        .setSelectedCol(VECTOR_COL_NAME)\
        .lazyPrintVectorSummary("< Origin data >")
)

VectorStandardScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector Standard Scale >")
    )

VectorMinMaxScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MinMax Scale >")
    )

VectorMaxAbsScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MaxAbs Scale >")
    )

BatchOperator.execute()
#c_4_2
# 7.4.2 Normalization: VectorNormalizeBatchOp rescales each vector to unit L1 norm (p = 1.0).
source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    )

source\
    .link(
        VectorNormalizeBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .setP(1.0)
    )\
    .firstN(5)\
    .print()
#c_5
# 7.5 Missing-value imputation: fill the string column with a constant value
# and the numeric columns with their means.
df = pd.DataFrame(
    [
        ["a", 10.0, 100],
        ["b", -2.5, 9],
        ["c", 100.2, 1],
        ["d", -99.9, 100],
        [None, None, None]
    ]
)

source = BatchOperator\
    .fromDataframe(df, schemaStr='col1 string, col2 double, col3 double')\
    .select("col1, col2, CAST(col3 AS INTEGER) AS col3")

source.lazyPrint(-1, "< origin data >")

pipeline = Pipeline()\
    .add(
        Imputer()\
            .setSelectedCols(["col1"])\
            .setStrategy('VALUE')\
            .setFillValue("e")
    )\
    .add(
        Imputer()\
            .setSelectedCols(["col2", "col3"])\
            .setStrategy('MEAN')
    )

pipeline.fit(source)\
    .transform(source)\
    .print()
#c_6
# 7.6.1 Build a DataFrame from a dict of columns.
dict_arr = {'name': ['Alice', 'Bob', 'Cindy'], 'value': [1, 2, 3]}
pd.DataFrame(dict_arr)
# 7.6.1 Build a DataFrame from a 2-D list, giving the column names explicitly.
arr_2D = [
    ['Alice', 1],
    ['Bob', 2],
    ['Cindy', 3]
]
pd.DataFrame(arr_2D, columns=['name', 'value'])
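Section 7.6.1 is about conversion in both directions, while the cells above only build a DataFrame from Python data. Going the other way uses standard pandas calls; here is a brief sketch, with variable names chosen only for illustration:

# Convert a DataFrame back into plain Python structures.
df = pd.DataFrame(arr_2D, columns=['name', 'value'])

rows_as_lists = df.values.tolist()            # [['Alice', 1], ['Bob', 2], ['Cindy', 3]]
columns_as_dict = df.to_dict(orient='list')   # {'name': [...], 'value': [...]}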
# DataFrame <> BatchOperator
# Read the iris data set directly from the UCI repository as an Alink BatchOperator.
source = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")

source.firstN(5).print()
# 7.6.2 Convert Alink batch data to a pandas DataFrame.
df_iris = source.collectToDataframe()
df_iris.head()
# 7.6.3 Convert the DataFrame back to Alink batch data.
iris = BatchOperator\
    .fromDataframe(
        df_iris,
        "sepal_length double, sepal_width double, petal_length double, "
        + "petal_width double, category string"
    )

iris.firstN(5).print()