本章包括下面各节:
14.1 整体流程
14.2 数据准备
14.3 特征工程
14.4 使用特征工程处理数据
14.5 在线训练
14.6 模型过滤
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
# Environment setup and constants shared by every cell of this chapter.
from pyalink.alink import *
useLocalEnv(1)  # local PyAlink environment; the argument is the parallelism

from utils import *
import os
import pandas as pd

# Let pandas display wide cell values (up to 1000 characters) untruncated.
pd.set_option('display.max_colwidth', 1000)

# Directory holding this chapter's model files.
# NOTE(review): ROOT_DIR is not defined here; presumably it comes from the
# wildcard import of `utils` and already ends with a path separator.
DATA_DIR = ROOT_DIR + "ctr_avazu" + os.sep

# Column schema of the Avazu CSV files, as "name type" pairs.
SCHEMA_STRING = (
    "id string, click string, dt string, C1 string, banner_pos int, site_id string, site_domain string, "
    "site_category string, app_id string, app_domain string, app_category string, device_id string, "
    "device_ip string, device_model string, device_type string, device_conn_type string, C14 int, C15 int, "
    "C16 int, C17 int, C18 int, C19 int, C20 int, C21 int"
)

# Columns hashed as categorical features.
CATEGORY_COL_NAMES = [
    "C1",
    "banner_pos",
    "site_category",
    "app_domain",
    "app_category",
    "device_type",
    "device_conn_type",
    "site_id",
    "site_domain",
    "device_id",
    "device_model",
]

# Columns standardized and hashed as numerical features.
NUMERICAL_COL_NAMES = ["C14", "C15", "C16", "C17", "C18", "C19", "C20", "C21"]

# File names (relative to DATA_DIR) of the persisted models.
FEATURE_MODEL_FILE = "feature_model.ak"
INIT_MODEL_FILE = "init_model.ak"

# Column names used throughout the training / prediction pipeline.
LABEL_COL_NAME = "click"
VEC_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "pred_info"

# Dimension of the hashed feature vector.
NUM_HASH_FEATURES = 30000
#c_2
# Data preparation: look at the small sample file first as raw text, then
# parsed into typed columns with SCHEMA_STRING.
small_csv_path = "http://alink-release.oss-cn-beijing.aliyuncs.com/" + "data-files/avazu-small.csv"

# Raw view: print the first 10 rows of the file read as plain text.
raw_text_source = TextSourceBatchOp().setFilePath(small_csv_path)
raw_text_source.firstN(10).print()

# Parsed view: the same file read as CSV using the declared schema.
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath(small_csv_path)
    .setSchemaStr(SCHEMA_STRING)
)
trainBatchData.firstN(10).print()
#c_3
# Feature engineering: standardize the numerical columns, then hash all
# selected columns into one vector column; fit and save the pipeline once.
trainBatchData = (
    CsvSourceBatchOp()
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/" + "data-files/avazu-small.csv")
    .setSchemaStr(SCHEMA_STRING)
)

numeric_scaler = StandardScaler().setSelectedCols(NUMERICAL_COL_NAMES)

feature_hasher = (
    FeatureHasher()
    .setSelectedCols(CATEGORY_COL_NAMES + NUMERICAL_COL_NAMES)
    .setCategoricalCols(CATEGORY_COL_NAMES)
    .setOutputCol(VEC_COL_NAME)
    .setNumFeatures(NUM_HASH_FEATURES)
)

feature_pipeline = Pipeline().add(numeric_scaler).add(feature_hasher)

# Skip fitting when a saved feature model is already on disk.
feature_model_path = DATA_DIR + FEATURE_MODEL_FILE
if not os.path.exists(feature_model_path):
    feature_pipeline.fit(trainBatchData).save(feature_model_path)
    # The save is followed by an explicit execute() to run the batch job.
    BatchOperator.execute()
#c_4
# Train the initial logistic-regression model on the small sample and store
# it as an .ak file. Nothing is trained when the model file already exists.
feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

init_model_path = DATA_DIR + INIT_MODEL_FILE
if not os.path.exists(init_model_path):
    trainBatchData = (
        CsvSourceBatchOp()
        .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/" + "data-files/avazu-small.csv")
        .setSchemaStr(SCHEMA_STRING)
    )

    lr = (
        LogisticRegressionTrainBatchOp()
        .setVectorCol(VEC_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setWithIntercept(True)
        .setMaxIter(10)
    )

    # Vectorize the sample with the saved feature pipeline, train the model,
    # and write it to the initial-model file.
    init_model_sink = AkSinkBatchOp().setFilePath(init_model_path)
    feature_pipelineModel.transform(trainBatchData).link(lr).link(init_model_sink)

    # Run the batch job; only reached when a sink was linked above.
    BatchOperator.execute()
#c_5
# Online training: warm-start FTRL from the initial model, train on one half
# of the stream, predict on the other half, and evaluate the predictions.
feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)
initModel = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

data = (
    CsvSourceStreamOp()
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/" + "data-files/avazu-ctr-train-8M.csv")
    .setSchemaStr(SCHEMA_STRING)
    .setIgnoreFirstLine(True)
)

# Split with fraction 0.5: the main output is used for training and side
# output 0 for testing. Both go through the saved feature pipeline.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = feature_pipelineModel.transform(spliter)
test_stream_data = feature_pipelineModel.transform(spliter.getSideOutput(0))

# FTRL trainer initialized with the batch-trained model; its output is a
# stream of models consumed by the predictor below.
model = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(NUM_HASH_FEATURES)
    .linkFrom(train_stream_data)
)

# Predictor initialized with the same initial model and fed by the model
# stream; only the label column is kept next to the prediction columns.
predResult = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setReservedCols([LABEL_COL_NAME])
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .linkFrom(model, test_stream_data)
)

# Plain-text alternative to the keyed print below:
#   predResult.sample(0.0001).select("'Pred Sample' AS out_type, *").print()
predResult.print(key="predResult", refreshInterval=30, maxLimit=20)

# Pin the positive class explicitly so the metrics always refer to click == "1",
# matching the evaluation in the model-filter cell.
binary_evaluator = (
    EvalBinaryClassStreamOp()
    .setPositiveLabelValueString("1")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .setTimeInterval(10)
)

# Pull individual metrics out of the JSON in the "Data" column.
metric_extractor = (
    JsonValueStreamOp()
    .setSelectedCol("Data")
    .setReservedCols(["Statistics"])
    .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
    .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
)

# Plain-text alternative to the keyed print below:
#   ....select("'Eval Metric' AS out_type, *").print()
predResult.link(binary_evaluator).link(metric_extractor).print(
    key="evaluation", refreshInterval=30, maxLimit=20
)

StreamOperator.execute()
#c_6
# Model filtering: same online-training flow, but each model produced by
# FTRL must pass accuracy / AUC thresholds before the predictor receives it.
data = (
    CsvSourceStreamOp()
    .setFilePath("http://alink-release.oss-cn-beijing.aliyuncs.com/" + "data-files/avazu-ctr-train-8M.csv")
    .setSchemaStr(SCHEMA_STRING)
    .setIgnoreFirstLine(True)
)

feature_pipelineModel = PipelineModel.load(DATA_DIR + FEATURE_MODEL_FILE)

# Split with fraction 0.5: main output for training, side output 0 for testing.
spliter = SplitStreamOp().setFraction(0.5).linkFrom(data)
train_stream_data = feature_pipelineModel.transform(spliter)
test_stream_data = feature_pipelineModel.transform(spliter.getSideOutput(0))

initModel = AkSourceBatchOp().setFilePath(DATA_DIR + INIT_MODEL_FILE)

model = (
    FtrlTrainStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setWithIntercept(True)
    .setAlpha(0.1)
    .setBeta(0.1)
    .setL1(0.01)
    .setL2(0.01)
    .setTimeInterval(10)
    .setVectorSize(NUM_HASH_FEATURES)
    .linkFrom(train_stream_data)
)

# The filter takes the model stream plus the training stream and is
# configured with accuracy and AUC thresholds.
model_filter = (
    FtrlModelFilterStreamOp()
    .setPositiveLabelValueString("1")
    .setVectorCol(VEC_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setAccuracyThreshold(0.83)
    .setAucThreshold(0.71)
    .linkFrom(model, train_stream_data)
)

# Tag the filter's output rows so they can be told apart in the mixed output.
model_filter.select("'Model' AS out_type, *").print()

# The predictor consumes the filtered model stream instead of the raw one.
predResult = (
    FtrlPredictStreamOp(initModel)
    .setVectorCol(VEC_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setReservedCols([LABEL_COL_NAME])
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .linkFrom(model_filter, test_stream_data)
)

# Print a small sample (ratio 0.0001) of the predictions.
predResult.sample(0.0001).select("'Pred Sample' AS out_type, *").print()

binary_evaluator = (
    EvalBinaryClassStreamOp()
    .setPositiveLabelValueString("1")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .setTimeInterval(10)
)

# Pull individual metrics out of the JSON in the "Data" column.
metric_extractor = (
    JsonValueStreamOp()
    .setSelectedCol("Data")
    .setReservedCols(["Statistics"])
    .setOutputCols(["Accuracy", "AUC", "ConfusionMatrix"])
    .setJsonPath(["$.Accuracy", "$.AUC", "$.ConfusionMatrix"])
)

eval_metrics = predResult.link(binary_evaluator).link(metric_extractor)
eval_metrics.select("'Eval Metric' AS out_type, *").print()

StreamOperator.execute()