本章包括下面各节：
2.1 基本概念
2.2 批式任务与流式任务
2.3 Alink=A+link
2.3.1 BatchOperator和StreamOperator
2.3.2 link方式是批式算法/流式算法的通用使用方式
2.3.3 link的简化
2.3.4 组件的主输出与侧输出
2.4 Pipeline与PipelineModel
2.4.1 概念和定义
2.4.2 深入介绍
2.5 触发Alink任务的执行
2.6 模型信息显示
2.7 文件系统与数据库
2.8 Schema String
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，以下为本章对应的示例代码。
# Environment setup shared by every example in this chapter.
from pyalink.alink import *

# Start a local Alink execution environment (argument kept as in the book).
useLocalEnv(1)

from utils import *  # provides ROOT_DIR, among others
import os
import pandas as pd

# All artifacts produced by the examples are written under <ROOT_DIR>temp/.
DATA_DIR = ROOT_DIR + "temp" + os.sep

# File names for the persisted decision-tree model and pipeline model.
TREE_MODEL_FILE = "tree_model_19.ak"
PIPELINE_MODEL_FILE = "pipeline_model_19.ak"
# c_5_1
# https://www.yuque.com/pinshu/alink_tutorial/book_python_2_5_1
# The lazy* calls below only register print/collect actions; the following
# BatchOperator.execute() call is what runs the job and emits the output.

source = (
    CsvSourceBatchOp()
    .setFilePath(
        "http://archive.ics.uci.edu/ml/machine-learning-databases"
        + "/iris/iris.data"
    )
    .setSchemaStr(
        "sepal_length double, sepal_width double, petal_length double, "
        + "petal_width double, category string"
    )
)


def _print_row_count(df):
    """Callback for lazyCollectToDataframe: report how many rows were collected."""
    print("number of rows : " + str(len(df)))


def _print_sepal_length_counts(table_summary):
    """Callback for lazyCollectStatistics: report valid/missing counts of sepal_length."""
    print(
        "number of valid values :"
        + str(table_summary.numValidValue("sepal_length"))
        + "\nnumber of missing values :"
        + str(table_summary.numMissingValue("sepal_length"))
    )


(
    source
    .lazyPrint()
    .lazyPrint(title=">>> print with title.")
    .lazyPrint(2)
    .lazyPrint(2, ">>> print 2 rows with title.")
    .lazyPrintStatistics()
    .lazyPrintStatistics(">>> summary of current data.")
    .lazyCollectToDataframe(_print_row_count)
    .lazyCollectStatistics(_print_sepal_length_counts)
    .link(
        SelectBatchOp()
        .setClause("sepal_length, sepal_width, sepal_length/sepal_width AS ratio")
    )
    .lazyPrint(title=">>> final data")
    .lazyPrintStatistics(">>> summary of final data.")
)
BatchOperator.execute()

# The same kind of intermediate inspection, but enabled on pipeline stages.
select_stage = (
    Select()
    .setClause("sepal_length, sepal_width, sepal_length/sepal_width AS ratio")
    .enableLazyPrintTransformData(5, ">>> output data after Select")
    .enableLazyPrintTransformStat(">>> summary of data after Select ")
)
scaler_stage = (
    StandardScaler()
    .setSelectedCols(["sepal_length", "sepal_width"])
    .enableLazyPrintModelInfo(">>> model info")
    .enableLazyPrintTransformData(5, ">>> output data after StandardScaler")
    .enableLazyPrintTransformStat(">>> summary of data after StandardScaler")
)
(
    Pipeline()
    .add(select_stage)
    .add(scaler_stage)
    .fit(source)
    .transform(source)
    .lazyPrint(title=">>> output data after the whole pipeline")
)
BatchOperator.execute()
# c_6
# Part 1: train a C4.5 decision tree on a small weather/"play" table, save the
# model to an .ak file, reload it, print its model info and render the tree.
df = pd.DataFrame(
    [
        ["sunny", 85.0, 85.0, False, "no"],
        ["sunny", 80.0, 90.0, True, "no"],
        ["overcast", 83.0, 78.0, False, "yes"],
        ["rainy", 70.0, 96.0, False, "yes"],
        ["rainy", 68.0, 80.0, False, "yes"],
        ["rainy", 65.0, 70.0, True, "no"],
        ["overcast", 64.0, 65.0, True, "yes"],
        ["sunny", 72.0, 95.0, False, "no"],
        ["sunny", 69.0, 70.0, False, "yes"],
        ["rainy", 75.0, 80.0, False, "yes"],
        ["sunny", 75.0, 70.0, True, "yes"],
        ["overcast", 72.0, 90.0, True, "yes"],
        ["overcast", 81.0, 75.0, False, "yes"],
        ["rainy", 71.0, 80.0, True, "no"],
    ]
)
source = BatchOperator.fromDataframe(
    df,
    schemaStr="outlook string, Temperature double, Humidity double, Windy boolean, play string",
)

(
    source
    .link(
        C45TrainBatchOp()
        .setFeatureCols(["outlook", "Temperature", "Humidity", "Windy"])
        .setCategoricalCols(["outlook", "Windy"])
        .setLabelCol("play")
    )
    .link(
        AkSinkBatchOp()
        .setFilePath(DATA_DIR + TREE_MODEL_FILE)
        .setOverwriteSink(True)
    )
)
BatchOperator.execute()

(
    AkSourceBatchOp()
    .setFilePath(DATA_DIR + TREE_MODEL_FILE)
    .link(
        DecisionTreeModelInfoBatchOp()
        .lazyPrintModelInfo()
        .lazyCollectModelInfo(
            lambda decisionTreeModelInfo: decisionTreeModelInfo.saveTreeAsImage(
                DATA_DIR + "tree_model.png", True
            )
        )
    )
)
BatchOperator.execute()

# Part 2: fit a Select + LinearRegression pipeline, save it, load it back and
# list its stages.

# Delete a previously saved pipeline model before saving a new one.
# NOTE(review): presumably PipelineModel.save would not overwrite it — confirm.
# EAFP avoids the exists()/remove() check-then-act race.
try:
    os.remove(DATA_DIR + PIPELINE_MODEL_FILE)
except FileNotFoundError:
    pass

df = pd.DataFrame(
    [
        [2009, 0.5],
        [2010, 9.36],
        [2011, 52.0],
        [2012, 191.0],
        [2013, 350.0],
        [2014, 571.0],
        [2015, 912.0],
        [2016, 1207.0],
        [2017, 1682.0],
    ]
)
train_set = BatchOperator.fromDataframe(df, schemaStr='x int, gmv double')

pipeline = (
    Pipeline()
    .add(Select().setClause("*, x*x AS x2"))  # add the quadratic feature x2
    .add(
        LinearRegression()
        .setFeatureCols(["x", "x2"])
        .setLabelCol("gmv")
        .setPredictionCol("pred")
    )
)
pipeline.fit(train_set).save(DATA_DIR + PIPELINE_MODEL_FILE)
BatchOperator.execute()

pipelineModel = PipelineModel.load(DATA_DIR + PIPELINE_MODEL_FILE)
stages = pipelineModel.getTransformers()
# Iterate over the actual stages instead of a hard-coded range(2), so the
# listing cannot raise IndexError or omit stages if the pipeline changes.
for i, stage in enumerate(stages):
    print(str(i) + "\t" + str(stage))
# Stage 1 of the loaded pipeline model is the one produced by LinearRegression
# (stage 0 is the Select); print its model info.
linear_model_data = stages[1].getModelData()
linear_model_data.link(LinearRegModelInfoBatchOp().lazyPrintModelInfo())
BatchOperator.execute()