Alink教程(Python版)

第23章 情感分析

本章包括下面各节:
23.1 使用提供的特征
23.1.1 使用朴素贝叶斯方法
23.1.2 使用逻辑回归算法
23.2 如何提取特征
23.3 构造更多特征
23.4 模型保存与预测
23.4.1 批式/流式预测任务
23.4.2 嵌入式预测

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

from pyalink.alink import *
# Initialise a local Alink environment before any operator is created.
# NOTE(review): the argument is presumably the parallelism — confirm against the PyAlink docs.
useLocalEnv(1)

from utils import *
import os
import pandas as pd

# Let pandas show up to 1000 characters per column so long review texts are not truncated.
pd.set_option('display.max_colwidth', 1000)
# Turn off MathJax in pandas' HTML rendering.
pd.set_option('display.html.use_mathjax', False)

# ROOT_DIR is not defined in this file; presumably it comes from `from utils import *`.
# The paths are built by plain concatenation, so ROOT_DIR must already end with a separator.
DATA_DIR = ROOT_DIR + "sentiment_imdb" + os.sep

# Directory holding the original dataset files ("train"/"test" sub-directories are read below).
ORIGIN_DATA_DIR = DATA_DIR + "aclImdb" + os.sep

# File names (inside DATA_DIR) of the train/test sets stored in .ak format.
TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"

# File name (inside DATA_DIR) of the saved pipeline model.
PIPELINE_MODEL = "pipeline_model.ak"

# Column names shared by every pipeline in this script.
TXT_COL_NAME = "review"
LABEL_COL_NAME = "label"
VECTOR_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "predinfo"

#c_1

# Locations of the LibSVM-format feature files for both splits.
bow_train_path = ORIGIN_DATA_DIR + "train" + os.sep + "labeledBow.feat"
bow_test_path = ORIGIN_DATA_DIR + "test" + os.sep + "labeledBow.feat"

# Aggregation clause used to count the rows per label value.
label_count_clause = "label, COUNT(label) AS cnt"

# Projection that maps labels greater than 5 to 'pos' and all others to 'neg',
# and renames the "features" column to VECTOR_COL_NAME.
binary_label_clause = (
    "CASE WHEN label>5 THEN 'pos' ELSE 'neg' END AS label, "
    + "features AS " + VECTOR_COL_NAME
)

train_set = LibSvmSourceBatchOp().setFilePath(bow_train_path).setStartIndex(0)

train_set.lazyPrint(1, "train_set")

# Label distribution of the training split.
train_label_counts = train_set.groupBy("label", label_count_clause).orderBy("label", 100)
train_label_counts.lazyPrint(-1, "labels of train_set")

test_set = LibSvmSourceBatchOp().setFilePath(bow_test_path).setStartIndex(0)

# Label distribution of the test split.
test_label_counts = test_set.groupBy("label", label_count_clause).orderBy("label", 100)
test_label_counts.lazyPrint(-1, "labels of test_set")

# From here on both splits carry a binary 'pos'/'neg' label and the vector column.
train_set = train_set.select(binary_label_clause)
test_set = test_set.select(binary_label_clause)

train_set.lazyPrint(1, "train_set")

# Multinomial naive Bayes trained directly on the vector column.
nb_multinomial = (
    NaiveBayesTextClassifier()
    .setModelType("Multinomial")
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .enableLazyPrintModelInfo()
)
nb_multinomial_predictions = nb_multinomial.fit(train_set).transform(test_set)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
nb_multinomial_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("NaiveBayesTextClassifier + Multinomial")
)
nb_multinomial_predictions.link(nb_multinomial_metrics)

# Trigger the job so the lazily registered model info and metrics are printed.
BatchOperator.execute()

# Stage 1: binarize the vector column in place and print one transformed row.
binarize_stage = (
    Binarizer()
    .setSelectedCol(VECTOR_COL_NAME)
    .enableLazyPrintTransformData(1, "After Binarizer")
)

# Stage 2: Bernoulli naive Bayes on the binarized vector.
nb_bernoulli_stage = (
    NaiveBayesTextClassifier()
    .setModelType("Bernoulli")
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .enableLazyPrintModelInfo()
)

nb_bernoulli_predictions = (
    Pipeline()
    .add(binarize_stage)
    .add(nb_bernoulli_stage)
    .fit(train_set)
    .transform(test_set)
)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
nb_bernoulli_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("Binarizer + NaiveBayesTextClassifier + Bernoulli")
)
nb_bernoulli_predictions.link(nb_bernoulli_metrics)

BatchOperator.execute()

# Logistic regression on the vector column, with training and model info printed lazily.
lr_baseline = (
    LogisticRegression()
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .enableLazyPrintTrainInfo("< LR train info >")
    .enableLazyPrintModelInfo("< LR model info >")
)
lr_baseline_predictions = lr_baseline.fit(train_set).transform(test_set)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
lr_baseline_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("LogisticRegression")
)
lr_baseline_predictions.link(lr_baseline_metrics)

BatchOperator.execute()


# Estimator whose MAX_ITER parameter is tuned below.
lr = (
    LogisticRegression()
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)

# Candidate MAX_ITER values tried by the grid search.
max_iter_grid = ParamGrid().addGrid(lr, 'MAX_ITER', [10, 20, 30, 40, 50, 60, 80, 100])

# Candidates are ranked by AUC computed from the prediction-detail column.
auc_evaluator = (
    BinaryClassificationTuningEvaluator()
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .setTuningBinaryClassMetric('AUC')
)

# 6-fold cross-validated grid search over a single-stage pipeline wrapping `lr`.
gridSearch = (
    GridSearchCV()
    .setEstimator(Pipeline().add(lr))
    .setParamGrid(max_iter_grid)
    .setTuningEvaluator(auc_evaluator)
    .setNumFolds(6)
    .enableLazyPrintTrainInfo()
)

bestModel = gridSearch.fit(train_set)

# Evaluate the model returned by the search on the test split.
best_model_predictions = bestModel.transform(test_set)
best_model_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("LogisticRegression")
)
best_model_predictions.link(best_model_metrics)

BatchOperator.execute()

#c_2

def _read_labeled_reviews(split_dir):
    """Read every review file under ``split_dir/pos`` and ``split_dir/neg``.

    Args:
        split_dir: Directory containing the ``pos`` and ``neg`` sub-directories.

    Returns:
        A list of ``[label, text]`` rows; all 'pos' rows come before the 'neg' rows.
    """
    rows = []
    for label in ["pos", "neg"]:
        label_dir = split_dir + os.sep + label
        for file_name in os.listdir(label_dir):
            # The context manager closes the handle even if read() raises.
            # An explicit encoding keeps decoding independent of the platform default.
            # NOTE(review): assumes the review files are UTF-8 encoded — confirm.
            with open(label_dir + os.sep + file_name, encoding="utf-8") as review_file:
                rows.append([label, review_file.read()])
    return rows


def _export_reviews_to_ak(split_name, ak_path):
    """Write the raw reviews of one split to ``ak_path`` unless that file already exists.

    Args:
        split_name: Sub-directory of ORIGIN_DATA_DIR to read ("train" or "test").
        ak_path: Destination path of the .ak file.
    """
    if os.path.exists(ak_path):
        # The export was already done by an earlier run; do not redo it.
        return

    rows = _read_labeled_reviews(ORIGIN_DATA_DIR + split_name)

    BatchOperator\
        .fromDataframe(
            pd.DataFrame(rows),
            schemaStr=LABEL_COL_NAME + ' string, ' + TXT_COL_NAME + ' string'
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(ak_path)
        )
    BatchOperator.execute()


# Convert both splits of the raw text dataset into .ak files (skipped when already present).
_export_reviews_to_ak("train", DATA_DIR + TRAIN_FILE)
_export_reviews_to_ak("test", DATA_DIR + TEST_FILE)


# Section 23.2: extract features from the raw review text stored in the .ak files.
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

train_set.lazyPrint(2)

# Tokenize the review column in place, using the pattern "\\W+".
tokenize_stage = (
    RegexTokenizer()
    .setPattern("\\W+")
    .setSelectedCol(TXT_COL_NAME)
)

# Turn the tokenized text into a WORD_COUNT vector and print one transformed row.
count_vectorize_stage = (
    DocCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol(VECTOR_COL_NAME)
    .enableLazyPrintTransformData(1)
)

# Logistic regression limited to 30 iterations.
lr_stage = (
    LogisticRegression()
    .setMaxIter(30)
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)

count_vectorizer_predictions = (
    Pipeline()
    .add(tokenize_stage)
    .add(count_vectorize_stage)
    .add(lr_stage)
    .fit(train_set)
    .transform(test_set)
)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
count_vectorizer_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("DocCountVectorizer")
)
count_vectorizer_predictions.link(count_vectorizer_metrics)

BatchOperator.execute()

# Same flow as the DocCountVectorizer pipeline, but with DocHashCountVectorizer as the vectorizer.
hash_tokenize_stage = (
    RegexTokenizer()
    .setPattern("\\W+")
    .setSelectedCol(TXT_COL_NAME)
)

# Turn the tokenized text into a WORD_COUNT vector and print one transformed row.
hash_vectorize_stage = (
    DocHashCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol(VECTOR_COL_NAME)
    .enableLazyPrintTransformData(1)
)

# Logistic regression limited to 30 iterations.
hash_lr_stage = (
    LogisticRegression()
    .setMaxIter(30)
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)

hash_vectorizer_predictions = (
    Pipeline()
    .add(hash_tokenize_stage)
    .add(hash_vectorize_stage)
    .add(hash_lr_stage)
    .fit(train_set)
    .transform(test_set)
)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
hash_vectorizer_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("DocHashCountVectorizer")
)
hash_vectorizer_predictions.link(hash_vectorizer_metrics)

BatchOperator.execute()
#c_3

# Section 23.3: build additional n-gram features.
# NOTE(review): the argument is presumably the parallelism of the local environment — confirm.
useLocalEnv(4)

train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

# Tokenize the review column in place.
ng2_tokenize_stage = (
    RegexTokenizer()
    .setPattern("\\W+")
    .setSelectedCol(TXT_COL_NAME)
)

# WORD_COUNT vector of the tokenized text, written to VECTOR_COL_NAME.
ng2_word_vector_stage = (
    DocCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol(VECTOR_COL_NAME)
)

# 2-grams of the tokenized text, written to "v_2"; one transformed row is printed.
ng2_bigram_stage = (
    NGram()
    .setN(2)
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol("v_2")
    .enableLazyPrintTransformData(1, "2-gram")
)

# Replace the 2-gram text in "v_2" with its WORD_COUNT vector.
ng2_bigram_vector_stage = (
    DocCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setSelectedCol("v_2")
    .setOutputCol("v_2")
)

# Combine the word vector and the 2-gram vector into VECTOR_COL_NAME.
ng2_assemble_stage = (
    VectorAssembler()
    .setSelectedCols([VECTOR_COL_NAME, "v_2"])
    .setOutputCol(VECTOR_COL_NAME)
)

# Logistic regression limited to 30 iterations.
ng2_lr_stage = (
    LogisticRegression()
    .setMaxIter(30)
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)

ng2_predictions = (
    Pipeline()
    .add(ng2_tokenize_stage)
    .add(ng2_word_vector_stage)
    .add(ng2_bigram_stage)
    .add(ng2_bigram_vector_stage)
    .add(ng2_assemble_stage)
    .add(ng2_lr_stage)
    .fit(train_set)
    .transform(test_set)
)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
ng2_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("NGram 2")
)
ng2_predictions.link(ng2_metrics)

BatchOperator.execute()

# Word counts plus 2-gram and 3-gram counts, assembled into one vector for logistic regression.
ng23_tokenize_stage = (
    RegexTokenizer()
    .setPattern("\\W+")
    .setSelectedCol(TXT_COL_NAME)
)

# WORD_COUNT vector of the tokenized text, written to VECTOR_COL_NAME.
ng23_word_vector_stage = (
    DocCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol(VECTOR_COL_NAME)
)

# 2-grams written to "v_2", then replaced by their WORD_COUNT vector.
ng23_bigram_stage = (
    NGram()
    .setN(2)
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol("v_2")
)
ng23_bigram_vector_stage = (
    DocCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setSelectedCol("v_2")
    .setOutputCol("v_2")
)

# 3-grams written to "v_3", then replaced by their WORD_COUNT vector (vocabulary size set to 10000).
ng23_trigram_stage = (
    NGram()
    .setN(3)
    .setSelectedCol(TXT_COL_NAME)
    .setOutputCol("v_3")
)
ng23_trigram_vector_stage = (
    DocCountVectorizer()
    .setFeatureType("WORD_COUNT")
    .setVocabSize(10000)
    .setSelectedCol("v_3")
    .setOutputCol("v_3")
)

# Combine the three vectors into VECTOR_COL_NAME.
ng23_assemble_stage = (
    VectorAssembler()
    .setSelectedCols([VECTOR_COL_NAME, "v_2", "v_3"])
    .setOutputCol(VECTOR_COL_NAME)
)

# Logistic regression limited to 30 iterations.
ng23_lr_stage = (
    LogisticRegression()
    .setMaxIter(30)
    .setVectorCol(VECTOR_COL_NAME)
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionCol(PREDICTION_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
)

ng23_predictions = (
    Pipeline()
    .add(ng23_tokenize_stage)
    .add(ng23_word_vector_stage)
    .add(ng23_bigram_stage)
    .add(ng23_bigram_vector_stage)
    .add(ng23_trigram_stage)
    .add(ng23_trigram_vector_stage)
    .add(ng23_assemble_stage)
    .add(ng23_lr_stage)
    .fit(train_set)
    .transform(test_set)
)

# Binary-classification metrics on the test split, with 'pos' as the positive class.
ng23_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("NGram 2 and 3")
)
ng23_predictions.link(ng23_metrics)

BatchOperator.execute()
#c_4

# Section 23.4: train the word + 2-gram + 3-gram pipeline once and save the fitted model.
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE)

pipeline_model_path = DATA_DIR + PIPELINE_MODEL

# Training is skipped when a saved model file is already present.
if not os.path.exists(pipeline_model_path):
    # Tokenize the review column in place.
    save_tokenize_stage = (
        RegexTokenizer()
        .setPattern("\\W+")
        .setSelectedCol(TXT_COL_NAME)
    )

    # WORD_COUNT vector of the tokenized text, written to VECTOR_COL_NAME.
    save_word_vector_stage = (
        DocCountVectorizer()
        .setFeatureType("WORD_COUNT")
        .setSelectedCol(TXT_COL_NAME)
        .setOutputCol(VECTOR_COL_NAME)
    )

    # 2-grams written to "v_2", then replaced by their WORD_COUNT vector (vocabulary size set to 50000).
    save_bigram_stage = (
        NGram()
        .setN(2)
        .setSelectedCol(TXT_COL_NAME)
        .setOutputCol("v_2")
    )
    save_bigram_vector_stage = (
        DocCountVectorizer()
        .setFeatureType("WORD_COUNT")
        .setVocabSize(50000)
        .setSelectedCol("v_2")
        .setOutputCol("v_2")
    )

    # 3-grams written to "v_3", then replaced by their WORD_COUNT vector (vocabulary size set to 10000).
    save_trigram_stage = (
        NGram()
        .setN(3)
        .setSelectedCol(TXT_COL_NAME)
        .setOutputCol("v_3")
    )
    save_trigram_vector_stage = (
        DocCountVectorizer()
        .setFeatureType("WORD_COUNT")
        .setVocabSize(10000)
        .setSelectedCol("v_3")
        .setOutputCol("v_3")
    )

    # Combine the three vectors into VECTOR_COL_NAME.
    save_assemble_stage = (
        VectorAssembler()
        .setSelectedCols([VECTOR_COL_NAME, "v_2", "v_3"])
        .setOutputCol(VECTOR_COL_NAME)
    )

    # Logistic regression limited to 30 iterations.
    save_lr_stage = (
        LogisticRegression()
        .setMaxIter(30)
        .setVectorCol(VECTOR_COL_NAME)
        .setLabelCol(LABEL_COL_NAME)
        .setPredictionCol(PREDICTION_COL_NAME)
        .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )

    fitted_pipeline = (
        Pipeline()
        .add(save_tokenize_stage)
        .add(save_word_vector_stage)
        .add(save_bigram_stage)
        .add(save_bigram_vector_stage)
        .add(save_trigram_stage)
        .add(save_trigram_vector_stage)
        .add(save_assemble_stage)
        .add(save_lr_stage)
        .fit(train_set)
    )
    fitted_pipeline.save(pipeline_model_path)

    # Run the batch job after registering the save.
    BatchOperator.execute()


# Section 23.4.1: reuse the saved pipeline model for batch and stream prediction.
pipeline_model = PipelineModel.load(DATA_DIR + PIPELINE_MODEL)

test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE)

# Batch job: score the test split and print binary-classification metrics.
loaded_model_predictions = pipeline_model.transform(test_set)
loaded_model_metrics = (
    EvalBinaryClassBatchOp()
    .setPositiveLabelValueString("pos")
    .setLabelCol(LABEL_COL_NAME)
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    .lazyPrintMetrics("NGram 2 and 3")
)
loaded_model_predictions.link(loaded_model_metrics)
BatchOperator.execute()

# Stream job: score the same file as a stream, sample with ratio 0.001, and print
# the prediction, label and review text columns.
test_stream = AkSourceStreamOp().setFilePath(DATA_DIR + TEST_FILE)
stream_output_cols = ", ".join([PREDICTION_COL_NAME, LABEL_COL_NAME, TXT_COL_NAME])
stream_predictions = pipeline_model.transform(test_stream)
stream_predictions.sample(0.001).select(stream_output_cols).print()
StreamOperator.execute()

# Section 23.4.2: embedded prediction of a single review with a LocalPredictor.
# Adjacent string literals are joined inside the parentheses, so no backslash continuations are needed.
review_str = (
    "Oh dear. good cast, but to write and direct is an art and to write wit and direct wit is a bit of a "
    "task. Even doing good comedy you have to get the timing and moment right. Im not putting it all down "
    "there were parts where i laughed loud but that was at very few times. The main focus to me was on the "
    "fast free flowing dialogue, that made some people in the film annoying. It may sound great while "
    "reading the script in your head but getting that out and to the camera is a different task. And the "
    "hand held camera work does give energy to few parts of the film. Overall direction was good but the "
    "script was not all that to me, but I'm sure you was reading the script in your head it would sound good"
    ". Sorry."
)

# Option 1: derive the local predictor from the pipeline model that is already loaded.
# "review string" is the schema of the single input column.
local_predictor = pipeline_model.collectLocalPredictor("review string")

print(local_predictor.getOutputColNames())

pred_row = local_predictor.map([review_str])

# NOTE(review): index 4 is presumably the PREDICTION_COL_NAME column — check against the
# output column names printed above.
print(pred_row[4])

# Option 2: build the local predictor directly from the saved model file.
local_predictor_2 = LocalPredictor(DATA_DIR + PIPELINE_MODEL, "review string")

print(local_predictor_2.getOutputColNames())

pred_row = local_predictor_2.map([review_str])

print(pred_row[4])