This chapter includes the following sections:
23.1 Using the provided features
23.1.1 Using the naive Bayes method
23.1.2 Using the logistic regression algorithm
23.2 How to extract features
23.3 Constructing more features
23.4 Model saving and prediction
23.4.1 Batch/stream prediction tasks
23.4.2 Embedded prediction
For the full discussion, please read the print book "Alink权威指南:基于Flink的机器学习实例入门(Python)"; what follows is the companion example code for this chapter.
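# Initialize a local Alink execution environment and import shared helpers.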
from pyalink.alink import *
useLocalEnv(1)
from utils import *
import os
import pandas as pd
pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.html.use_mathjax', False)
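# Directory layout and column-name constants for the IMDB sentiment task;
# ROOT_DIR is provided by utils.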
DATA_DIR = ROOT_DIR + "sentiment_imdb" + os.sep
ORIGIN_DATA_DIR = DATA_DIR + "aclImdb" + os.sep
TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"
PIPELINE_MODEL = "pipeline_model.ak"
TXT_COL_NAME = "review"
LABEL_COL_NAME = "label"
VECTOR_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "predinfo"
#c_1
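# Section 23.1: read the bag-of-words features shipped with the aclImdb dataset
# (LibSVM format); the label is the review's rating, inspected via groupBy below.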
train_set = LibSvmSourceBatchOp()\
.setFilePath(ORIGIN_DATA_DIR + "train" + os.sep + "labeledBow.feat")\
.setStartIndex(0);
train_set.lazyPrint(1, "train_set");
train_set\
.groupBy("label", "label, COUNT(label) AS cnt")\
.orderBy("label", 100)\
.lazyPrint(-1, "labels of train_set");
test_set = LibSvmSourceBatchOp()\
.setFilePath(ORIGIN_DATA_DIR + "test" + os.sep + "labeledBow.feat")\
.setStartIndex(0);
test_set\
.groupBy("label", "label, COUNT(label) AS cnt")\
.orderBy("label", 100)\
.lazyPrint(-1, "labels of test_set");
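# Map the 1-10 rating label to a binary pos/neg label and rename the feature column.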
train_set = train_set.select("CASE WHEN label>5 THEN 'pos' ELSE 'neg' END AS label, "
+ "features AS " + VECTOR_COL_NAME);
test_set = test_set.select("CASE WHEN label>5 THEN 'pos' ELSE 'neg' END AS label, "
+ "features AS " + VECTOR_COL_NAME);
train_set.lazyPrint(1, "train_set");
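# Multinomial naive Bayes on the raw word-count vectors, evaluated on the test set.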
NaiveBayesTextClassifier()\
.setModelType("Multinomial")\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.enableLazyPrintModelInfo()\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("NaiveBayesTextClassifier + Multinomial")
);
BatchOperator.execute();
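# Binarize the counts to 0/1 term-presence features, then fit Bernoulli naive Bayes.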
Pipeline()\
.add(
Binarizer()\
.setSelectedCol(VECTOR_COL_NAME)\
.enableLazyPrintTransformData(1, "After Binarizer")
)\
.add(
NaiveBayesTextClassifier()\
.setModelType("Bernoulli")\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.enableLazyPrintModelInfo()
)\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("Binarizer + NaiveBayesTextClassifier + Bernoulli")
);
BatchOperator.execute();
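# Logistic regression on the same sparse vectors, with lazy-printed train and model info.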
LogisticRegression()\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.enableLazyPrintTrainInfo("< LR train info >")\
.enableLazyPrintModelInfo("< LR model info >")\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("LogisticRegression")
);
BatchOperator.execute();
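# 6-fold cross-validated grid search over the maximum iteration count, tuned by AUC.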
lr = LogisticRegression()\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME);
gridSearch = GridSearchCV()\
.setEstimator(
Pipeline().add(lr)
)\
.setParamGrid(
ParamGrid()\
.addGrid(lr, 'MAX_ITER', [10, 20, 30, 40, 50, 60, 80, 100])
)\
.setTuningEvaluator(
BinaryClassificationTuningEvaluator()\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.setTuningBinaryClassMetric('AUC')
)\
.setNumFolds(6)\
.enableLazyPrintTrainInfo();
bestModel = gridSearch.fit(train_set);
bestModel\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("LogisticRegression")
);
BatchOperator.execute();
#c_2
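# Section 23.2: build .ak files from the raw review texts, one row per review
# with its label, so later experiments can extract features themselves.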
if not(os.path.exists(DATA_DIR + TRAIN_FILE)) :
    data_arr = []
    for label in ["pos", "neg"] :
        file_names = os.listdir(ORIGIN_DATA_DIR + "train" + os.sep + label)
        for file_name in file_names :
            f = open(ORIGIN_DATA_DIR + "train" + os.sep + label + os.sep + file_name, encoding="utf-8")
            data_arr.append([label, f.read()])
            f.close()
    BatchOperator\
        .fromDataframe(
            pd.DataFrame(data_arr),
            schemaStr= LABEL_COL_NAME + ' string, ' + TXT_COL_NAME + ' string'
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(DATA_DIR + TRAIN_FILE)
        );
    BatchOperator.execute();
if not(os.path.exists(DATA_DIR + TEST_FILE)) :
    data_arr = []
    for label in ["pos", "neg"] :
        file_names = os.listdir(ORIGIN_DATA_DIR + "test" + os.sep + label)
        for file_name in file_names :
            f = open(ORIGIN_DATA_DIR + "test" + os.sep + label + os.sep + file_name, encoding="utf-8")
            data_arr.append([label, f.read()])
            f.close()
    BatchOperator\
        .fromDataframe(
            pd.DataFrame(data_arr),
            schemaStr= LABEL_COL_NAME + ' string, ' + TXT_COL_NAME + ' string'
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(DATA_DIR + TEST_FILE)
        );
    BatchOperator.execute();
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
train_set.lazyPrint(2);
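# Tokenize on non-word characters, build word-count vectors with a learned
# vocabulary (DocCountVectorizer), and train logistic regression on them.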
Pipeline()\
.add(
RegexTokenizer()\
.setPattern("\\W+")\
.setSelectedCol(TXT_COL_NAME)
)\
.add(
DocCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol(VECTOR_COL_NAME)\
.enableLazyPrintTransformData(1)
)\
.add(
LogisticRegression()\
.setMaxIter(30)\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("DocCountVectorizer")
);
BatchOperator.execute();
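# Same pipeline, but DocHashCountVectorizer maps words into a fixed-length
# vector by hashing instead of building an explicit vocabulary.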
Pipeline()\
.add(
RegexTokenizer()\
.setPattern("\\W+")\
.setSelectedCol(TXT_COL_NAME)
)\
.add(
DocHashCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol(VECTOR_COL_NAME)\
.enableLazyPrintTransformData(1)
)\
.add(
LogisticRegression()\
.setMaxIter(30)\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("DocHashCountVectorizer")
);
BatchOperator.execute();
#c_3
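# Section 23.3: construct richer n-gram features; raise local parallelism to 4
# for the heavier workload.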
useLocalEnv(4)
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
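# Vectorize both the unigrams and the 2-grams, then assemble the two sparse
# vectors into a single feature vector for logistic regression.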
Pipeline()\
.add(
RegexTokenizer()\
.setPattern("\\W+")\
.setSelectedCol(TXT_COL_NAME)
)\
.add(
DocCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol(VECTOR_COL_NAME)
)\
.add(
NGram()\
.setN(2)\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol("v_2")\
.enableLazyPrintTransformData(1, "2-gram")
)\
.add(
DocCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setSelectedCol("v_2")\
.setOutputCol("v_2")
)\
.add(
VectorAssembler()\
.setSelectedCols([VECTOR_COL_NAME, "v_2"])\
.setOutputCol(VECTOR_COL_NAME)
)\
.add(
LogisticRegression()\
.setMaxIter(30)\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("NGram 2")
);
BatchOperator.execute();
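# Extend the previous pipeline with 3-gram features, capping the 3-gram
# vocabulary at 10000 terms to limit dimensionality.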
Pipeline()\
.add(
RegexTokenizer()\
.setPattern("\\W+")\
.setSelectedCol(TXT_COL_NAME)
)\
.add(
DocCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol(VECTOR_COL_NAME)
)\
.add(
NGram()\
.setN(2)\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol("v_2")
)\
.add(
DocCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setSelectedCol("v_2")\
.setOutputCol("v_2")
)\
.add(
NGram()\
.setN(3)\
.setSelectedCol(TXT_COL_NAME)\
.setOutputCol("v_3")
)\
.add(
DocCountVectorizer()\
.setFeatureType("WORD_COUNT")\
.setVocabSize(10000)\
.setSelectedCol("v_3")\
.setOutputCol("v_3")
)\
.add(
VectorAssembler()\
.setSelectedCols([VECTOR_COL_NAME, "v_2", "v_3"])\
.setOutputCol(VECTOR_COL_NAME)
)\
.add(
LogisticRegression()\
.setMaxIter(30)\
.setVectorCol(VECTOR_COL_NAME)\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionCol(PREDICTION_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)\
.fit(train_set)\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("NGram 2 and 3")
);
BatchOperator.execute();
#c_4
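# Section 23.4: train the unigram + 2-gram + 3-gram pipeline once and save it
# to disk as a PipelineModel, so prediction tasks can reuse it without retraining.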
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
if not(os.path.exists(DATA_DIR + PIPELINE_MODEL)) :
    Pipeline()\
        .add(
            RegexTokenizer()\
                .setPattern("\\W+")\
                .setSelectedCol(TXT_COL_NAME)
        )\
        .add(
            DocCountVectorizer()\
                .setFeatureType("WORD_COUNT")\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol(VECTOR_COL_NAME)
        )\
        .add(
            NGram()\
                .setN(2)\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol("v_2")
        )\
        .add(
            DocCountVectorizer()\
                .setFeatureType("WORD_COUNT")\
                .setVocabSize(50000)\
                .setSelectedCol("v_2")\
                .setOutputCol("v_2")
        )\
        .add(
            NGram()\
                .setN(3)\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol("v_3")
        )\
        .add(
            DocCountVectorizer()\
                .setFeatureType("WORD_COUNT")\
                .setVocabSize(10000)\
                .setSelectedCol("v_3")\
                .setOutputCol("v_3")
        )\
        .add(
            VectorAssembler()\
                .setSelectedCols([VECTOR_COL_NAME, "v_2", "v_3"])\
                .setOutputCol(VECTOR_COL_NAME)
        )\
        .add(
            LogisticRegression()\
                .setMaxIter(30)\
                .setVectorCol(VECTOR_COL_NAME)\
                .setLabelCol(LABEL_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)\
                .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
        )\
        .fit(train_set)\
        .save(DATA_DIR + PIPELINE_MODEL);
    BatchOperator.execute();
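# Batch prediction: load the saved PipelineModel and evaluate it on the test set.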
pipeline_model = PipelineModel.load(DATA_DIR + PIPELINE_MODEL);
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);
pipeline_model\
.transform(test_set)\
.link(
EvalBinaryClassBatchOp()\
.setPositiveLabelValueString("pos")\
.setLabelCol(LABEL_COL_NAME)\
.setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
.lazyPrintMetrics("NGram 2 and 3")
);
BatchOperator.execute();
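# Stream prediction: apply the same model to a stream source, sampling about
# 0.1% of the results for display.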
test_stream = AkSourceStreamOp().setFilePath(DATA_DIR + TEST_FILE);
pipeline_model\
.transform(test_stream)\
.sample(0.001)\
.select(PREDICTION_COL_NAME + ", " + LABEL_COL_NAME + ", " + TXT_COL_NAME)\
.print();
StreamOperator.execute();
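# Embedded prediction: obtain a LocalPredictor either from the in-memory
# PipelineModel (collectLocalPredictor) or directly from the saved model file
# (LocalPredictor), and score a single review in-process without a Flink job.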
review_str = "Oh dear. good cast, but to write and direct is an art and to write wit and direct wit is a bit of a "\
    + "task. Even doing good comedy you have to get the timing and moment right. Im not putting it all down "\
    + "there were parts where i laughed loud but that was at very few times. The main focus to me was on the "\
    + "fast free flowing dialogue, that made some people in the film annoying. It may sound great while "\
    + "reading the script in your head but getting that out and to the camera is a different task. And the "\
    + "hand held camera work does give energy to few parts of the film. Overall direction was good but the "\
    + "script was not all that to me, but I'm sure you was reading the script in your head it would sound good"\
    + ". Sorry.";
local_predictor = pipeline_model.collectLocalPredictor("review string");
print(local_predictor.getOutputColNames());
pred_row = local_predictor.map([review_str]);
print(pred_row[4]);
local_predictor_2 = LocalPredictor(DATA_DIR + PIPELINE_MODEL, "review string");
print(local_predictor_2.getOutputColNames());
pred_row = local_predictor_2.map([review_str]);
print(pred_row[4]);