本章包括以下各节：
1.1 Alink是什么
1.2 免费下载、安装
1.3 Alink的功能
1.3.1 丰富的算法库
1.3.2 多样的使用体验
1.3.3 与SparkML的对比
1.4 关于数据和代码
1.5 简单示例
1.5.1 数据的读/写与显示
1.5.2 批式训练和批式预测
1.5.3 流式处理和流式预测
1.5.4 定义Pipeline，简化操作
1.5.5 嵌入预测服务系统
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，以下为本章对应的示例代码。
# Shared setup for every example in this chapter.
from pyalink.alink import *
# Start a local Alink execution environment; the argument is presumably the
# parallelism (1) — TODO confirm against the PyAlink docs.
useLocalEnv(1)
from utils import *  # NOTE(review): assumed to provide ROOT_DIR used below — confirm
import os
import pandas as pd
# Directory (with trailing separator) where the examples below write their
# output files: the sampled Iris CSV and the saved regression/pipeline models.
DATA_DIR = ROOT_DIR + "temp" + os.sep
#c_5_1
# Read the Iris data set from the UCI repository with an explicit schema,
# print its first 5 rows, then write a random 10-row sample to a local CSV.
source = (
    CsvSourceBatchOp()
    .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                 + "/iris/iris.data")
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, "
                  + "petal_width double, category string")
)

# Preview the head of the data.
source.firstN(5).print()

# Sample 10 rows and link them into a CSV sink; overwrite any earlier output.
sampled = source.sampleWithSize(10)
sample_sink = (
    CsvSinkBatchOp()
    .setFilePath(DATA_DIR + "iris_10.data")
    .setOverwriteSink(True)
)
sampled.link(sample_sink)

# Run the batch job that contains the sink.
BatchOperator.execute()
#c_5_2
# Batch training and batch prediction: fit a linear regression of gmv on
# x and x^2 over the yearly history, save the model to disk, reload it and
# predict gmv for 2018 and 2019.
years = list(range(2009, 2018))
gmv_values = [0.5, 9.36, 52.0, 191.0, 350.0, 571.0, 912.0, 1207.0, 1682.0]
df = pd.DataFrame([[year, gmv] for year, gmv in zip(years, gmv_values)])
df_2 = pd.DataFrame([[year] for year in (2018, 2019)])

train_set = BatchOperator.fromDataframe(df, schemaStr='x int, gmv double')
pred_set = BatchOperator.fromDataframe(df_2, schemaStr='x int')

# Add the squared term so the (linear) model can fit a quadratic trend in x.
train_set = train_set.select("x, x*x AS x2, gmv")

trainer = (
    LinearRegTrainBatchOp()
    .setFeatureCols(["x", "x2"])
    .setLabelCol("gmv")
)
train_set.link(trainer)

# Persist the trained model in Alink's AK file format.
reg_model_path = DATA_DIR + "gmv_reg.model"
trainer.link(
    AkSinkBatchOp()
    .setFilePath(reg_model_path)
    .setOverwriteSink(True)
)
BatchOperator.execute()

# Reload the model and score the prediction inputs (same feature expansion).
lr_model = AkSourceBatchOp().setFilePath(reg_model_path)
pred_set = pred_set.select("x, x*x AS x2")
predictor = LinearRegPredictBatchOp().setPredictionCol("pred")
predictor.linkFrom(lr_model, pred_set).print()
#c_5_3
# Stream prediction: feed the 2018/2019 inputs in df_2 as a stream and score
# them with the model file gmv_reg.model written by example c_5_2.
pred_set = StreamOperator.fromDataframe(df_2, schemaStr='x int')
lr_model = AkSourceBatchOp().setFilePath(DATA_DIR + "gmv_reg.model")
predictor = LinearRegPredictStreamOp(lr_model).setPredictionCol("pred")

# Same feature expansion as at training time, then predict and print.
features = pred_set.select("x, x*x AS x2")
features.link(predictor).print()

StreamOperator.execute()
#c_5_4
# Wrap feature expansion and linear regression into a Pipeline, fit and save
# it, reload the saved PipelineModel, and use it for both batch and stream
# prediction on df_2.
pipeline_model_path = DATA_DIR + "gmv_pipeline.model"

# Delete a model file left by a previous run before saving again
# (presumably save() does not overwrite — TODO confirm). EAFP instead of
# exists()+remove(): no check-then-act race, and a missing file is fine.
try:
    os.remove(pipeline_model_path)
except FileNotFoundError:
    pass

train_set = BatchOperator.fromDataframe(df, schemaStr='x int, gmv double')

# Stage 1 appends the squared term x2; stage 2 regresses gmv on x and x2.
pipeline = (
    Pipeline()
    .add(Select().setClause("*, x*x AS x2"))
    .add(
        LinearRegression()
        .setFeatureCols(["x", "x2"])
        .setLabelCol("gmv")
        .setPredictionCol("pred")
    )
)
pipeline.fit(train_set).save(pipeline_model_path)
BatchOperator.execute()

pipelineModel = PipelineModel.load(pipeline_model_path)

# Batch prediction with the reloaded pipeline.
pred_batch = BatchOperator.fromDataframe(df_2, schemaStr='x int')
pipelineModel.transform(pred_batch).print()

# Stream prediction with the same reloaded pipeline.
pred_stream = StreamOperator.fromDataframe(df_2, schemaStr='x int')
pipelineModel.transform(pred_stream).print()
StreamOperator.execute()
#c_5_5
# Embed the pipeline model saved by example c_5_4 in a LocalPredictor whose
# input schema is a single int column "x", then score one row per call.
predictor = LocalPredictor(DATA_DIR + "gmv_pipeline.model", "x int")
print(predictor.getOutputColNames())

for row in ([2018], [2019]):
    print(predictor.map(row))