本章包括下面各节:
16.1 回归模型的评估指标
16.2 数据探索
16.3 线性回归
16.4 决策树与随机森林
16.5 GBDT
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
from pyalink.alink import *
useLocalEnv(1)
from utils import *
import os
import pandas as pd
DATA_DIR = ROOT_DIR + "wine" + os.sep
ORIGIN_FILE = "winequality-white.csv";
TRAIN_FILE = "train.ak";
TEST_FILE = "test.ak";
COL_NAMES = [
"fixedAcidity", "volatileAcidity", "citricAcid", "residualSugar", "chlorides",
"freeSulfurDioxide", "totalSulfurDioxide", "density", "pH", "sulphates",
"alcohol", "quality"
]
COL_TYPES = [
"double", "double", "double", "double", "double",
"double", "double", "double", "double", "double",
"double", "double"
]
FEATURE_COL_NAMES = COL_NAMES.copy()
FEATURE_COL_NAMES.remove("quality")
LABEL_COL_NAME = "quality";
PREDICTION_COL_NAME = "pred";
#c_1
source = CsvSourceBatchOp()\
.setFilePath(DATA_DIR + ORIGIN_FILE)\
.setSchemaStr(generateSchemaString(COL_NAMES, COL_TYPES))\
.setFieldDelimiter(";")\
.setIgnoreFirstLine(True);
source.lazyPrint(5);
source.link(CorrelationBatchOp().lazyPrintCorrelation());
import matplotlib.pyplot as plt
import seaborn as sns
corr = source.collectToDataframe().corr()
plt.figure(figsize=(15, 5))
sns.heatmap(corr, annot = True, cmap="Greys") ;
source\
.groupBy(LABEL_COL_NAME, LABEL_COL_NAME + ", COUNT(*) AS cnt")\
.orderBy(LABEL_COL_NAME, 100)\
.lazyPrint(-1);
BatchOperator.execute();
splitTrainTestIfNotExist(source, DATA_DIR + TRAIN_FILE, DATA_DIR + TEST_FILE, 0.8);
#c_2
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
LinearRegression()\
.setFeatureCols(FEATURE_COL_NAMES)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.enableLazyPrintTrainInfo()\
.enableLazyPrintModelInfo()\
.fit(train_data)\
.transform(test_data)\
.link(
EvalRegressionBatchOp()\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.lazyPrintMetrics("LinearRegression")
);
LassoRegression()\
.setLambda(0.05)\
.setFeatureCols(FEATURE_COL_NAMES)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.enableLazyPrintTrainInfo()\
.enableLazyPrintModelInfo("< LASSO model >")\
.fit(train_data)\
.transform(test_data)\
.link(
EvalRegressionBatchOp()\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.lazyPrintMetrics("LassoRegression")
);
BatchOperator.execute();
#c_3
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
DecisionTreeRegressor()\
.setFeatureCols(FEATURE_COL_NAMES)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.fit(train_data)\
.transform(test_data)\
.link(
EvalRegressionBatchOp()\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.lazyPrintMetrics("DecisionTreeRegressor")
);
BatchOperator.execute();
for numTrees in [2, 4, 8, 16, 32, 64, 128] :
RandomForestRegressor()\
.setNumTrees(numTrees)\
.setFeatureCols(FEATURE_COL_NAMES)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.fit(train_data)\
.transform(test_data)\
.link(
EvalRegressionBatchOp()\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.lazyPrintMetrics("RandomForestRegressor - " + str(numTrees))
)
BatchOperator.execute();
#c_4
train_data = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_data = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
for numTrees in [16, 32, 64, 128, 256, 512] :
GbdtRegressor()\
.setLearningRate(0.05)\
.setMaxLeaves(256)\
.setFeatureSubsamplingRatio(0.3)\
.setMinSamplesPerLeaf(2)\
.setMaxDepth(100)\
.setNumTrees(numTrees)\
.setFeatureCols(FEATURE_COL_NAMES)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.fit(train_data)\
.transform(test_data)\
.link(
EvalRegressionBatchOp()\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.lazyPrintMetrics("GbdtRegressor - " + str(numTrees))
);
BatchOperator.execute()
```python
```