第1章 Alink快速上手

本章包括下面各节:
1.1 Alink是什么
1.2 免费下载、安装
1.3 Alink的功能
1.3.1 丰富的算法库
1.3.2 多样的使用体验
1.3.3 与SparkML的对比
1.4 关于数据和代码
1.5 简单示例
1.5.1 数据的读/写与显示
1.5.2 批式训练和批式预测
1.5.3 流式处理和流式预测
1.5.4 定义Pipeline,简化操作
1.5.5 嵌入预测服务系统

详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，这里为本章对应的示例代码。

# PyAlink entry point; the wildcard import supplies the operator classes
# (CsvSourceBatchOp, BatchOperator, Pipeline, ...) used throughout this script.
from pyalink.alink import *
# NOTE(review): presumably starts a local PyAlink execution environment with
# parallelism 1 — confirm against the PyAlink documentation.
useLocalEnv(1)

# NOTE(review): ROOT_DIR (used below) is not defined in this file; it is
# presumably exported by the project-local `utils` module — verify.
from utils import *
import os
import pandas as pd

# Directory that the examples below write their data files and models to.
# Built by plain string concatenation, so this assumes ROOT_DIR already ends
# with a path separator — TODO confirm.
DATA_DIR = ROOT_DIR + "temp" + os.sep
# c_5_1 — section 1.5.1: reading, writing and displaying data.
# The Iris CSV is read over HTTP with an explicit five-column schema.
# NOTE(review): plain-HTTP URL; confirm the host still serves it without
# redirecting to HTTPS.
iris_url = (
    "http://archive.ics.uci.edu/ml/machine-learning-databases"
    + "/iris/iris.data"
)
iris_schema = (
    "sepal_length double, sepal_width double, petal_length double, "
    + "petal_width double, category string"
)
source = CsvSourceBatchOp().setFilePath(iris_url).setSchemaStr(iris_schema)

# Display the first five rows.
source.firstN(5).print()

# Take a 10-row sample and link it to a CSV sink with overwrite enabled.
iris_sample = source.sampleWithSize(10)
iris_sink = (
    CsvSinkBatchOp()
    .setFilePath(DATA_DIR + "iris_10.data")
    .setOverwriteSink(True)
)
iris_sample.link(iris_sink)

# Trigger execution of the pending batch operators (the sink linked above).
BatchOperator.execute()
# c_5_2 — section 1.5.2: batch training and batch prediction.
# Training data: one (x, gmv) row per year.
gmv_rows = [
    [2009, 0.5],
    [2010, 9.36],
    [2011, 52.0],
    [2012, 191.0],
    [2013, 350.0],
    [2014, 571.0],
    [2015, 912.0],
    [2016, 1207.0],
    [2017, 1682.0],
]
df = pd.DataFrame(gmv_rows)
train_set = BatchOperator.fromDataframe(df, schemaStr="x int, gmv double")

# Years to predict for.
df_2 = pd.DataFrame([[2018], [2019]])
pred_set = BatchOperator.fromDataframe(df_2, schemaStr="x int")

# Add the squared term x2 as a second feature next to x.
train_set = train_set.select("x, x*x AS x2, gmv")

trainer = (
    LinearRegTrainBatchOp()
    .setFeatureCols(["x", "x2"])
    .setLabelCol("gmv")
)
train_set.link(trainer)

# Write the trained model to an .ak file (overwrite enabled) and run the job.
model_path = DATA_DIR + "gmv_reg.model"
model_sink = AkSinkBatchOp().setFilePath(model_path).setOverwriteSink(True)
trainer.link(model_sink)
BatchOperator.execute()

# Read the model back from disk and use it for batch prediction.
lr_model = AkSourceBatchOp().setFilePath(model_path)

# The prediction input needs the same derived x2 column as the training data.
pred_set = pred_set.select("x, x*x AS x2")

predictor = LinearRegPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(lr_model, pred_set).print()
# c_5_3 — section 1.5.3: stream processing and stream prediction.
# The same prediction frame (df_2) is now wrapped as a stream source.
pred_set = StreamOperator.fromDataframe(df_2, schemaStr="x int")

# The saved model is read as a batch source and passed to the stream
# predictor's constructor.
lr_model = AkSourceBatchOp().setFilePath(DATA_DIR + "gmv_reg.model")
predictor = LinearRegPredictStreamOp(lr_model).setPredictionCol("pred")

# Derive x2, run the predictor on the stream and print the results.
stream_features = pred_set.select("x, x*x AS x2")
stream_features.link(predictor).print()

StreamOperator.execute()
# c_5_4 — section 1.5.4: defining a Pipeline to simplify the workflow.
pipeline_model_path = DATA_DIR + "gmv_pipeline.model"

# Remove a model file left over from an earlier run. Attempting the removal
# and ignoring only "file not found" avoids the check-then-remove race of an
# os.path.exists() test; any other OSError still propagates.
try:
    os.remove(pipeline_model_path)
except FileNotFoundError:
    pass

train_set = BatchOperator.fromDataframe(df, schemaStr="x int, gmv double")

# Two stages: derive the x2 column, then fit a linear regression on (x, x2)
# that writes its output to the "pred" column.
feature_stage = Select().setClause("*, x*x AS x2")
regression_stage = (
    LinearRegression()
    .setFeatureCols(["x", "x2"])
    .setLabelCol("gmv")
    .setPredictionCol("pred")
)
pipeline = Pipeline().add(feature_stage).add(regression_stage)

# Fit on the training set, save the fitted pipeline model, and run the job.
pipeline.fit(train_set).save(pipeline_model_path)

BatchOperator.execute()

# Reload the saved pipeline model and apply it to batch input ...
pipelineModel = PipelineModel.load(pipeline_model_path)

pred_batch = BatchOperator.fromDataframe(df_2, schemaStr="x int")

pipelineModel.transform(pred_batch).print()

# ... and to stream input; no manual x2 derivation is needed in either case
# because the Select stage is part of the pipeline.
pred_stream = StreamOperator.fromDataframe(df_2, schemaStr="x int")

pipelineModel.transform(pred_stream).print()

StreamOperator.execute()
# c_5_5 — section 1.5.5: embedding the model in a prediction service.
# LocalPredictor is built from the saved pipeline model file; the second
# argument ("x int") is presumably the input schema — confirm in PyAlink docs.
predictor = LocalPredictor(DATA_DIR + "gmv_pipeline.model", "x int")

print(predictor.getOutputColNames())

# Predict one single-field row at a time.
for year in (2018, 2019):
    print(predictor.map([year]))