第2章 系统概况与核心概念

本章包括下面各节:
2.1 基本概念
2.2 批式任务与流式任务
2.3 Alink=A+link
2.3.1 BatchOperator和StreamOperator
2.3.2 link方式是批式算法/流式算法的通用使用方式
2.3.3 link的简化
2.3.4 组件的主输出与侧输出
2.4 Pipeline与PipelineModel
2.4.1 概念和定义
2.4.2 深入介绍
2.5 触发Alink任务的执行
2.6 模型信息显示
2.7 文件系统与数据库
2.8 Schema String

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

# Working directory for temporary data/model files.
# NOTE(review): ROOT_DIR is expected to come from `utils` and to end with a
# path separator — confirm against utils.py.
DATA_DIR = ROOT_DIR + "temp" + os.sep

# File names for the saved decision-tree model and pipeline model
# (Alink's .ak format).
TREE_MODEL_FILE = "tree_model_19.ak"
PIPELINE_MODEL_FILE = "pipeline_model_19.ak"
#c_5_1
# https://www.yuque.com/pinshu/alink_tutorial/book_python_2_5_1

# Read the iris data set straight from the UCI repository as a batch source.
iris_url = ("http://archive.ics.uci.edu/ml/machine-learning-databases"
            "/iris/iris.data")
iris_schema = ("sepal_length double, sepal_width double, petal_length double, "
               "petal_width double, category string")
source = CsvSourceBatchOp().setFilePath(iris_url).setSchemaStr(iris_schema)

# Projection: keep two columns and add their ratio; linked into the chain below.
select_op = SelectBatchOp().setClause(
    "sepal_length, sepal_width, sepal_length/sepal_width AS ratio"
)


# Callback printing the row count once the data is collected to a DataFrame.
def _print_row_count(df):
    print("number of rows : " + str(len(df)))


# Callback printing valid/missing counts for the sepal_length column.
def _print_sepal_length_counts(tableSummary):
    print("number of valid values :"
          + str(tableSummary.numValidValue("sepal_length"))
          + "\nnumber of missing values :"
          + str(tableSummary.numMissingValue("sepal_length")))


# The lazy* calls only register actions; they all fire on execute() below.
(source
    .lazyPrint()
    .lazyPrint(title=">>> print with title.")
    .lazyPrint(2)
    .lazyPrint(2, ">>> print 2 rows with title.")
    .lazyPrintStatistics()
    .lazyPrintStatistics(">>> summary of current data.")
    .lazyCollectToDataframe(_print_row_count)
    .lazyCollectStatistics(_print_sepal_length_counts)
    .link(select_op)
    .lazyPrint(title=">>> final data")
    .lazyPrintStatistics(">>> summary of final data."))

BatchOperator.execute()

# Pipeline version of the same demo: each stage registers its own lazy
# printing hooks, then the fitted model transforms the source data.
select_stage = (
    Select()
    .setClause("sepal_length, sepal_width, sepal_length/sepal_width AS ratio")
    .enableLazyPrintTransformData(5, ">>> output data after Select")
    .enableLazyPrintTransformStat(">>> summary of data after Select ")
)
scaler_stage = (
    StandardScaler()
    .setSelectedCols(["sepal_length", "sepal_width"])
    .enableLazyPrintModelInfo(">>> model info")
    .enableLazyPrintTransformData(5, ">>> output data after StandardScaler")
    .enableLazyPrintTransformStat(">>> summary of data after StandardScaler")
)

demo_pipeline = Pipeline().add(select_stage).add(scaler_stage)
demo_pipeline.fit(source).transform(source).lazyPrint(
    title=">>> output data after the whole pipeline")

BatchOperator.execute()
#c_6

# Classic "weather / play" data set: outlook, temperature, humidity,
# windy flag, and the label column ("play" yes/no).
_weather_rows = [
    ("sunny", 85.0, 85.0, False, "no"),
    ("sunny", 80.0, 90.0, True, "no"),
    ("overcast", 83.0, 78.0, False, "yes"),
    ("rainy", 70.0, 96.0, False, "yes"),
    ("rainy", 68.0, 80.0, False, "yes"),
    ("rainy", 65.0, 70.0, True, "no"),
    ("overcast", 64.0, 65.0, True, "yes"),
    ("sunny", 72.0, 95.0, False, "no"),
    ("sunny", 69.0, 70.0, False, "yes"),
    ("rainy", 75.0, 80.0, False, "yes"),
    ("sunny", 75.0, 70.0, True, "yes"),
    ("overcast", 72.0, 90.0, True, "yes"),
    ("overcast", 81.0, 75.0, False, "yes"),
    ("rainy", 71.0, 80.0, True, "no"),
]
df = pd.DataFrame(_weather_rows)

# Wrap the DataFrame as an Alink batch source, declaring column names and types.
source = BatchOperator.fromDataframe(df, schemaStr="outlook string, Temperature double, Humidity double, Windy boolean, play string")

# Train a C4.5 decision tree on the weather data and save the model to
# DATA_DIR in Alink's .ak format (overwriting any earlier file).
source\
    .link(
        C45TrainBatchOp()\
            .setFeatureCols(["outlook", "Temperature", "Humidity", "Windy"])\
            .setCategoricalCols(["outlook", "Windy"])\
            .setLabelCol("play")
    )\
    .link(
        AkSinkBatchOp()\
            .setFilePath(DATA_DIR + TREE_MODEL_FILE)\
            .setOverwriteSink(True)
    )
BatchOperator.execute()

# Reload the saved model; lazily print its summary and, via the collect
# callback, export a rendering of the tree to a PNG file.
AkSourceBatchOp()\
    .setFilePath(DATA_DIR + TREE_MODEL_FILE)\
    .link(
        DecisionTreeModelInfoBatchOp()\
            .lazyPrintModelInfo()\
            .lazyCollectModelInfo(
                lambda decisionTreeModelInfo: 
                    decisionTreeModelInfo.saveTreeAsImage(
                        DATA_DIR + "tree_model.png", True)
            )
    )
BatchOperator.execute()


# Remove any previously saved pipeline model so save() below starts clean.
# EAFP: attempt the removal and ignore "file not found" instead of the
# racy exists()-then-remove() check (the file could vanish between the
# two calls).
try:
    os.remove(DATA_DIR + PIPELINE_MODEL_FILE)
except FileNotFoundError:
    pass


# Yearly GMV figures (column 0 = year, column 1 = GMV value), used below
# to fit a quadratic regression.
_gmv_rows = [
    (2009, 0.5),
    (2010, 9.36),
    (2011, 52.0),
    (2012, 191.0),
    (2013, 350.0),
    (2014, 571.0),
    (2015, 912.0),
    (2016, 1207.0),
    (2017, 1682.0),
]
df = pd.DataFrame(_gmv_rows)
# Convert the DataFrame to an Alink batch table; x declared int, gmv double.
train_set = BatchOperator.fromDataframe(df, schemaStr='x int, gmv double')

# Pipeline: add a squared feature x2, then fit a linear regression on
# (x, x2) so the model can follow the quadratic growth of gmv.
square_stage = Select().setClause("*, x*x AS x2")
regression_stage = (
    LinearRegression()
    .setFeatureCols(["x", "x2"])
    .setLabelCol("gmv")
    .setPredictionCol("pred")
)
pipeline = Pipeline().add(square_stage).add(regression_stage)

# fit()/save() are lazy; execute() triggers the actual training job.
pipeline.fit(train_set).save(DATA_DIR + PIPELINE_MODEL_FILE)
BatchOperator.execute()

# Reload the saved PipelineModel and inspect its fitted stages.
pipelineModel = PipelineModel.load(DATA_DIR + PIPELINE_MODEL_FILE)

stages = pipelineModel.getTransformers()

# Enumerate all stages rather than a hard-coded range(2), so the listing
# stays correct if the pipeline definition above gains or loses stages.
# (Also drops the stray C-style semicolons.)
for i, stage in enumerate(stages):
    print(str(i) + "\t" + str(stage))

# The second stage (index 1) is the fitted LinearRegression; print its
# model information lazily, then run the job.
stages[1].getModelData()\
    .link(
        LinearRegModelInfoBatchOp().lazyPrintModelInfo()
    )
BatchOperator.execute()