Alink教程(Python版)

第14章 在线学习

本章包括下面各节:
14.1 整体流程
14.2 数据准备
14.3 特征工程
14.4 使用特征工程处理数据
14.5 在线训练
14.6 模型过滤

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

# Star import: the Alink operators used below (e.g. CsvSourceBatchOp, Pipeline)
# are not defined in this file and are expected to come from here.
from pyalink.alink import *
# Select the local execution environment before any operator is built.
# NOTE(review): the argument 1 is presumably the parallelism — confirm in PyAlink docs.
useLocalEnv(1)

# NOTE(review): ROOT_DIR is not defined in this file, so it presumably comes
# from this star import — confirm against utils.py.
from utils import *
import os
import pandas as pd

# Allow up to 1000 characters per column when pandas renders a frame.
pd.set_option('display.max_colwidth', 1000)

# Directory (with trailing separator) under which this script saves and loads
# its model files.
DATA_DIR = ROOT_DIR + "ctr_avazu" + os.sep

# Schema shared by every CSV source in this script, written as
# "<column> <type>" entries joined by ", ".
SCHEMA_STRING = ", ".join([
    "id string",
    "click string",
    "dt string",
    "C1 string",
    "banner_pos int",
    "site_id string",
    "site_domain string",
    "site_category string",
    "app_id string",
    "app_domain string",
    "app_category string",
    "device_id string",
    "device_ip string",
    "device_model string",
    "device_type string",
    "device_conn_type string",
    "C14 int",
    "C15 int",
    "C16 int",
    "C17 int",
    "C18 int",
    "C19 int",
    "C20 int",
    "C21 int",
])

# Columns passed to FeatureHasher.setCategoricalCols.
CATEGORY_COL_NAMES = [
    "C1",
    "banner_pos",
    "site_category",
    "app_domain",
    "app_category",
    "device_type",
    "device_conn_type",
    "site_id",
    "site_domain",
    "device_id",
    "device_model",
]

# Columns standardized by StandardScaler: C14 through C21.
NUMERICAL_COL_NAMES = ["C%d" % idx for idx in range(14, 22)]

# File names (relative to DATA_DIR) of the saved feature pipeline model and
# the saved initial model.
FEATURE_MODEL_FILE = "feature_model.ak"
INIT_MODEL_FILE = "init_model.ak"

# Column names used by the training, prediction and evaluation operators.
LABEL_COL_NAME = "click"
VEC_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "pred_info"

# Size of the hashed feature vector; also passed to FTRL as the vector size.
NUM_HASH_FEATURES = 30000

# c_2 (section 14.2, data preparation): preview the small sample file, first
# as raw text lines and then parsed with the shared schema.
(
    TextSourceBatchOp()
    .setFilePath(
        "http://alink-release.oss-cn-beijing.aliyuncs.com/"
        "data-files/avazu-small.csv"
    )
    .firstN(10)
    .print()
)

# Same file, read as CSV with typed columns.
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath(
        "http://alink-release.oss-cn-beijing.aliyuncs.com/"
        "data-files/avazu-small.csv"
    )
    .setSchemaStr(SCHEMA_STRING)
)

# Show the first 10 parsed rows.
trainBatchData.firstN(10).print()

# c_3 (section 14.3, feature engineering): define the feature pipeline and,
# if no saved model exists yet, fit it on the small sample and save it.
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath(
        "http://alink-release.oss-cn-beijing.aliyuncs.com/"
        "data-files/avazu-small.csv"
    )
    .setSchemaStr(SCHEMA_STRING)
)

# Stage 1 standardizes the numerical columns; stage 2 hashes the categorical
# and numerical columns together into one vector column of NUM_HASH_FEATURES.
feature_pipeline = (
    Pipeline()
    .add(
        StandardScaler()
        .setSelectedCols(NUMERICAL_COL_NAMES)
    )
    .add(
        FeatureHasher()
        .setSelectedCols(CATEGORY_COL_NAMES + NUMERICAL_COL_NAMES)
        .setCategoricalCols(CATEGORY_COL_NAMES)
        .setOutputCol(VEC_COL_NAME)
        .setNumFeatures(NUM_HASH_FEATURES)
    )
)

# Fit and save only when the model file is missing, so reruns reuse it.
if not os.path.exists(DATA_DIR + FEATURE_MODEL_FILE):
    feature_pipeline.fit(trainBatchData).save(DATA_DIR + FEATURE_MODEL_FILE)
    # Trigger the batch job so the save actually runs.
    BatchOperator.execute()

# c_4 (section 14.4, applying the feature pipeline): load the saved feature
# pipeline model and, if no initial model exists yet, train a logistic
# regression model on the transformed sample data and save it.
feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

# Streaming source over the large training file. It is not consumed in this
# section (c_5 and c_6 rebuild it); it skips the first line so that it is
# configured the same way as those later definitions.
data = CsvSourceStreamOp()\
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/"
                 + "data-files/avazu-ctr-train-8M.csv")\
    .setSchemaStr(SCHEMA_STRING)\
    .setIgnoreFirstLine(True)

# Train and save only when the model file is missing, so reruns reuse it.
if not os.path.exists(DATA_DIR + INIT_MODEL_FILE):
    trainBatchData = CsvSourceBatchOp()\
        .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/"
                     + "data-files/avazu-small.csv")\
        .setSchemaStr(SCHEMA_STRING)

    # Logistic regression on the hashed vector column, limited to 10 iterations.
    lr = LogisticRegressionTrainBatchOp()\
        .setVectorCol(VEC_COL_NAME)\
        .setLabelCol(LABEL_COL_NAME)\
        .setWithIntercept(True)\
        .setMaxIter(10)

    # transformed sample -> LR trainer -> .ak file sink
    feature_pipelineModel\
        .transform(trainBatchData)\
        .link(lr)\
        .link(
            AkSinkBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)
        )
    # Trigger the batch job so the sink actually writes the model.
    BatchOperator.execute()

# c_5 (section 14.5, online training): train FTRL on one half of the stream,
# predict on the other half with the continuously updated model, and print
# sample predictions plus windowed evaluation metrics.
feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

# Saved initial model, passed to both the FTRL trainer and the predictor.
initModel = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

data = CsvSourceStreamOp()\
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/"
                 + "data-files/avazu-ctr-train-8M.csv")\
    .setSchemaStr(SCHEMA_STRING)\
    .setIgnoreFirstLine(True)

# Split with fraction 0.5: the main output feeds training, side output 0
# feeds prediction/evaluation. Both go through the feature pipeline.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = feature_pipelineModel.transform(spliter)
test_stream_data = feature_pipelineModel.transform(spliter.getSideOutput(0))

# FTRL trainer built from initModel; its output is the model stream.
model = FtrlTrainStreamOp(initModel)\
    .setVectorCol(VEC_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setWithIntercept(True)\
    .setAlpha(0.1)\
    .setBeta(0.1)\
    .setL1(0.01)\
    .setL2(0.01)\
    .setTimeInterval(10)\
    .setVectorSize(NUM_HASH_FEATURES)\
    .linkFrom(train_stream_data)

# Predictor fed by the model stream and the test half; keeps only the label
# column next to the prediction and its detail column.
predResult = FtrlPredictStreamOp(initModel)\
    .setVectorCol(VEC_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setReservedCols([LABEL_COL_NAME])\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .linkFrom(model, test_stream_data)

# Keyed, periodically refreshed display of predictions. c_6 shows the
# alternative: sample the stream and tag rows with an out_type column.
predResult.print(key="predResult", refreshInterval=30, maxLimit=20)

# Binary-classification evaluation over the predictions, then extraction of
# Accuracy, AUC and ConfusionMatrix from the JSON in the "Data" column.
# The positive label is pinned to "1", matching the evaluation in c_6.
predResult\
    .link(
        EvalBinaryClassStreamOp()\
            .setPositiveLabelValueString("1")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .setTimeInterval(10)
    )\
    .link(
        JsonValueStreamOp()\
            .setSelectedCol("Data")\
            .setReservedCols(["Statistics"])\
            .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])\
            .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
    )\
    .print(key="evaluation", refreshInterval=30, maxLimit=20)

# Start the streaming job; nothing above runs until this call.
StreamOperator.execute()

# c_6 (section 14.6, model filtering): same online-training flow as c_5, but
# the model stream passes through FtrlModelFilterStreamOp before it reaches
# the predictor.
data = (
    CsvSourceStreamOp()
    .setFilePath(
        "http://alink-release.oss-cn-beijing.aliyuncs.com/"
        "data-files/avazu-ctr-train-8M.csv"
    )
    .setSchemaStr(SCHEMA_STRING)
    .setIgnoreFirstLine(True)
)

feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

# Split with fraction 0.5: main output for training, side output 0 for testing.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = feature_pipelineModel.transform(spliter)
test_stream_data = feature_pipelineModel.transform(spliter.getSideOutput(0))

# Saved initial model, passed to both the FTRL trainer and the predictor.
initModel = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

# FTRL trainer producing the model stream.
model = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(NUM_HASH_FEATURES)
    .linkFrom(train_stream_data)
)

# Model filter configured with an accuracy threshold of 0.83 and an AUC
# threshold of 0.71; it is linked to the model stream and the training data.
model_filter = (
    FtrlModelFilterStreamOp()
    .setPositiveLabelValueString("1")
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setAccuracyThreshold(0.83)
    .setAucThreshold(0.71)
    .linkFrom(model, train_stream_data)
)

# Print the filter's output rows, tagged with out_type = 'Model'.
model_filter.select("'Model' AS out_type, *").print()

# The predictor consumes the filtered model stream rather than the raw one.
predResult = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setReservedCols([LABEL_COL_NAME])
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .linkFrom(model_filter, test_stream_data)
)

# Print a 0.0001 sample of the predictions, tagged 'Pred Sample'.
(
    predResult
    .sample(0.0001)
    .select("'Pred Sample' AS out_type, *")
    .print()
)

# Evaluate the predictions, extract Accuracy / AUC / ConfusionMatrix from the
# JSON in the "Data" column, and print them tagged 'Eval Metric'.
(
    predResult
    .link(
        EvalBinaryClassStreamOp()
        .setPositiveLabelValueString("1")
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
        .setTimeInterval(10)
    )
    .link(
        JsonValueStreamOp()
        .setSelectedCol("Data")
        .setReservedCols(["Statistics"])
        .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
        .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
    )
    .select("'Eval Metric' AS out_type, *")
    .print()
)

# Start the streaming job; nothing above runs until this call.
StreamOperator.execute()