Alink Tutorial (Python Edition)

Chapter 7: Basic Data Processing

This chapter covers the following sections:
7.1 Sampling
7.1.1 Taking the "First" _N_ Records
7.1.2 Random Sampling
7.1.3 Weighted Sampling
7.1.4 Stratified Sampling
7.2 Data Splitting
7.3 Scaling Numerical Data
7.3.1 Standardization
7.3.2 MinMaxScale
7.3.3 MaxAbsScale
7.4 Scaling Vector Data
7.4.1 StandardScale, MinMaxScale, MaxAbsScale
7.4.2 Normalization
7.5 Filling Missing Values
7.6 Converting Between Python Arrays, DataFrames, and Alink Batch Data
7.6.1 Converting Between Python Arrays and DataFrames
7.6.2 Converting Alink Batch Data to a DataFrame
7.6.3 Converting a DataFrame to Alink Batch Data

For the full discussion, please see the print book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; what follows is the companion example code for this chapter.

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.max_rows', 200)

DATA_DIR = ROOT_DIR + "iris" + os.sep

ORIGIN_FILE = "iris.data";

TRAIN_FILE = "train.ak";
TEST_FILE = "test.ak";

SCHEMA_STRING = "sepal_length double, sepal_width double, "\
                + "petal_length double, petal_width double, category string"

FEATURE_COL_NAMES = ["sepal_length", "sepal_width", "petal_length", "petal_width"]

LABEL_COL_NAME = "category";
VECTOR_COL_NAME = "vec";
PREDICTION_COL_NAME = "pred";

#c_1_1: taking the first N records

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# Take the first 5 records by linking a FirstNBatchOp ...
source\
    .link(
        FirstNBatchOp().setSize(5)
    )\
    .print()

# ... or, equivalently, via the firstN shortcut method.
source.firstN(5).print()

#c_1_2: random sampling

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# sampleWithSize draws a fixed number of records; sample draws by ratio.
# Both sample without replacement by default.
source\
    .sampleWithSize(50)\
    .lazyPrintStatistics("< after sample with size 50 >")\
    .sample(0.1)\
    .print()

# Passing True as the second argument enables sampling with replacement.
source\
    .lazyPrintStatistics("< origin data >")\
    .sampleWithSize(150, True)\
    .lazyPrintStatistics("< after sample with size 150 >")\
    .sample(0.03, True)\
    .print()

# Ratio sampling is also available for streaming data.
source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source_stream.sample(0.1).print()

StreamOperator.execute()

#c_1_3: weighted sampling

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# Attach a per-row weight column, draw roughly 40% of the records with
# probability proportional to the weight, then count the survivors per category.
source\
    .select("*, CASE category WHEN 'Iris-versicolor' THEN 1 "
            + "WHEN 'Iris-setosa' THEN 2 ELSE 4 END AS weight")\
    .link(
        WeightSampleBatchOp()\
            .setRatio(0.4)\
            .setWeightCol("weight")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print()

#c_1_4: stratified sampling

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

# Sample each category (stratum) with its own ratio, then count the
# records remaining per category.
source\
    .link(
        StratifiedSampleBatchOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .groupBy("category", "category, COUNT(*) AS cnt")\
    .print()

# The streaming counterpart works the same way.
source_stream = CsvSourceStreamOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source_stream\
    .link(
        StratifiedSampleStreamOp()\
            .setStrataCol("category")\
            .setStrataRatios("Iris-versicolor:0.2,Iris-setosa:0.4,Iris-virginica:0.8")
    )\
    .print()

StreamOperator.execute()

#c_2: data splitting

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

print("schema of source:")
print(source.getColNames())

# Split the data 90% / 10%: the 90% part becomes the operator's main
# output, the remaining 10% becomes side output 0.
splitter = SplitBatchOp().setFraction(0.9)

source.link(splitter)

print("schema of splitter's main output:")
print(splitter.getColNames())

print("count of splitter's side outputs:")
print(splitter.getSideOutputCount())

print("schema of splitter's side output:")
print(splitter.getSideOutput(0).getColNames())

# Write the two parts to train / test files in Alink's .ak format.
splitter\
    .lazyPrintStatistics("< Main Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TRAIN_FILE)\
            .setOverwriteSink(True)
    )

splitter.getSideOutput(0)\
    .lazyPrintStatistics("< Side Output >")\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TEST_FILE)\
            .setOverwriteSink(True)
    )

BatchOperator.execute()
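
# Optional check: read back the two .ak files written above and peek at a few
# rows. AkSourceBatchOp is Alink's reader for the .ak format; this sketch
# assumes the sinks above have already executed.
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TRAIN_FILE)\
    .lazyPrint(3, "< head of train set >")
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TEST_FILE)\
    .lazyPrint(3, "< head of test set >")
BatchOperator.execute()
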
#c_3_1: standardization

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

# Standardize the feature columns to zero mean and unit variance.
scaler = StandardScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after Standard Scale >")

BatchOperator.execute()

#c_3_2: MinMaxScale

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

# Rescale the feature columns into the range [0, 1].
scaler = MinMaxScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MinMax Scale >")

BatchOperator.execute()

#c_3_3: MaxAbsScale

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)

source.lazyPrintStatistics("< Origin data >")

# Divide each feature column by its maximum absolute value,
# mapping it into [-1, 1].
scaler = MaxAbsScaler().setSelectedCols(FEATURE_COL_NAMES)

scaler\
    .fit(source)\
    .transform(source)\
    .lazyPrintStatistics("< after MaxAbs Scale >")

BatchOperator.execute()
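
# For reference, the three scalers transform each selected column as follows:
#   StandardScaler: (x - mean) / std         -> zero mean, unit variance
#   MinMaxScaler:   (x - min) / (max - min)  -> range [0, 1] by default
#   MaxAbsScaler:   x / max(|x|)             -> range [-1, 1]
# A minimal sanity check for the MaxAbsScaler above, pulling the result into
# pandas (collectToDataframe is covered in section 7.6): every feature
# column's maximum absolute value should come out as 1.0.
df_check = scaler.fit(source).transform(source).collectToDataframe()
print(df_check[FEATURE_COL_NAMES].abs().max())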

#c_4_1: vector scalers

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    )

source.link(
    VectorSummarizerBatchOp()\
        .setSelectedCol(VECTOR_COL_NAME)\
        .lazyPrintVectorSummary("< Origin data >")
)

# The vector variants of the three scalers work componentwise on the
# assembled vector column instead of on separate numeric columns.
VectorStandardScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector Standard Scale >")
    )

VectorMinMaxScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MinMax Scale >")
    )

VectorMaxAbsScaler()\
    .setSelectedCol(VECTOR_COL_NAME)\
    .fit(source)\
    .transform(source)\
    .link(
        VectorSummarizerBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .lazyPrintVectorSummary("< after Vector MaxAbs Scale >")
    )

BatchOperator.execute()

#c_4_2: normalization

source = CsvSourceBatchOp()\
    .setFilePath(DATA_DIR + ORIGIN_FILE)\
    .setSchemaStr(SCHEMA_STRING)\
    .link(
        VectorAssemblerBatchOp()\
            .setSelectedCols(FEATURE_COL_NAMES)\
            .setOutputCol(VECTOR_COL_NAME)\
            .setReservedCols([LABEL_COL_NAME])
    )

# Normalize each vector by its L1 norm (setP(1.0)), so the absolute values
# of every row's components sum to 1.
source\
    .link(
        VectorNormalizeBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .setP(1.0)
    )\
    .firstN(5)\
    .print()
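
# Optional sanity check: after L1 normalization, each row's absolute values
# should sum to 1. This sketch assumes the dense vector column serializes as
# space-separated numbers when collected into pandas.
df_norm = source\
    .link(
        VectorNormalizeBatchOp()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .setP(1.0)
    )\
    .firstN(5)\
    .collectToDataframe()
print(df_norm[VECTOR_COL_NAME].map(
    lambda s: sum(abs(float(x)) for x in s.split())
))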

#c_5: missing-value imputation

df = pd.DataFrame(
    [
        ["a", 10.0, 100],
        ["b", -2.5, 9],
        ["c", 100.2, 1],
        ["d", -99.9, 100],
        [None, None, None]
    ]
)
source = BatchOperator\
    .fromDataframe(df, schemaStr='col1 string, col2 double, col3 double')\
    .select("col1, col2, CAST(col3 AS INTEGER) AS col3")

source.lazyPrint(-1, "< origin data >")

# Fill missing strings in col1 with the constant "e", and missing numbers
# in col2 / col3 with the respective column means.
pipeline = Pipeline()\
    .add(
        Imputer()\
            .setSelectedCols(["col1"])\
            .setStrategy('VALUE')\
            .setFillValue("e")
    )\
    .add(
        Imputer()\
            .setSelectedCols(["col2", "col3"])\
            .setStrategy('MEAN')
    )

pipeline.fit(source)\
    .transform(source)\
    .print()
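
# The fitted pipeline can be reused on any other data with the same schema.
# A minimal sketch with a small hypothetical second dataset:
model = pipeline.fit(source)
df_new = pd.DataFrame([["x", 1.0, 2], [None, None, None]])
new_source = BatchOperator\
    .fromDataframe(df_new, schemaStr='col1 string, col2 double, col3 double')\
    .select("col1, col2, CAST(col3 AS INTEGER) AS col3")
model.transform(new_source).print()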

#c_6: data conversions

# A DataFrame can be built from a dict of columns ...
dict_arr = {'name': ['Alice', 'Bob', 'Cindy'],
            'value': [1, 2, 3]}

pd.DataFrame(dict_arr)

# ... or from a 2-D array plus a list of column names.
arr_2D = [
    ['Alice', 1],
    ['Bob', 2],
    ['Cindy', 3]
]

pd.DataFrame(arr_2D, columns=['name', 'value'])
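
# The reverse direction is plain pandas: a DataFrame converts back to a
# 2-D list or a dict of columns with standard pandas methods.
df_back = pd.DataFrame(arr_2D, columns=['name', 'value'])
print(df_back.values.tolist())   # back to a 2-D array: [['Alice', 1], ...]
print(df_back.to_dict('list'))   # back to a dict: {'name': [...], 'value': [...]}
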
# DataFrame <-> BatchOperator

source = CsvSourceBatchOp()\
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data")\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")

source.firstN(5).print()

# collectToDataframe pulls an Alink batch operator's result into pandas ...
df_iris = source.collectToDataframe()
df_iris.head()

# ... and fromDataframe turns a DataFrame back into an Alink batch operator.
iris = BatchOperator\
    .fromDataframe(
        df_iris,
        "sepal_length double, sepal_width double, petal_length double, "
        + "petal_width double, category string"
    )

iris.firstN(5).print()