Alink Tutorial (Python Version)

Chapter 7: Basic Data Processing

This chapter covers the following sections:
7.1 Sampling
7.1.1 Taking the "First" _N_ Records
7.1.2 Random Sampling
7.1.3 Weighted Sampling
7.1.4 Stratified Sampling
7.2 Splitting Data
7.3 Scaling Numerical Columns
7.3.1 Standardization
7.3.2 MinMaxScale
7.3.3 MaxAbsScale
7.4 Scaling Vectors
7.4.1 StandardScale, MinMaxScale, MaxAbsScale
7.4.2 Normalization
7.5 Filling Missing Values

For the full discussion, please read the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; this file contains the example code for the chapter.

from pyalink.alink import *
useLocalEnv(1)

from utils import *   # ROOT_DIR (root path of the sample data) comes from this star import
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.max_rows', 200)

DATA_DIR = ROOT_DIR + "iris" + os.sep

ORIGIN_FILE = "iris.data";

TRAIN_FILE = "train.ak";
TEST_FILE = "test.ak";

SCHEMA_STRING = "sepal_length double, sepal_width double, "\
                + "petal_length double, petal_width double, category string"

FEATURE_COL_NAMES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

LABEL_COL_NAME = "category";
VECTOR_COL_NAME = "vec";
PREDICTION_COL_NAME = "pred";

#c_1_1
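# 7.1.1 Taking the "first" N records: link FirstNBatchOp, or use the equivalent firstN() shortcut.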

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source\
    .link(
        FirstNBatchOp().setSize(5)
    )\
    .print();

source.firstN(5).print();

#c_1_2
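# 7.1.2 Random sampling: sampleWithSize() draws a fixed number of records, sample() draws by ratio;
# the second argument enables sampling with replacement, and sample() also works on streams.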

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source\
    .sampleWithSize(50)\
    .lazyPrintStatistics("< after sample with size 50 >")\
    .sample(0.1)\
    .print();

source\
    .lazyPrintStatistics("< origin data >")\
    .sampleWithSize(150, True)\
    .lazyPrintStatistics("< after sample with size 150 >")\
    .sample(0.03, True)\
    .print();

source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source_stream.sample(0.1).print();

StreamOperator.execute();

# c_1_3
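# 7.1.3 Weighted sampling: WeightSampleBatchOp draws about setRatio() of the records,
# with probability proportional to the "weight" column computed below.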

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source\
    .select("*, CASE category WHEN 'Iris-versicolor' THEN 1 "
            + "WHEN 'Iris-setosa' THEN 2 ELSE 4 END AS weight")\
    .link(
        WeightSampleBatchOp()\
            .setRatio(0.4)\
            .setWeightCol("weight")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print();

#c_1_4
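# 7.1.4 Stratified sampling: StratifiedSampleBatchOp / StratifiedSampleStreamOp sample each
# category (stratum) with its own ratio.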

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source\
    .link(
        StratifiedSampleBatchOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print();

source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source_stream\
    .link(
        StratifiedSampleStreamOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .print();

StreamOperator.execute();

#c_2
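# 7.2 Splitting data: SplitBatchOp routes about 90% of the records to its main output
# and the rest to side output 0.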

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

print("schema of source:");
print(source.getColNames());

splitter = SplitBatchOp().setFraction(0.9);

source.link(splitter);

print("schema of splitter's main output:");
print(splitter.getColNames());

print("count of splitter's side outputs:");
print(splitter.getSideOutputCount());

print("schema of splitter's side output:");
print(splitter.getSideOutput(0).getColNames());

splitter\
    .lazyPrintStatistics("< Main Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TRAIN_FILE)\
            .setOverwriteSink(True)
    );

splitter.getSideOutput(0)\
    .lazyPrintStatistics("< Side Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TEST_FILE)\
            .setOverwriteSink(True)
    );

BatchOperator.execute();
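
# The two .ak files written above can be read back later with AkSourceBatchOp.
# A minimal sketch, assuming train.ak and test.ak were just produced by the split above:

AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TRAIN_FILE)\
    .lazyPrintStatistics("< train set read back >");

AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TEST_FILE)\
    .lazyPrintStatistics("< test set read back >");

BatchOperator.execute();
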
#c_3_1
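# 7.3.1 Standardization: StandardScaler rescales the selected columns to zero mean and
# unit standard deviation (the defaults).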

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source.lazyPrintStatistics("< Origin data >");

scaler = StandardScaler().setSelectedCols(FEATURE_COL_NAMES);

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after Standard Scale >");

BatchOperator.execute();

#c_3_2
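# 7.3.2 MinMaxScale: MinMaxScaler rescales the selected columns into the [0, 1] range (the default min/max).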

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source.lazyPrintStatistics("< Origin data >");

scaler = MinMaxScaler().setSelectedCols(FEATURE_COL_NAMES);

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MinMax Scale >");

BatchOperator.execute();

#c_3_3
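# 7.3.3 MaxAbsScale: MaxAbsScaler divides each selected column by its maximum absolute value,
# mapping it into [-1, 1].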

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING);

source.lazyPrintStatistics("< Origin data >");

scaler = MaxAbsScaler().setSelectedCols(FEATURE_COL_NAMES);

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MaxAbs Scale >");

BatchOperator.execute();

#c_4_1
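# 7.4.1 Vector scaling: assemble the feature columns into a vector column, then fit/transform the
# Vector* scalers and inspect the results with VectorSummarizerBatchOp.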

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    );

source.link(
    VectorSummarizerBatchOp()\
        .setSelectedCol(VECTOR_COL_NAME)\
        .lazyPrintVectorSummary("< Origin data >")
);

VectorStandardScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector Standard Scale >")
    );

VectorMinMaxScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MinMax Scale >")
    );

VectorMaxAbsScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MaxAbs Scale >")
    );

BatchOperator.execute();
#c_4_2
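# 7.4.2 Normalization: VectorNormalizeBatchOp rescales each vector to unit p-norm.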

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    );

source\
    .link(
        VectorNormalizeBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .setP(1.0)
    )\
    .firstN(5)\
    .print();
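
# setP(1.0) above rescales each vector by its L1 norm. The Euclidean (L2) variant only
# changes the p value; a minimal sketch, reusing the assembled `source` from above:

source\
    .link(
        VectorNormalizeBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .setP(2.0)
    )\
    .firstN(5)\
    .print();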

#c_5
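# 7.5 Filling missing values: Imputer fills col1 with the constant "e" (strategy VALUE)
# and col2/col3 with their column means (strategy MEAN).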

df = pd.DataFrame(
    [
        ["a", 10.0, 100],
        ["b", -2.5, 9],
        ["c", 100.2, 1],
        ["d", -99.9, 100],
        [None, None, None]
    ]
)

# col3 contains a missing value, so pandas stores it as a float column;
# import it as double and cast back to INTEGER on the Alink side.
source = BatchOperator\
    .fromDataframe(df, schemaStr='col1 string, col2 double, col3 double')\
    .select("col1, col2, CAST(col3 AS INTEGER) AS col3")

source.lazyPrint(-1, "< origin data >");

pipeline = Pipeline()\
    .add(
        Imputer()\
            .setSelectedCols(["col1"])\
            .setStrategy('VALUE')\
            .setFillValue("e")
    )\
    .add(
        Imputer()\
            .setSelectedCols(["col2", "col3"])\
            .setStrategy('MEAN')
    );

pipeline.fit(source)\
    .transform(source)\
    .print();

#c_6
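# Extra: building pandas DataFrames and converting between DataFrame and BatchOperator.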

# Build the same small DataFrame twice: from a dict of columns, then from a 2D list of rows.
dict_arr = {'name':['Alice','Bob','Cindy'],
            'value':[1,2,3]}

pd.DataFrame(dict_arr)

arr_2D = [
    ['Alice',1],
    ['Bob',2],
    ['Cindy',3]
]

pd.DataFrame(arr_2D, columns=['name', 'value'])
# DataFrame <> BatchOperator
source = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")
source.firstN(5).print()
# Pull the distributed result to the client as a pandas DataFrame ...
df_iris = source.collectToDataframe()
df_iris.head()

# ... and turn a DataFrame back into an Alink BatchOperator
iris = BatchOperator\
    .fromDataframe(
        df_iris,
        "sepal_length double, sepal_width double, petal_length double, "
        + "petal_width double, category string"
    );

iris.firstN(5).print()