本章包括下面各节:
14.1 整体流程
14.2 数据准备
14.3 特征工程
14.4 使用特征工程处理数据
14.5 在线训练
14.6 模型过滤
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
# Environment setup for the chapter's examples.
# The Alink operator classes used below (CsvSourceBatchOp, Pipeline, ...) are
# never imported by name in this file, so they are expected to come from this
# star-import.
from pyalink.alink import *
# Start a local PyAlink environment; the argument is presumably the
# parallelism -- confirm against the PyAlink documentation.
useLocalEnv(1)
# ROOT_DIR is not defined in this file; presumably it is provided by the
# project-local `utils` module -- verify there.
from utils import *
import os
import pandas as pd
# Raise the pandas display width limit for column contents to 1000.
pd.set_option('display.max_colwidth', 1000)
# Working directory for this chapter. It ends with os.sep so that file names
# can be appended directly (e.g. DATA_DIR + FEATURE_MODEL_FILE).
DATA_DIR = ROOT_DIR + "ctr_avazu" + os.sep
# (column name, column type) pairs, listed in the order they appear in the
# schema string handed to the CSV sources.
_SCHEMA_COLUMNS = (
    ("id", "string"),
    ("click", "string"),
    ("dt", "string"),
    ("C1", "string"),
    ("banner_pos", "int"),
    ("site_id", "string"),
    ("site_domain", "string"),
    ("site_category", "string"),
    ("app_id", "string"),
    ("app_domain", "string"),
    ("app_category", "string"),
    ("device_id", "string"),
    ("device_ip", "string"),
    ("device_model", "string"),
    ("device_type", "string"),
    ("device_conn_type", "string"),
    ("C14", "int"),
    ("C15", "int"),
    ("C16", "int"),
    ("C17", "int"),
    ("C18", "int"),
    ("C19", "int"),
    ("C20", "int"),
    ("C21", "int"),
)

# Schema in "name type, name type, ..." form.
SCHEMA_STRING = ", ".join(" ".join(column) for column in _SCHEMA_COLUMNS)

# Columns the feature hasher is told to treat as categorical.
CATEGORY_COL_NAMES = [
    "C1",
    "banner_pos",
    "site_category",
    "app_domain",
    "app_category",
    "device_type",
    "device_conn_type",
    "site_id",
    "site_domain",
    "device_id",
    "device_model",
]

# Columns that are standard-scaled before hashing: C14 through C21.
NUMERICAL_COL_NAMES = ["C" + str(index) for index in range(14, 22)]

# File names (relative to DATA_DIR) for the saved feature pipeline model and
# the initial logistic-regression model.
FEATURE_MODEL_FILE = "feature_model.ak"
INIT_MODEL_FILE = "init_model.ak"

# Column names shared by the training, prediction and evaluation operators.
LABEL_COL_NAME = "click"
VEC_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "pred_info"

# Size passed to the feature hasher (setNumFeatures) and to the FTRL trainer
# (setVectorSize).
NUM_HASH_FEATURES = 30000
#c_2
# Data preparation: look at the small sample file twice -- once as raw text
# lines and once parsed as CSV with SCHEMA_STRING.
small_csv_url = (
    "http://alink-release.oss-cn-beijing.aliyuncs.com/"
    + "data-files/avazu-small.csv"
)

# Print the first 10 raw lines of the file.
raw_text_source = TextSourceBatchOp().setFilePath(small_csv_url)
raw_text_source.firstN(10).print()

# Print the first 10 rows after parsing the file with the declared schema.
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath(small_csv_url)
    .setSchemaStr(SCHEMA_STRING)
)
trainBatchData.firstN(10).print()
#c_3
# Feature engineering: define the feature pipeline and, when no saved model
# exists yet, fit it on the small sample file and persist it.
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/"
                 + "data-files/avazu-small.csv")
    .setSchemaStr(SCHEMA_STRING)
)

# Stage 1 standard-scales the numerical columns.
# Stage 2 hashes the categorical and numerical columns into the vector column
# VEC_COL_NAME, with NUM_HASH_FEATURES passed as the number of features.
feature_pipeline = (
    Pipeline()
    .add(
        StandardScaler()
        .setSelectedCols(NUMERICAL_COL_NAMES)
    )
    .add(
        FeatureHasher()
        .setSelectedCols(CATEGORY_COL_NAMES + NUMERICAL_COL_NAMES)
        .setCategoricalCols(CATEGORY_COL_NAMES)
        .setOutputCol(VEC_COL_NAME)
        .setNumFeatures(NUM_HASH_FEATURES)
    )
)

# Fit and save only once; later runs reuse the file already on disk.
# The whole fit/save/execute sequence lives inside the guard, so nothing is
# executed when the model file is already present.
feature_model_path = DATA_DIR + FEATURE_MODEL_FILE
if not os.path.exists(feature_model_path):
    feature_pipeline.fit(trainBatchData).save(feature_model_path)
    # NOTE(review): save() is presumably lazy and only runs when the batch
    # job is executed -- confirm against the Alink documentation.
    BatchOperator.execute()
#c_4
# Apply the saved feature pipeline and, when no initial model file exists yet,
# train a logistic-regression model on the small sample and write it to disk.
feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

# Stream source over the 8M-row training file. It is not consumed anywhere in
# this section.
# NOTE(review): the later sections that read this same file also call
# setIgnoreFirstLine(True); this source does not -- confirm whether that
# difference is intended.
data = (
    CsvSourceStreamOp()
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/"
                 + "data-files/avazu-ctr-train-8M.csv")
    .setSchemaStr(SCHEMA_STRING)
)

# Train the initial model only once. Everything needed for the batch job,
# including the execute() call, stays inside the guard.
init_model_path = DATA_DIR + INIT_MODEL_FILE
if not os.path.exists(init_model_path):
    trainBatchData = (
        CsvSourceBatchOp()
        .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/"
                     + "data-files/avazu-small.csv")
        .setSchemaStr(SCHEMA_STRING)
    )

    # Logistic regression on the hashed feature vector, with an intercept and
    # at most 10 iterations.
    lr = (
        LogisticRegressionTrainBatchOp()
        .setVectorCol(VEC_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setWithIntercept(True)
        .setMaxIter(10)
    )

    # Chain: transformed data -> LR trainer -> .ak file sink.
    (
        feature_pipelineModel
        .transform(trainBatchData)
        .link(lr)
        .link(AkSinkBatchOp().setFilePath(init_model_path))
    )
    BatchOperator.execute()
#c_5
# Online training: FTRL is trained on one half of the stream, predictions are
# made on the other half, and binary-classification metrics are printed.
feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)
initModel = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

train_csv_url = (
    "http://alink-release.oss-cn-beijing.aliyuncs.com/"
    + "data-files/avazu-ctr-train-8M.csv"
)
data = (
    CsvSourceStreamOp()
    .setFilePath(train_csv_url)
    .setSchemaStr(SCHEMA_STRING)
    .setIgnoreFirstLine(True)
)

# Split with fraction 0.5: the main output feeds training, side output 0 feeds
# testing. Both go through the saved feature pipeline.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = feature_pipelineModel.transform(spliter)
test_stream_data = feature_pipelineModel.transform(spliter.getSideOutput(0))

# FTRL trainer constructed with the initial model and linked to the training
# stream.
ftrl_trainer = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(NUM_HASH_FEATURES)
)
model = ftrl_trainer.linkFrom(train_stream_data)

# Predictor constructed with the initial model and linked to the model stream
# and the test stream; only the label column is kept next to the prediction
# columns.
ftrl_predictor = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setReservedCols([LABEL_COL_NAME])
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)
predResult = ftrl_predictor.linkFrom(model, test_stream_data)

predResult.print(key="predResult", refreshInterval = 30, maxLimit=20)

# Evaluate the predictions, then extract Accuracy, AUC and ConfusionMatrix
# from the JSON in the "Data" column.
binary_evaluator = (
    EvalBinaryClassStreamOp()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .setTimeInterval(10)
)
metric_extractor = (
    JsonValueStreamOp()
    .setSelectedCol("Data")
    .setReservedCols(["Statistics"])
    .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
    .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
)
(
    predResult
    .link(binary_evaluator)
    .link(metric_extractor)
    .print(key="evaluation", refreshInterval = 30, maxLimit=20)
)

StreamOperator.execute()
#c_6
# Model filtering: same FTRL setup as the online-training section, but the
# model stream passes through FtrlModelFilterStreamOp before it reaches the
# predictor.
train_csv_url = (
    "http://alink-release.oss-cn-beijing.aliyuncs.com/"
    + "data-files/avazu-ctr-train-8M.csv"
)
data = (
    CsvSourceStreamOp()
    .setFilePath(train_csv_url)
    .setSchemaStr(SCHEMA_STRING)
    .setIgnoreFirstLine(True)
)

feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

# Split with fraction 0.5: the main output feeds training, side output 0 feeds
# testing. Both go through the saved feature pipeline.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = feature_pipelineModel.transform(spliter)
test_stream_data = feature_pipelineModel.transform(spliter.getSideOutput(0))

initModel = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

# FTRL trainer constructed with the initial model and linked to the training
# stream.
ftrl_trainer = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(NUM_HASH_FEATURES)
)
model = ftrl_trainer.linkFrom(train_stream_data)

# Model filter configured with accuracy threshold 0.83 and AUC threshold 0.71,
# with "1" as the positive label. It is linked to the model stream and the
# training stream.
ftrl_model_filter = (
    FtrlModelFilterStreamOp()
    .setPositiveLabelValueString("1")
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setAccuracyThreshold(0.83)
    .setAucThreshold(0.71)
)
model_filter = ftrl_model_filter.linkFrom(model, train_stream_data)

# Print the filter output, tagged with a constant out_type column.
model_filter.select("'Model' AS out_type, *").print()

# The predictor consumes the filtered model stream instead of the raw one.
ftrl_predictor = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setReservedCols([LABEL_COL_NAME])
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)
predResult = ftrl_predictor.linkFrom(model_filter, test_stream_data)

# Print a 0.0001 sample of the predictions.
predResult.sample(0.0001).select("'Pred Sample' AS out_type, *").print()

# Evaluate the predictions, then extract Accuracy, AUC and ConfusionMatrix
# from the JSON in the "Data" column.
binary_evaluator = (
    EvalBinaryClassStreamOp()
    .setPositiveLabelValueString("1")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .setTimeInterval(10)
)
metric_extractor = (
    JsonValueStreamOp()
    .setSelectedCol("Data")
    .setReservedCols(["Statistics"])
    .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
    .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
)
(
    predResult
    .link(binary_evaluator)
    .link(metric_extractor)
    .select("'Eval Metric' AS out_type, *")
    .print()
)

StreamOperator.execute()
```python
```