本章包括下面各节:
10.1 整体流程
10.1.1 特征哑元化
10.1.2 特征的重要性
10.2 减少模型特征的个数
10.3 离散特征转化
10.3.1 独热编码
10.3.2 特征哈希
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

# Directory and file names for the German credit data set.
# NOTE(review): ROOT_DIR comes from utils — assumed to end with a separator.
DATA_DIR = ROOT_DIR + "german_credit" + os.sep
ORIGIN_FILE = "german.data"
TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"

# Column names and types of the raw CSV file, in file order.
COL_NAMES = [
    "status", "duration", "credit_history", "purpose", "credit_amount",
    "savings", "employment", "installment_rate", "marriage_sex", "debtors",
    "residence", "property", "age", "other_plan", "housing",
    "number_credits", "job", "maintenance_num", "telephone",
    "foreign_worker", "class"
]
COL_TYPES = [
    "string", "int", "string", "string", "int",
    "string", "string", "int", "string", "string",
    "int", "string", "int", "string", "string",
    "int", "string", "int", "string",
    "string", "int"
]


def _one_hot_cases(col, codes):
    """Return one 0/1 CASE WHEN expression per code, aliased '<col>_<code>'."""
    return [
        "(case {c} when '{v}' then 1 else 0 end) as {c}_{v}".format(c=col, v=v)
        for v in codes
    ]


def _binary_case(col, code):
    """Return a single 0/1 CASE WHEN flag that keeps the original column name."""
    return "(case {c} when '{v}' then 1 else 0 end) as {c}".format(c=col, v=code)


# SELECT clause that dummy-codes every categorical column and passes the
# numeric columns (and the label) through unchanged.  Generated instead of
# hand-written: the code lists below are copied verbatim from the original
# clause and the original column order is preserved.
_CLAUSE_ITEMS = (
    _one_hot_cases("status", ["A11", "A12", "A13", "A14"])
    + ["duration"]
    + _one_hot_cases("credit_history", ["A30", "A31", "A32", "A33", "A34"])
    + _one_hot_cases("purpose", ["A40", "A41", "A42", "A43", "A44", "A45",
                                 "A46", "A47", "A48", "A49", "A410"])
    + ["credit_amount"]
    + _one_hot_cases("savings", ["A61", "A62", "A63", "A64", "A65"])
    + _one_hot_cases("employment", ["A71", "A72", "A73", "A74", "A75"])
    + ["installment_rate"]
    + _one_hot_cases("marriage_sex", ["A91", "A92", "A93", "A94", "A95"])
    + _one_hot_cases("debtors", ["A101", "A102", "A103"])
    + ["residence"]
    + _one_hot_cases("property", ["A121", "A122", "A123", "A124"])
    + ["age"]
    + _one_hot_cases("other_plan", ["A141", "A142", "A143"])
    + _one_hot_cases("housing", ["A151", "A152", "A153"])
    + ["number_credits"]
    + _one_hot_cases("job", ["A171", "A172", "A173", "A174"])
    + ["maintenance_num"]
    # Two-valued columns are collapsed to one 0/1 flag under the old name.
    + [_binary_case("telephone", "A192"),
       _binary_case("foreign_worker", "A201"),
       "class"]
)
CLAUSE_CREATE_FEATURES = ",".join(_CLAUSE_ITEMS)

LABEL_COL_NAME = "class"

# All input columns except the label.
FEATURE_COL_NAMES = COL_NAMES.copy()
FEATURE_COL_NAMES.remove(LABEL_COL_NAME)

NUMERIC_FEATURE_COL_NAMES = [
    "duration", "credit_amount", "installment_rate", "residence",
    "age", "number_credits", "maintenance_num"
]
# Categorical features = features that are not numeric (order preserved).
CATEGORY_FEATURE_COL_NAMES = [
    col for col in FEATURE_COL_NAMES if col not in NUMERIC_FEATURE_COL_NAMES
]

VEC_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "predinfo"
#c_0
# Read the raw German-credit CSV (space separated), preview a few rows and
# the column statistics, then split once into train/test files on disk.
source = CsvSourceBatchOp() \
    .setFilePath(DATA_DIR + ORIGIN_FILE) \
    .setSchemaStr(generateSchemaString(COL_NAMES, COL_TYPES)) \
    .setFieldDelimiter(" ")

source \
    .lazyPrint(5, "< origin data >") \
    .lazyPrintStatistics()
BatchOperator.execute()

# No-op if the .ak files already exist; otherwise 80/20 random split.
splitTrainTestIfNotExist(source, DATA_DIR + TRAIN_FILE,
                         DATA_DIR + TEST_FILE, 0.8)
#c_1
# Dummy-code the features via the SQL clause, train a logistic regression,
# report per-feature importance and binary-classification metrics.
train_data = AkSourceBatchOp() \
    .setFilePath(DATA_DIR + TRAIN_FILE) \
    .select(CLAUSE_CREATE_FEATURES)
test_data = AkSourceBatchOp() \
    .setFilePath(DATA_DIR + TEST_FILE) \
    .select(CLAUSE_CREATE_FEATURES)

# Every generated column except the label is a feature.
new_features = train_data.getColNames()
new_features.remove(LABEL_COL_NAME)

train_data.lazyPrint(5, "< new features >")

trainer = LogisticRegressionTrainBatchOp() \
    .setFeatureCols(new_features) \
    .setLabelCol(LABEL_COL_NAME)
predictor = LogisticRegressionPredictBatchOp() \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)

train_data.link(trainer)
predictor.linkFrom(trainer, test_data)


def print_feature_importance(linearModelTrainInfo: LinearModelTrainInfo):
    """Print all features sorted by importance, descending.

    Fix: the original permanently set display.max_rows to 100 and then
    hard-coded it back to 10 (the pandas default is 60);
    pd.option_context widens the display only for this print and restores
    whatever value was in effect before.
    """
    df = pd.DataFrame({
        'name': linearModelTrainInfo.getColNames(),
        'value': linearModelTrainInfo.getImportance(),
    })
    with pd.option_context('display.max_rows', 100):
        print(df.sort_values(by=['value'], axis=0, ascending=False))


trainer \
    .lazyPrintTrainInfo() \
    .lazyCollectTrainInfo(print_feature_importance)

predictor.link(
    EvalBinaryClassBatchOp()
        .setPositiveLabelValueString("2")
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
        .lazyPrintMetrics()
)
BatchOperator.execute()
#c_2
# Same model as #c_1 but with L1 regularization (0.01), which drives the
# weights of unimportant features toward zero.
train_data = AkSourceBatchOp() \
    .setFilePath(DATA_DIR + TRAIN_FILE) \
    .select(CLAUSE_CREATE_FEATURES)
test_data = AkSourceBatchOp() \
    .setFilePath(DATA_DIR + TEST_FILE) \
    .select(CLAUSE_CREATE_FEATURES)

new_features = train_data.getColNames()
new_features.remove(LABEL_COL_NAME)

train_data.lazyPrint(5, "< new features >")

trainer = LogisticRegressionTrainBatchOp() \
    .setFeatureCols(new_features) \
    .setLabelCol(LABEL_COL_NAME) \
    .setL1(0.01)
predictor = LogisticRegressionPredictBatchOp() \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)

train_data.link(trainer)
predictor.linkFrom(trainer, test_data)

# Reuses print_feature_importance defined in cell #c_1.
trainer \
    .lazyPrintTrainInfo() \
    .lazyCollectTrainInfo(print_feature_importance)

predictor.link(
    EvalBinaryClassBatchOp()
        .setPositiveLabelValueString("2")
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
        .lazyPrintMetrics()
)
BatchOperator.execute()
#c_3_1
# One-hot encode the categorical columns, assemble all features into one
# vector column, then train and evaluate logistic regression as a Pipeline.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

pipeline = Pipeline() \
    .add(
        OneHotEncoder()
            .setSelectedCols(CATEGORY_FEATURE_COL_NAMES)
            .setEncode('VECTOR')
    ) \
    .add(
        VectorAssembler()
            .setSelectedCols(FEATURE_COL_NAMES)
            .setOutputCol(VEC_COL_NAME)
    ) \
    .add(
        LogisticRegression()
            .setVectorCol(VEC_COL_NAME)
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )

pipeline \
    .fit(train_data) \
    .transform(test_data) \
    .link(
        EvalBinaryClassBatchOp()
            .setPositiveLabelValueString("2")
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
            .lazyPrintMetrics()
    )
BatchOperator.execute()
#c_3_2
# Same pipeline shape as #c_3_1, but FeatureHasher maps all features
# (categorical + numeric) straight into one vector column.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

pipeline = Pipeline() \
    .add(
        FeatureHasher()
            .setSelectedCols(FEATURE_COL_NAMES)
            .setCategoricalCols(CATEGORY_FEATURE_COL_NAMES)
            .setOutputCol(VEC_COL_NAME)
    ) \
    .add(
        LogisticRegression()
            .setVectorCol(VEC_COL_NAME)
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionCol(PREDICTION_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )

pipeline \
    .fit(train_data) \
    .transform(test_data) \
    .link(
        EvalBinaryClassBatchOp()
            .setPositiveLabelValueString("2")
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
            .lazyPrintMetrics()
    )
BatchOperator.execute()