Alink教程(Python版)

第8章 线性二分类模型

本章包括下面各节:
8.1 线性模型的基础知识
8.1.1 损失函数
8.1.2 经验风险函数与结构风险函数
8.1.3 线性模型与损失函数
8.1.4 逻辑回归与线性支持向量机(Linear SVM)
8.2 二分类评估方法
8.2.1 基本指标
8.2.2 综合指标
8.2.3 评估曲线
8.3 数据探索
8.3.1 基本统计
8.3.2 相关性
8.4 训练集和测试集
8.5 逻辑回归模型
8.6 线性支持向量机模型
8.7 模型评估
8.8 特征的多项式扩展
8.9 因子分解机

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

# --- Environment setup and shared constants for the chapter 8 examples ---
from pyalink.alink import *
useLocalEnv(1)  # start a local Alink/Flink execution environment, parallelism 1

from utils import *  # ROOT_DIR and splitTrainTestIfNotExist come from here (wildcard import) — confirm in utils
import os
import pandas as pd

# Show long cell contents in full when printing pandas DataFrames.
pd.set_option('display.max_colwidth', 1000)

# Directory holding the banknote-authentication data set.
# NOTE(review): assumes ROOT_DIR ends with a path separator — TODO confirm.
DATA_DIR = ROOT_DIR + "banknote" + os.sep

# Raw CSV file of the UCI banknote authentication data set.
ORIGIN_FILE = "data_banknote_authentication.txt"

# Column schema of the CSV: four numeric features plus the 0/1 class label.
SCHEMA_STRING = "variance double, skewness double, kurtosis double, entropy double, class int"

# File names (Alink .ak format) for the train/test split and prediction outputs.
TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"
LR_PRED_FILE = "lr_pred.ak"
SVM_PRED_FILE = "svm_pred.ak"

# Feature and label columns used by all trainers below.
FEATURE_COL_NAMES = ["variance", "skewness", "kurtosis", "entropy"]
LABEL_COL_NAME = "class"

# Name of the assembled feature-vector column (section 8.8).
VEC_COL_NAME = "vec"

# Output columns for the predicted label and the prediction detail.
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "predinfo"
#c_1
# Load the raw banknote CSV with an explicit schema and inspect it.
csv_source = CsvSourceBatchOp() \
    .setFilePath(DATA_DIR + ORIGIN_FILE) \
    .setSchemaStr(SCHEMA_STRING)

# Show the table's column names and types.
print("column names of source:")
print(csv_source.getColNames())

print("column types of source:")
print(csv_source.getColTypes())

# Print the first five rows (this triggers a batch job).
csv_source.firstN(5).print()
#c_1_1
# Recreate the source and compute whole-table summary statistics.
data_source = CsvSourceBatchOp() \
    .setFilePath(DATA_DIR + ORIGIN_FILE) \
    .setSchemaStr(SCHEMA_STRING)

# Eager path: collect the summary immediately and query it.
table_summary = SummarizerBatchOp().linkFrom(data_source).collectSummary()
print("Count of data set : " + str(table_summary.count()))
print("Max value of entropy : " + str(table_summary.max("entropy")))
print(table_summary)


def _show_summary(tableSummary):
    # Callback for lazyCollectSummary: prints the same info when the job runs.
    print("Count of data set : " + str(tableSummary.count()))
    print("Max value of entropy : " + str(tableSummary.max("entropy")))
    print(tableSummary)


# Lazy path: register a callback that fires at execution time.
data_source.link(SummarizerBatchOp().lazyCollectSummary(_show_summary))

# Lazy path: let Alink print the summary itself.
data_source.link(SummarizerBatchOp().lazyPrintSummary())

# Attach lazy statistics printers, then trigger execution via print().
data_source \
    .lazyPrintStatistics("<- origin data ->") \
    .firstN(5) \
    .lazyPrintStatistics("<- first 5 data ->") \
    .print()
#c_1_2
# Pearson correlation over all columns, collected eagerly.
# (The name `source` is kept because later sections reference it.)
source = CsvSourceBatchOp() \
    .setFilePath(DATA_DIR + ORIGIN_FILE) \
    .setSchemaStr(SCHEMA_STRING)

corr_result = CorrelationBatchOp().linkFrom(source).collectCorrelation()
names = corr_result.getColNames()
corr_value = corr_result.getCorrelation()[0][1]
print("Correlation of " + names[0] + " with " + names[1]
      + " is " + str(corr_value))
print(corr_result.getCorrelationMatrix())


def print_correlation_info(correlationResult: CorrelationResult):
    """Print the correlation of the first two columns and the full matrix."""
    names = correlationResult.getColNames()
    value = correlationResult.getCorrelation()[0][1]
    print("Correlation of " + names[0] + " with " + names[1]
          + " is " + str(value))
    print(correlationResult.getCorrelationMatrix())

    
# Register lazy correlation computations, then run the whole batch job.
source.link(
    CorrelationBatchOp().lazyCollectCorrelation(print_correlation_info)
)

source.link(
    CorrelationBatchOp().lazyPrintCorrelation("< Pearson Correlation >")
)

# Spearman rank correlation, printed lazily under its own title.
source.link(
    CorrelationBatchOp()
        .setMethod("SPEARMAN")
        .lazyPrintCorrelation("< Spearman Correlation >")
)

BatchOperator.execute()
# Visual exploration: pairwise scatter plots colored by the class label,
# then a 2-D t-SNE embedding of the four features.
import matplotlib.pyplot as plt
import seaborn as sns

df = source.collectToDataframe()

sns.pairplot(df, vars=df.columns[:-1], hue='class')
plt.show()

from sklearn.manifold import TSNE

embedding = TSNE(n_components=2, learning_rate=100).fit_transform(df.iloc[:, 0:4])

plt.scatter(embedding[:, 0], embedding[:, 1], c=df.iloc[:, 4])
plt.colorbar()
plt.show()
#c_2
# Split the raw data into train/test sets (80/20); skips if files exist.
raw_data = CsvSourceBatchOp() \
    .setFilePath(DATA_DIR + ORIGIN_FILE) \
    .setSchemaStr(SCHEMA_STRING)

splitTrainTestIfNotExist(raw_data, DATA_DIR + TRAIN_FILE, DATA_DIR + TEST_FILE, 0.8)
#c_3
# Logistic regression: train on the training set, predict on the test set,
# print train/model info plus a prediction sample, and save predictions.
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

lr_train_op = LogisticRegressionTrainBatchOp() \
    .setFeatureCols(FEATURE_COL_NAMES) \
    .setLabelCol(LABEL_COL_NAME)

lr_pred_op = LogisticRegressionPredictBatchOp() \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)

train_set.link(lr_train_op)

lr_pred_op.linkFrom(lr_train_op, test_set)

# Attach lazy printers for training information and model information.
lr_train_op.lazyPrintTrainInfo().lazyPrintModelInfo()

# Show five predictions, then sink the full result to an .ak file.
lr_pred_op \
    .lazyPrint(5, "< Prediction >") \
    .link(
        AkSinkBatchOp()
            .setFilePath(DATA_DIR + LR_PRED_FILE)
            .setOverwriteSink(True)
    )

BatchOperator.execute()
#c_4
# Linear SVM: same train/predict/save flow as the LR section above.
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

svm_train_op = LinearSvmTrainBatchOp() \
    .setFeatureCols(FEATURE_COL_NAMES) \
    .setLabelCol(LABEL_COL_NAME)

svm_pred_op = LinearSvmPredictBatchOp() \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)

train_set.link(svm_train_op)

svm_pred_op.linkFrom(svm_train_op, test_set)

# Attach lazy printers for training information and model information.
svm_train_op.lazyPrintTrainInfo().lazyPrintModelInfo()

# Show five predictions, then sink the full result to an .ak file.
svm_pred_op \
    .lazyPrint(5, "< Prediction >") \
    .link(
        AkSinkBatchOp()
            .setFilePath(DATA_DIR + SVM_PRED_FILE)
            .setOverwriteSink(True)
    )

BatchOperator.execute()
#c_5
# Evaluate the saved LR predictions eagerly and query individual metrics.
lr_eval = EvalBinaryClassBatchOp() \
    .setPositiveLabelValueString("1") \
    .setLabelCol(LABEL_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME) \
    .linkFrom(
        AkSourceBatchOp().setFilePath(DATA_DIR + LR_PRED_FILE)
    ) \
    .collectMetrics()

print("< LR >")
print("AUC : " + str(lr_eval.getAuc())
      + "\t Accuracy : " + str(lr_eval.getAccuracy())
      + "\t Precision : " + str(lr_eval.getPrecision())
      + "\t Recall : " + str(lr_eval.getRecall())
     )

print(lr_eval)

# Save the standard evaluation curves as images.
lr_eval.saveRocCurveAsImage(DATA_DIR + "lr_roc.jpg", True)
lr_eval.saveRecallPrecisionCurveAsImage(DATA_DIR + "lr_recallprec.jpg", True)
lr_eval.saveLiftChartAsImage(DATA_DIR + "lr_lift.jpg", True)
lr_eval.saveKSAsImage(DATA_DIR + "lr_ks.jpg", True)


def _save_svm_curves(binaryClassMetrics):
    # Lazy callback: save the SVM evaluation curves when the job executes.
    binaryClassMetrics.saveRocCurveAsImage(
        DATA_DIR + "svm_roc.jpg", True)
    binaryClassMetrics.saveRecallPrecisionCurveAsImage(
        DATA_DIR + "svm_recallprec.jpg", True)
    binaryClassMetrics.saveLiftChartAsImage(
        DATA_DIR + "svm_lift.jpg", True)
    binaryClassMetrics.saveKSAsImage(
        DATA_DIR + "svm_ks.jpg", True)


# Evaluate the saved SVM predictions lazily: print metrics and save curves.
AkSourceBatchOp() \
    .setFilePath(DATA_DIR + SVM_PRED_FILE) \
    .link(
        EvalBinaryClassBatchOp()
            .setPositiveLabelValueString("1")
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
            .lazyPrintMetrics()
            .lazyCollectMetrics(_save_svm_curves)
    )

BatchOperator.execute()
#c_6
# Degree-2 polynomial feature expansion, then three linear models evaluated
# on the same expanded test set with lazily printed metrics.
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

# Assemble the four features into one vector, then expand it polynomially.
featureExpand = Pipeline() \
    .add(
        VectorAssembler()
            .setSelectedCols(FEATURE_COL_NAMES)
            .setOutputCol(VEC_COL_NAME + "_0")
    ) \
    .add(
        VectorPolynomialExpand()
            .setSelectedCol(VEC_COL_NAME + "_0")
            .setOutputCol(VEC_COL_NAME)
            .setDegree(2)
    ) \
    .fit(train_data)

train_data = featureExpand.transform(train_data)
test_data = featureExpand.transform(test_data)

train_data.lazyPrint(1)


def _make_evaluator(title):
    # Binary-classification evaluator that lazily prints metrics under `title`.
    return EvalBinaryClassBatchOp() \
        .setPositiveLabelValueString("1") \
        .setLabelCol(LABEL_COL_NAME) \
        .setPredictionDetailCol(PRED_DETAIL_COL_NAME) \
        .lazyPrintMetrics(title)


# Linear SVM on the expanded vector feature.
LinearSvm() \
    .setVectorCol(VEC_COL_NAME) \
    .setLabelCol(LABEL_COL_NAME) \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME) \
    .fit(train_data) \
    .transform(test_data) \
    .link(_make_evaluator("LinearSVM"))

# Logistic regression with the default optimizer.
LogisticRegression() \
    .setVectorCol(VEC_COL_NAME) \
    .setLabelCol(LABEL_COL_NAME) \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME) \
    .fit(train_data) \
    .transform(test_data) \
    .link(_make_evaluator("LogisticRegression"))

# Logistic regression with the Newton optimization method.
LogisticRegression() \
    .setOptimMethod("Newton") \
    .setVectorCol(VEC_COL_NAME) \
    .setLabelCol(LABEL_COL_NAME) \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME) \
    .fit(train_data) \
    .transform(test_data) \
    .link(_make_evaluator("LogisticRegression + OptimMethod.Newton"))

BatchOperator.execute()
#c_7
# Factorization-machine classifier on the raw features, evaluated on test data.
fm_train = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
fm_test = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

fm_model = FmClassifier() \
    .setNumEpochs(10) \
    .setLearnRate(0.5) \
    .setNumFactor(2) \
    .setFeatureCols(FEATURE_COL_NAMES) \
    .setLabelCol(LABEL_COL_NAME) \
    .setPredictionCol(PREDICTION_COL_NAME) \
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME) \
    .enableLazyPrintTrainInfo() \
    .enableLazyPrintModelInfo()

fm_model \
    .fit(fm_train) \
    .transform(fm_test) \
    .link(
        EvalBinaryClassBatchOp()
            .setPositiveLabelValueString("1")
            .setLabelCol(LABEL_COL_NAME)
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
            .lazyPrintMetrics("FM")
    )

BatchOperator.execute()