This chapter covers the following sections:
23.1 Using the Provided Features
23.1.1 Using the Naive Bayes Method
23.1.2 Using the Logistic Regression Algorithm
23.2 How to Extract Features
23.3 Constructing More Features
23.4 Model Saving and Prediction
23.4.1 Batch/Stream Prediction Tasks
23.4.2 Embedded Prediction
For the full discussion, please read the print book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》 (Alink: The Definitive Guide — An Introduction to Machine Learning Examples Based on Flink, Python edition); this page provides the sample code for the chapter.
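First comes the common setup shared by all the examples: the imports, a local execution environment, the data paths under the sentiment_imdb working directory, and the column-name constants used throughout the chapter.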
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.html.use_mathjax', False)

DATA_DIR = ROOT_DIR + "sentiment_imdb" + os.sep
ORIGIN_DATA_DIR = DATA_DIR + "aclImdb" + os.sep

TRAIN_FILE = "train.ak"
TEST_FILE = "test.ak"
PIPELINE_MODEL = "pipeline_model.ak"

TXT_COL_NAME = "review"
LABEL_COL_NAME = "label"
VECTOR_COL_NAME = "vec"
PREDICTION_COL_NAME = "pred"
PRED_DETAIL_COL_NAME = "predinfo"
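Section 23.1 uses the features that already ship with the dataset: the labeledBow.feat files store each review as a bag-of-words vector in LibSVM format, labeled with the review's rating. The #c_1 code below loads them with LibSvmSourceBatchOp, inspects the label distributions, maps ratings above 5 to 'pos' and the rest to 'neg', and then compares three classifiers: a multinomial NaiveBayesTextClassifier, a Bernoulli NaiveBayesTextClassifier on binarized vectors, and LogisticRegression, whose MAX_ITER parameter is finally tuned with a 6-fold GridSearchCV maximizing AUC.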
#c_1
train_set = LibSvmSourceBatchOp()\
    .setFilePath(ORIGIN_DATA_DIR + "train" + os.sep + "labeledBow.feat")\
    .setStartIndex(0);

train_set.lazyPrint(1, "train_set");

train_set\
    .groupBy("label", "label, COUNT(label) AS cnt")\
    .orderBy("label", 100)\
    .lazyPrint(-1, "labels of train_set");

test_set = LibSvmSourceBatchOp()\
    .setFilePath(ORIGIN_DATA_DIR + "test" + os.sep + "labeledBow.feat")\
    .setStartIndex(0);

test_set\
    .groupBy("label", "label, COUNT(label) AS cnt")\
    .orderBy("label", 100)\
    .lazyPrint(-1, "labels of test_set");

train_set = train_set.select("CASE WHEN label>5 THEN 'pos' ELSE 'neg' END AS label, "
                             + "features AS " + VECTOR_COL_NAME);
test_set = test_set.select("CASE WHEN label>5 THEN 'pos' ELSE 'neg' END AS label, "
                           + "features AS " + VECTOR_COL_NAME);

train_set.lazyPrint(1, "train_set");

NaiveBayesTextClassifier()\
    .setModelType("Multinomial")\
    .setVectorCol(VECTOR_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .enableLazyPrintModelInfo()\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("NaiveBayesTextClassifier + Multinomial")
    );
BatchOperator.execute();

Pipeline()\
    .add(
        Binarizer()\
            .setSelectedCol(VECTOR_COL_NAME)\
            .enableLazyPrintTransformData(1, "After Binarizer")
    )\
    .add(
        NaiveBayesTextClassifier()\
            .setModelType("Bernoulli")\
            .setVectorCol(VECTOR_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .enableLazyPrintModelInfo()
    )\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("Binarizer + NaiveBayesTextClassifier + Bernoulli")
    );
BatchOperator.execute();

LogisticRegression()\
    .setVectorCol(VECTOR_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
    .enableLazyPrintTrainInfo("< LR train info >")\
    .enableLazyPrintModelInfo("< LR model info >")\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("LogisticRegression")
    );
BatchOperator.execute();

lr = LogisticRegression()\
    .setVectorCol(VECTOR_COL_NAME)\
    .setLabelCol(LABEL_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol(PRED_DETAIL_COL_NAME);

gridSearch = GridSearchCV()\
    .setEstimator(
        Pipeline().add(lr)
    )\
    .setParamGrid(
        ParamGrid()\
            .addGrid(lr, 'MAX_ITER', [10, 20, 30, 40, 50, 60, 80, 100])
    )\
    .setTuningEvaluator(
        BinaryClassificationTuningEvaluator()\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .setTuningBinaryClassMetric('AUC')
    )\
    .setNumFolds(6)\
    .enableLazyPrintTrainInfo();

bestModel = gridSearch.fit(train_set);

bestModel\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("GridSearchCV + LogisticRegression")
    );
BatchOperator.execute();
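Section 23.2 extracts features from the raw review texts instead. The #c_2 code below walks the pos and neg subdirectories of the train and test sets, collects each file into a (label, review) table, and saves the tables in Alink's .ak format so later examples can skip this step. Feature extraction is then a two-stage pipeline: RegexTokenizer splits each review on non-word characters (the regex \W+), and either DocCountVectorizer (an explicit vocabulary) or DocHashCountVectorizer (feature hashing) turns the token sequence into a word-count vector for LogisticRegression.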
#c_2
if not(os.path.exists(DATA_DIR + TRAIN_FILE)) :
    data_arr = []
    for label in ["pos", "neg"] :
        file_names = os.listdir(ORIGIN_DATA_DIR + "train" + os.sep + label)
        for file_name in file_names :
            f = open(ORIGIN_DATA_DIR + "train" + os.sep + label + os.sep + file_name)
            data_arr.append([label, f.read()])
            f.close()
    BatchOperator\
        .fromDataframe(
            pd.DataFrame(data_arr),
            schemaStr=LABEL_COL_NAME + ' string, ' + TXT_COL_NAME + ' string'
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(DATA_DIR + TRAIN_FILE)
        );
    BatchOperator.execute();

if not(os.path.exists(DATA_DIR + TEST_FILE)) :
    data_arr = []
    for label in ["pos", "neg"] :
        file_names = os.listdir(ORIGIN_DATA_DIR + "test" + os.sep + label)
        for file_name in file_names :
            f = open(ORIGIN_DATA_DIR + "test" + os.sep + label + os.sep + file_name)
            data_arr.append([label, f.read()])
            f.close()
    BatchOperator\
        .fromDataframe(
            pd.DataFrame(data_arr),
            schemaStr=LABEL_COL_NAME + ' string, ' + TXT_COL_NAME + ' string'
        )\
        .link(
            AkSinkBatchOp()\
                .setFilePath(DATA_DIR + TEST_FILE)
        );
    BatchOperator.execute();

train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);

train_set.lazyPrint(2);

Pipeline()\
    .add(
        RegexTokenizer()\
            .setPattern("\\W+")\
            .setSelectedCol(TXT_COL_NAME)
    )\
    .add(
        DocCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol(VECTOR_COL_NAME)\
            .enableLazyPrintTransformData(1)
    )\
    .add(
        LogisticRegression()\
            .setMaxIter(30)\
            .setVectorCol(VECTOR_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("DocCountVectorizer")
    );
BatchOperator.execute();

Pipeline()\
    .add(
        RegexTokenizer()\
            .setPattern("\\W+")\
            .setSelectedCol(TXT_COL_NAME)
    )\
    .add(
        DocHashCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol(VECTOR_COL_NAME)\
            .enableLazyPrintTransformData(1)
    )\
    .add(
        LogisticRegression()\
            .setMaxIter(30)\
            .setVectorCol(VECTOR_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("DocHashCountVectorizer")
    );
BatchOperator.execute();
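Section 23.3 constructs more features from the same texts. In the #c_3 code below, NGram concatenates adjacent words into 2-grams (and, in the second pipeline, 3-grams as well), each n-gram column gets its own DocCountVectorizer (the 3-gram vocabulary is capped at 10,000 terms), and VectorAssembler splices the word, 2-gram, and 3-gram count vectors into a single feature vector. Because the assembled vectors are much wider, the examples switch to useLocalEnv(4) for more parallelism.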
#c_3
useLocalEnv(4)

train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);
test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);

Pipeline()\
    .add(
        RegexTokenizer()\
            .setPattern("\\W+")\
            .setSelectedCol(TXT_COL_NAME)
    )\
    .add(
        DocCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol(VECTOR_COL_NAME)
    )\
    .add(
        NGram()\
            .setN(2)\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol("v_2")\
            .enableLazyPrintTransformData(1, "2-gram")
    )\
    .add(
        DocCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setSelectedCol("v_2")\
            .setOutputCol("v_2")
    )\
    .add(
        VectorAssembler()\
            .setSelectedCols([VECTOR_COL_NAME, "v_2"])\
            .setOutputCol(VECTOR_COL_NAME)
    )\
    .add(
        LogisticRegression()\
            .setMaxIter(30)\
            .setVectorCol(VECTOR_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("NGram 2")
    );
BatchOperator.execute();

Pipeline()\
    .add(
        RegexTokenizer()\
            .setPattern("\\W+")\
            .setSelectedCol(TXT_COL_NAME)
    )\
    .add(
        DocCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol(VECTOR_COL_NAME)
    )\
    .add(
        NGram()\
            .setN(2)\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol("v_2")
    )\
    .add(
        DocCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setSelectedCol("v_2")\
            .setOutputCol("v_2")
    )\
    .add(
        NGram()\
            .setN(3)\
            .setSelectedCol(TXT_COL_NAME)\
            .setOutputCol("v_3")
    )\
    .add(
        DocCountVectorizer()\
            .setFeatureType("WORD_COUNT")\
            .setVocabSize(10000)\
            .setSelectedCol("v_3")\
            .setOutputCol("v_3")
    )\
    .add(
        VectorAssembler()\
            .setSelectedCols([VECTOR_COL_NAME, "v_2", "v_3"])\
            .setOutputCol(VECTOR_COL_NAME)
    )\
    .add(
        LogisticRegression()\
            .setMaxIter(30)\
            .setVectorCol(VECTOR_COL_NAME)\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
    )\
    .fit(train_set)\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("NGram 2 and 3")
    );
BatchOperator.execute();
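Section 23.4 turns the best pipeline into a reusable model. The #c_4 code below fits the word + 2-gram + 3-gram pipeline once (capping the 2-gram vocabulary at 50,000 terms), saves the fitted PipelineModel to pipeline_model.ak, and reloads it with PipelineModel.load(). The same loaded model then serves a batch prediction task, evaluated with EvalBinaryClassBatchOp, and a stream prediction task over AkSourceStreamOp, where a 0.001 sample of the predictions is printed as the stream flows by.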
#c_4
train_set = AkSourceBatchOp().setFilePath(DATA_DIR + TRAIN_FILE);

if not(os.path.exists(DATA_DIR + PIPELINE_MODEL)) :
    Pipeline()\
        .add(
            RegexTokenizer()\
                .setPattern("\\W+")\
                .setSelectedCol(TXT_COL_NAME)
        )\
        .add(
            DocCountVectorizer()\
                .setFeatureType("WORD_COUNT")\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol(VECTOR_COL_NAME)
        )\
        .add(
            NGram()\
                .setN(2)\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol("v_2")
        )\
        .add(
            DocCountVectorizer()\
                .setFeatureType("WORD_COUNT")\
                .setVocabSize(50000)\
                .setSelectedCol("v_2")\
                .setOutputCol("v_2")
        )\
        .add(
            NGram()\
                .setN(3)\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol("v_3")
        )\
        .add(
            DocCountVectorizer()\
                .setFeatureType("WORD_COUNT")\
                .setVocabSize(10000)\
                .setSelectedCol("v_3")\
                .setOutputCol("v_3")
        )\
        .add(
            VectorAssembler()\
                .setSelectedCols([VECTOR_COL_NAME, "v_2", "v_3"])\
                .setOutputCol(VECTOR_COL_NAME)
        )\
        .add(
            LogisticRegression()\
                .setMaxIter(30)\
                .setVectorCol(VECTOR_COL_NAME)\
                .setLabelCol(LABEL_COL_NAME)\
                .setPredictionCol(PREDICTION_COL_NAME)\
                .setPredictionDetailCol(PRED_DETAIL_COL_NAME)
        )\
        .fit(train_set)\
        .save(DATA_DIR + PIPELINE_MODEL);
    BatchOperator.execute();

pipeline_model = PipelineModel.load(DATA_DIR + PIPELINE_MODEL);

test_set = AkSourceBatchOp().setFilePath(DATA_DIR + TEST_FILE);

pipeline_model\
    .transform(test_set)\
    .link(
        EvalBinaryClassBatchOp()\
            .setPositiveLabelValueString("pos")\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionDetailCol(PRED_DETAIL_COL_NAME)\
            .lazyPrintMetrics("NGram 2 and 3")
    );
BatchOperator.execute();

test_stream = AkSourceStreamOp().setFilePath(DATA_DIR + TEST_FILE);

pipeline_model\
    .transform(test_stream)\
    .sample(0.001)\
    .select(PREDICTION_COL_NAME + ", " + LABEL_COL_NAME + ", " + TXT_COL_NAME)\
    .print();
StreamOperator.execute();
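Embedded prediction (section 23.4.2) scores single records in-process, without submitting a Flink job. Below, a LocalPredictor is obtained in two equivalent ways: from the in-memory pipeline_model via collectLocalPredictor(), and directly from the saved model file via the LocalPredictor constructor; both take the input schema string "review string". Its map() method maps one input row to one output row; printing getOutputColNames() first shows which index holds which result column.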
review_str \
    = "Oh dear. good cast, but to write and direct is an art and to write wit and direct wit is a bit of a "\
    + "task. Even doing good comedy you have to get the timing and moment right. Im not putting it all down "\
    + "there were parts where i laughed loud but that was at very few times. The main focus to me was on the "\
    + "fast free flowing dialogue, that made some people in the film annoying. It may sound great while "\
    + "reading the script in your head but getting that out and to the camera is a different task. And the "\
    + "hand held camera work does give energy to few parts of the film. Overall direction was good but the "\
    + "script was not all that to me, but I'm sure you was reading the script in your head it would sound good"\
    + ". Sorry.";

local_predictor = pipeline_model.collectLocalPredictor("review string");
print(local_predictor.getOutputColNames());

pred_row = local_predictor.map([review_str]);
print(pred_row[4]);

local_predictor_2 = LocalPredictor(DATA_DIR + PIPELINE_MODEL, "review string");
print(local_predictor_2.getOutputColNames());

pred_row = local_predictor_2.map([review_str]);
print(pred_row[4]);
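To embed the model in an application, the per-request logic can be wrapped in a small helper. The sketch below is a minimal example of that pattern, assuming pipeline_model.ak was saved by #c_4 above; predict_sentiment is a hypothetical helper name introduced here for illustration, not part of the book's code.

# Minimal embedding sketch (assumes DATA_DIR, PIPELINE_MODEL, and
# PREDICTION_COL_NAME are defined as above and pipeline_model.ak exists).
local_predictor = LocalPredictor(DATA_DIR + PIPELINE_MODEL, "review string");

# Look up the position of the prediction column once, instead of
# hard-coding an index such as pred_row[4].
pred_index = list(local_predictor.getOutputColNames()).index(PREDICTION_COL_NAME);

def predict_sentiment(text):
    # map() scores a single input row and returns a single output row.
    return local_predictor.map([text])[pred_index];

print(predict_sentiment(review_str));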