第2章 系统概况与核心概念

本章包括下面各节:
2.1 基本概念
2.2 批式任务与流式任务
2.3 Alink=A+link
2.3.1 BatchOperator和StreamOperator
2.3.2 link方式是批式算法/流式算法的通用使用方式
2.3.3 link的简化
2.3.4 组件的主输出与侧输出
2.4 Pipeline与PipelineModel
2.4.1 概念和定义
2.4.2 深入介绍
2.5 触发Alink任务的执行
2.6 模型信息显示
2.7 文件系统与数据库
2.8 Schema String

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Java)》,这里为本章对应的示例代码。

package com.alibaba.alink;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.classification.C45TrainBatchOp;
import com.alibaba.alink.operator.batch.classification.DecisionTreeModelInfoBatchOp;
import com.alibaba.alink.operator.batch.regression.LinearRegModelInfoBatchOp;
import com.alibaba.alink.operator.batch.sink.AkSinkBatchOp;
import com.alibaba.alink.operator.batch.source.AkSourceBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.batch.sql.SelectBatchOp;
import com.alibaba.alink.operator.common.statistics.basicstatistic.TableSummary;
import com.alibaba.alink.operator.common.tree.TreeModelInfo.DecisionTreeModelInfo;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.PipelineModel;
import com.alibaba.alink.pipeline.TransformerBase;
import com.alibaba.alink.pipeline.dataproc.StandardScaler;
import com.alibaba.alink.pipeline.regression.LinearRegression;
import com.alibaba.alink.pipeline.regression.LinearRegressionModel;
import com.alibaba.alink.pipeline.sql.Select;
import org.apache.flink.types.Row;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

public class Chap02 {

    static final String DATA_DIR = Utils.ROOT_DIR + "temp" + File.separator;

    static final String TREE_MODEL_FILE = "tree_model.ak";
    static final String PIPELINE_MODEL_FILE = "pipeline_model.ak";

    public static void main(String[] args) throws Exception {

        BatchOperator.setParallelism(1);

        c_5_1();

        c_6();

    }

    /*
     * https://www.yuque.com/pinshu/alink_tutorial/book_java_2_5_1
     */
    static void c_5_1() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("http://archive.ics.uci.edu/ml/machine-learning-databases"
                + "/iris/iris.data")
            .setSchemaStr("sepal_length double, sepal_width double, petal_length double, "
                + "petal_width double, category string");

        source
            .lazyPrint()
            .lazyPrint(">>> print with title.")
            .lazyPrint(2)
            .lazyPrint(2, ">>> print 2 rows with title.")
            .lazyPrintStatistics()
            .lazyPrintStatistics(">>> summary of current data.")
            .lazyCollect(new Consumer<List<Row>>() {
                @Override
                public void accept(List<Row> rows) {
                    System.out.println("number of rows : " + rows.size());
                }
            })
            .lazyCollectStatistics(new Consumer<TableSummary>() {
                @Override
                public void accept(TableSummary tableSummary) {
                    System.out.println("number of valid values :"
                        + tableSummary.numValidValue("sepal_length"));
                    System.out.println("number of missing values :"
                        + tableSummary.numMissingValue("sepal_length"));
                }
            })
            .link(
                new SelectBatchOp()
                    .setClause("sepal_length, sepal_width, sepal_length/sepal_width AS ratio")
            )
            .lazyPrint(">>> final data")
            .lazyPrintStatistics(">>> summary of final data.");

        BatchOperator.execute();

        new Pipeline()
            .add(
                new Select()
                    .setClause("sepal_length, sepal_width, sepal_length/sepal_width AS ratio")
                    .enableLazyPrintTransformData(5, ">>> output data after Select")
                    .enableLazyPrintTransformStat(">>> summary of data after Select ")
            )
            .add(
                new StandardScaler()
                    .setSelectedCols("sepal_length", "sepal_width")
                    .enableLazyPrintModelInfo(">>> model info")
                    .enableLazyPrintTransformData(5, ">>> output data after StandardScaler")
                    .enableLazyPrintTransformStat(">>> summary of data after StandardScaler")
            )
            .fit(source)
            .transform(source)
            .lazyPrint(">>> output data after the whole pipeline");

        BatchOperator.execute();
    }


    static void c_6() throws Exception {
        MemSourceBatchOp source = new MemSourceBatchOp(
            new Row[]{
                Row.of("sunny", 85.0, 85.0, false, "no"),
                Row.of("sunny", 80.0, 90.0, true, "no"),
                Row.of("overcast", 83.0, 78.0, false, "yes"),
                Row.of("rainy", 70.0, 96.0, false, "yes"),
                Row.of("rainy", 68.0, 80.0, false, "yes"),
                Row.of("rainy", 65.0, 70.0, true, "no"),
                Row.of("overcast", 64.0, 65.0, true, "yes"),
                Row.of("sunny", 72.0, 95.0, false, "no"),
                Row.of("sunny", 69.0, 70.0, false, "yes"),
                Row.of("rainy", 75.0, 80.0, false, "yes"),
                Row.of("sunny", 75.0, 70.0, true, "yes"),
                Row.of("overcast", 72.0, 90.0, true, "yes"),
                Row.of("overcast", 81.0, 75.0, false, "yes"),
                Row.of("rainy", 71.0, 80.0, true, "no")
            },
            new String[]{"outlook", "Temperature", "Humidity", "Windy", "play"}
        );

        source
            .link(
                new C45TrainBatchOp()
                    .setFeatureCols("outlook", "Temperature", "Humidity", "Windy")
                    .setCategoricalCols("outlook", "Windy")
                    .setLabelCol("play")
            )
            .link(
                new AkSinkBatchOp()
                    .setFilePath(DATA_DIR + TREE_MODEL_FILE)
                    .setOverwriteSink(true)
            );

        BatchOperator.execute();

        new AkSourceBatchOp()
            .setFilePath(DATA_DIR + TREE_MODEL_FILE)
            .link(
                new DecisionTreeModelInfoBatchOp()
                    .lazyPrintModelInfo()
                    .lazyCollectModelInfo(new Consumer<DecisionTreeModelInfo>() {
                        @Override
                        public void accept(DecisionTreeModelInfo decisionTreeModelInfo) {
                            try {
                                decisionTreeModelInfo.saveTreeAsImage(
                                    DATA_DIR + "tree_model.png", true);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    })
            );
        BatchOperator.execute();

        MemSourceBatchOp train_set = new MemSourceBatchOp(
            new Row[]{
                Row.of(2009, 0.5),
                Row.of(2010, 9.36),
                Row.of(2011, 52.0),
                Row.of(2012, 191.0),
                Row.of(2013, 350.0),
                Row.of(2014, 571.0),
                Row.of(2015, 912.0),
                Row.of(2016, 1207.0),
                Row.of(2017, 1682.0),
            },
            new String[]{"x", "gmv"}
        );

        Pipeline pipeline = new Pipeline()
            .add(
                new Select()
                    .setClause("*, x*x AS x2")
            )
            .add(
                new LinearRegression()
                    .setFeatureCols("x", "x2")
                    .setLabelCol("gmv")
                    .setPredictionCol("pred")
            );
        pipeline.fit(train_set).save(DATA_DIR + PIPELINE_MODEL_FILE, true);
        BatchOperator.execute();

        PipelineModel pipelineModel = PipelineModel.load(DATA_DIR + PIPELINE_MODEL_FILE);

        TransformerBase<?>[] stages = pipelineModel.getTransformers();

        for (int i = 0; i < stages.length; i++) {
            System.out.println(String.valueOf(i) + "\t" + stages[i]);
        }

        ((LinearRegressionModel) stages[1]).getModelData()
            .link(
                new LinearRegModelInfoBatchOp()
                    .lazyPrintModelInfo()
            );
        BatchOperator.execute();

    }

}