This chapter contains the following sections:
21.1 Data Exploration
21.2 Word Segmentation
21.2.1 Chinese Word Segmentation
21.2.2 Tokenizer and RegexTokenizer
21.3 Word Frequency Statistics
21.4 Word Discriminability
21.5 Keyword Extraction
21.5.1 Overview of the Principles
21.5.2 Examples
21.6 Text Similarity
21.6.1 Pairwise Text Comparison
21.6.2 Top-N Most Similar
21.7 Topic Models
21.7.1 The LDA Model
21.7.2 A Topic Model for the News Data
21.7.3 Comparing Topics with the Original Categories
21.8 Summary of Component Usage
For the full discussion, please read the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》 (The Definitive Guide to Alink: An Introduction to Machine Learning Examples Based on Flink, Python edition); what follows is the sample code for this chapter.
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd
pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.html.use_mathjax', False)

# Environment setup, shared constants, and data-source helpers for this chapter.
DATA_DIR = ROOT_DIR + "news_toutiao" + os.sep

ORIGIN_TRAIN_FILE = "toutiao_cat_data.txt";
FIELD_DELIMITER = "_!_";

SNN_MODEL_FILE = "snn_model.ak";
APPROX_SNN_MODEL_FILE = "approx_snn_model.ak";
LDA_MODEL_FILE = "lda_model.ak";
LDA_PWZ_FILE = "lda_pwz.ak";

SCHEMA_STRING = "id string, category_code int, category_name string,"\
    + " news_title string, keywords string";

TXT_COL_NAME = "news_title";
LABEL_COL_NAME = "category_name";
PREDICTION_COL_NAME = "pred";

# Batch source of the original news data file.
def getSource() :
    return CsvSourceBatchOp()\
        .setFilePath(DATA_DIR + ORIGIN_TRAIN_FILE)\
        .setSchemaStr(SCHEMA_STRING)\
        .setFieldDelimiter(FIELD_DELIMITER);

# Stream source of the same file.
def getStreamSource() :
    return CsvSourceStreamOp()\
        .setFilePath(DATA_DIR + ORIGIN_TRAIN_FILE)\
        .setSchemaStr(SCHEMA_STRING)\
        .setFieldDelimiter(FIELD_DELIMITER);
#c_1

# Section 21.1: take a first look at the news data, then count titles per category.
getSource()\
    .lazyPrint(10)\
    .lazyPrintStatistics();

getSource()\
    .groupBy("category_code, category_name",
             "category_code, category_name, COUNT(category_name) AS cnt")\
    .orderBy("category_code", 100)\
    .lazyPrint(-1);

BatchOperator.execute();
#c_2_1

# Section 21.2.1: Chinese word segmentation on a few sample sentences.
df = pd.DataFrame(
    [
        "大家好!我在学习、使用Alink。",
        "【流式计算和批式计算】、(Alink)",
        "《人工智能》,“机器学习”?2020"
    ]
)
source = BatchOperator.fromDataframe(df, schemaStr='sentence string')

# Default segmentation.
source\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("words")
    )\
    .print();

# Segmentation with a user-defined dictionary, so that "流式计算" and "机器学习"
# are kept as single tokens.
source\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("words")\
            .setUserDefinedDict(["流式计算", "机器学习"])
    )\
    .print();

# Remove stop words after segmentation.
source\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("words")\
            .setUserDefinedDict(["流式计算", "机器学习"])
    )\
    .link(
        StopWordsRemoverBatchOp()\
            .setSelectedCol("words")\
            .setOutputCol("left_words")
    )\
    .print();

# Remove the built-in stop words plus extra user-specified stop words.
source\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("words")\
            .setUserDefinedDict(["流式计算", "机器学习"])
    )\
    .link(
        StopWordsRemoverBatchOp()\
            .setSelectedCol("words")\
            .setOutputCol("left_words")\
            .setStopWords(["计算", "2020"])
    )\
    .print();

# Segment the first 10 news titles.
getSource()\
    .select("news_title")\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("news_title")\
            .setOutputCol("segmented_title")
    )\
    .firstN(10)\
    .print();
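Why does adding "流式计算" to the user dictionary keep it as one token? Dictionary-driven segmenters prefer the longest dictionary entry starting at the current position. The forward-maximum-matching sketch below is purely illustrative (Alink's SegmentBatchOp uses its own segmentation algorithm, not this one), but it shows the effect of the extra dictionary entry:

def forward_max_match(text, dictionary, max_len=4):
    # Greedily take the longest dictionary word at each position; fall back to a single character.
    tokens, i = [], 0
    while i < len(text):
        for size in range(min(max_len, len(text) - i), 0, -1):
            piece = text[i:i + size]
            if size == 1 or piece in dictionary:
                tokens.append(piece)
                i += size
                break
    return tokens

vocab = {"流式", "批式", "计算"}
print(forward_max_match("流式计算和批式计算", vocab))                # ['流式', '计算', '和', '批式', '计算']
print(forward_max_match("流式计算和批式计算", vocab | {"流式计算"}))  # ['流式计算', '和', '批式', '计算']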
#c_2_2

# Section 21.2.2: Tokenizer and RegexTokenizer on English sentences.
df = pd.DataFrame(
    [
        "Hello! This is Alink!",
        "Flink,Alink..AI#ML@2020"
    ]
)
source = BatchOperator.fromDataframe(df, schemaStr='sentence string')

# Default Tokenizer (splits on whitespace) vs. default RegexTokenizer.
source\
    .link(
        TokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("tokens")
    )\
    .link(
        RegexTokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("regex_tokens")
    )\
    .lazyPrint(-1);

# The pattern can describe the gaps between tokens (default),
# or, with setGaps(False), the tokens themselves.
source\
    .link(
        RegexTokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("tokens_1")\
            .setPattern("\\W+")
    )\
    .link(
        RegexTokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("tokens_2")\
            .setGaps(False)\
            .setPattern("\\w+")
    )\
    .lazyPrint(-1);

# Tokens are lower-cased by default; setToLowerCase(False) keeps the original case.
source\
    .link(
        RegexTokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("tokens_1")\
            .setPattern("\\W+")
    )\
    .link(
        RegexTokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("tokens_2")\
            .setPattern("\\W+")\
            .setToLowerCase(False)
    )\
    .lazyPrint(-1);

# Remove English stop words after tokenization.
source\
    .link(
        RegexTokenizerBatchOp()\
            .setSelectedCol("sentence")\
            .setOutputCol("tokens")\
            .setPattern("\\W+")
    )\
    .link(
        StopWordsRemoverBatchOp()\
            .setSelectedCol("tokens")\
            .setOutputCol("left_tokens")
    )\
    .lazyPrint(-1);

BatchOperator.execute();
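The two RegexTokenizer settings above are easiest to understand by analogy with Python's re module (an analogy only, not Alink internals): in the default gaps mode the pattern describes the separators between tokens, while setGaps(False) makes the pattern describe the tokens themselves. For this sample sentence both forms yield the same token list:

import re

sentence = "Flink,Alink..AI#ML@2020"
print(re.split(r"\W+", sentence))    # pattern matches the gaps   -> ['Flink', 'Alink', 'AI', 'ML', '2020']
print(re.findall(r"\w+", sentence))  # pattern matches the tokens -> ['Flink', 'Alink', 'AI', 'ML', '2020']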
#c_3

# Section 21.3: word frequency statistics over the segmented titles of the first 10 news items.
titles = getSource()\
    .firstN(10)\
    .select("news_title")\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("news_title")\
            .setOutputCol("segmented_title")\
            .setReservedCols(None)
    );

# Word counts over the whole corpus.
titles\
    .link(
        WordCountBatchOp()\
            .setSelectedCol("segmented_title")
    )\
    .orderBy("cnt", 100, order = 'desc')\
    .lazyPrint(-1, "WordCount");

# Word counts within each document.
titles\
    .link(
        DocWordCountBatchOp()\
            .setDocIdCol("segmented_title")\
            .setContentCol("segmented_title")
    )\
    .lazyPrint(-1, "DocWordCount");

BatchOperator.execute()
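A pure-Python analogy (with made-up data, not Alink code) of the difference between the two operators: WordCountBatchOp counts words over the whole corpus, while DocWordCountBatchOp counts words within each document separately.

from collections import Counter

docs = ["alink flink alink", "flink stream flink"]

corpus_count = Counter(w for d in docs for w in d.split())   # like WordCountBatchOp: one count per word overall
per_doc_count = [Counter(d.split()) for d in docs]           # like DocWordCountBatchOp: one count per word per document

print(corpus_count)    # "flink" occurs 3 times in the corpus ...
print(per_doc_count)   # ... but at most twice within any single document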
#c_4

# Section 21.4: vectorize the segmented titles with different feature types;
# IDF and TF_IDF reflect how well a word discriminates between documents.
titles = getSource()\
    .firstN(10)\
    .select("news_title")\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("news_title")\
            .setOutputCol("segmented_title")\
            .setReservedCols(None)
    );

for featureType in ["WORD_COUNT", "BINARY", "TF", "IDF", "TF_IDF"] :
    DocCountVectorizer()\
        .setFeatureType(featureType)\
        .setSelectedCol("segmented_title")\
        .setOutputCol("vec")\
        .fit(titles)\
        .transform(titles)\
        .lazyPrint(-1, "DocCountVectorizer + " + featureType);

for featureType in ["WORD_COUNT", "BINARY", "TF", "IDF", "TF_IDF"] :
    DocHashCountVectorizer()\
        .setFeatureType(featureType)\
        .setSelectedCol("segmented_title")\
        .setOutputCol("vec")\
        .setNumFeatures(100)\
        .fit(titles)\
        .transform(titles)\
        .lazyPrint(-1, "DocHashCountVectorizer + " + featureType);

BatchOperator.execute();
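To make the IDF and TF_IDF feature types less abstract, here is a minimal pure-Python sketch of the underlying idea (illustrative only; DocCountVectorizer may apply a different normalization or smoothing): a word that occurs in few documents gets a high IDF and therefore discriminates documents well, while a word occurring in every document gets an IDF of zero.

import math

docs = [
    ["machine", "learning", "with", "alink"],
    ["stream", "computing", "with", "flink"],
    ["machine", "learning", "and", "stream", "computing"],
]
n_docs = len(docs)
vocab = sorted({w for doc in docs for w in doc})

# Document frequency: in how many documents does the word occur?
doc_freq = {w: sum(1 for doc in docs if w in doc) for w in vocab}
# IDF: a word in every document gets log(1) = 0, rarer words get larger values.
idf = {w: math.log(n_docs / doc_freq[w]) for w in vocab}

for doc in docs:
    tf = {w: doc.count(w) / len(doc) for w in set(doc)}  # term frequency within the document
    tfidf = {w: tf[w] * idf[w] for w in tf}              # TF-IDF weight
    print(tfidf)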
#c_5_2

# Section 21.5.2: keyword extraction, first from a long text (the classic
# "报菜名" menu recitation), then from the news titles.
df = pd.DataFrame([
    "蒸羊羔、蒸熊掌、蒸鹿尾儿、烧花鸭、烧雏鸡、烧子鹅、卤猪、卤鸭、酱鸡、腊肉、松花小肚儿、晾肉、香肠儿、什锦苏盘、熏鸡白肚儿、清蒸八宝猪、江米酿鸭子、罐儿野鸡、罐儿鹌鹑。"\
    + "卤什件儿、卤子鹅、山鸡、兔脯、菜蟒、银鱼、清蒸哈什蚂、烩鸭丝、烩鸭腰、烩鸭条、清拌鸭丝、黄心管儿、焖白鳝、焖黄鳝、豆豉鲇鱼、锅烧鲤鱼、烀烂甲鱼、抓炒鲤鱼、抓炒对儿虾。"\
    + "软炸里脊、软炸鸡、什锦套肠儿、卤煮寒鸦儿、麻酥油卷儿、熘鲜蘑、熘鱼脯、熘鱼肚、熘鱼片儿、醋熘肉片儿、烩三鲜、烩白蘑、烩鸽子蛋、炒银丝、烩鳗鱼、炒白虾、炝青蛤、炒面鱼。"\
    + "炒竹笋、芙蓉燕菜、炒虾仁儿、烩虾仁儿、烩腰花儿、烩海参、炒蹄筋儿、锅烧海参、锅烧白菜、炸木耳、炒肝尖儿、桂花翅子、清蒸翅子、炸飞禽、炸汁儿、炸排骨、清蒸江瑶柱。"\
    + "糖熘芡仁米、拌鸡丝、拌肚丝、什锦豆腐、什锦丁儿、糟鸭、糟熘鱼片儿、熘蟹肉、炒蟹肉、烩蟹肉、清拌蟹肉、蒸南瓜、酿倭瓜、炒丝瓜、酿冬瓜、烟鸭掌儿、焖鸭掌儿、焖笋、炝茭白。"\
    + "茄子晒炉肉、鸭羹、蟹肉羹、鸡血汤、三鲜木樨汤、红丸子、白丸子、南煎丸子、四喜丸子、三鲜丸子、氽丸子、鲜虾丸子、鱼脯丸子、饹炸丸子、豆腐丸子、樱桃肉、马牙肉、米粉肉。"\
    + "一品肉、栗子肉、坛子肉、红焖肉、黄焖肉、酱豆腐肉、晒炉肉、炖肉、黏糊肉、烀肉、扣肉、松肉、罐儿肉、烧肉、大肉、烤肉、白肉、红肘子、白肘子、熏肘子、水晶肘子、蜜蜡肘子。"\
    + "锅烧肘子、扒肘条、炖羊肉、酱羊肉、烧羊肉、烤羊肉、清羔羊肉、五香羊肉、氽三样儿、爆三样儿、炸卷果儿、烩散丹、烩酸燕儿、烩银丝、烩白杂碎、氽节子、烩节子、炸绣球。"\
    + "三鲜鱼翅、栗子鸡、氽鲤鱼、酱汁鲫鱼、活钻鲤鱼、板鸭、筒子鸡、烩脐肚、烩南荠、爆肚仁儿、盐水肘花儿、锅烧猪蹄儿、拌稂子、炖吊子、烧肝尖儿、烧肥肠儿、烧心、烧肺。"\
    + "烧紫盖儿、烧连帖、烧宝盖儿、油炸肺、酱瓜丝儿、山鸡丁儿、拌海蜇、龙须菜、炝冬笋、玉兰片、烧鸳鸯、烧鱼头、烧槟子、烧百合、炸豆腐、炸面筋、炸软巾、糖熘饹儿。"\
    + "拔丝山药、糖焖莲子、酿山药、杏仁儿酪、小炒螃蟹、氽大甲、炒荤素儿、什锦葛仙米、鳎目鱼、八代鱼、海鲫鱼、黄花鱼、鲥鱼、带鱼、扒海参、扒燕窝、扒鸡腿儿、扒鸡块儿。"\
    + "扒肉、扒面筋、扒三样儿、油泼肉、酱泼肉、炒虾黄、熘蟹黄、炒子蟹、炸子蟹、佛手海参、炸烹儿、炒芡子米、奶汤、翅子汤、三丝汤、熏斑鸠、卤斑鸠、海白米、烩腰丁儿。"\
    + "火烧茨菰、炸鹿尾儿、焖鱼头、拌皮渣儿、氽肥肠儿、炸紫盖儿、鸡丝豆苗、十二台菜、汤羊、鹿肉、驼峰、鹿大哈、插根儿、炸花件儿,清拌粉皮儿、炝莴笋、烹芽韭、木樨菜。"\
    + "烹丁香、烹大肉、烹白肉、麻辣野鸡、烩酸蕾、熘脊髓、咸肉丝儿、白肉丝儿、荸荠一品锅、素炝春不老、清焖莲子、酸黄菜、烧萝卜、脂油雪花儿菜、烩银耳、炒银枝儿。"\
    + "八宝榛子酱、黄鱼锅子、白菜锅子、什锦锅子、汤圆锅子、菊花锅子、杂烩锅子、煮饽饽锅子、肉丁辣酱、炒肉丝、炒肉片儿、烩酸菜、烩白菜、烩豌豆、焖扁豆、氽毛豆、炒豇豆,外加腌苤蓝丝儿。"
])

# Keyword extraction from the long text with the TextRank method.
BatchOperator.fromDataframe(df, schemaStr='doc string')\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("doc")\
            .setOutputCol("words")
    )\
    .link(
        StopWordsRemoverBatchOp().setSelectedCol("words")
    )\
    .link(
        KeywordsExtractionBatchOp()\
            .setMethod('TEXT_RANK')\
            .setSelectedCol("words")\
            .setOutputCol("extract_keywords")
    )\
    .select("extract_keywords")\
    .print();

# Top-5 keywords per news title with the TF-IDF method.
getSource()\
    .link(
        SegmentBatchOp()\
            .setSelectedCol("news_title")\
            .setOutputCol("segmented_title")
    )\
    .link(
        StopWordsRemoverBatchOp().setSelectedCol("segmented_title")
    )\
    .link(
        KeywordsExtractionBatchOp()\
            .setTopN(5)\
            .setMethod('TF_IDF')\
            .setSelectedCol("segmented_title")\
            .setOutputCol("extract_keywords")
    )\
    .select("news_title, extract_keywords")\
    .firstN(10)\
    .print();
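The TEXT_RANK method above ranks words by running a PageRank-style iteration over a word co-occurrence graph. The following is a minimal sketch of that idea (illustrative only; Alink's KeywordsExtractionBatchOp handles windowing, weighting, and convergence internally):

from collections import defaultdict

def textrank(words, window=2, d=0.85, iters=50):
    # Build an undirected co-occurrence graph over a sliding window.
    neighbors = defaultdict(set)
    for i, w in enumerate(words):
        for j in range(i + 1, min(i + window + 1, len(words))):
            if words[j] != w:
                neighbors[w].add(words[j])
                neighbors[words[j]].add(w)
    # PageRank-style score propagation with damping factor d.
    score = {w: 1.0 for w in neighbors}
    for _ in range(iters):
        score = {
            w: (1 - d) + d * sum(score[u] / len(neighbors[u]) for u in neighbors[w])
            for w in neighbors
        }
    return sorted(score, key=score.get, reverse=True)

# Hypothetical, already-segmented input; the most connected word ranks first.
words = "流式 计算 和 批式 计算 都 是 计算".split()
print(textrank(words)[:3])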
#c_6_1

# Section 21.6.1: pairwise similarity between two string columns,
# computed on the raw strings and on the segmented texts, in batch and in streaming mode.
df = pd.DataFrame(
    [
        ["机器学习", "机器学习"],
        ["批式计算", "流式计算"],
        ["Machine Learning", "ML"],
        ["Flink", "Alink"],
        ["Good Morning!", "Good Evening!"]
    ]
)
source = BatchOperator.fromDataframe(df, schemaStr='col1 string, col2 string')
source.lazyPrint(-1);

# Character-level metrics on the original strings.
source\
    .link(
        StringSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LEVENSHTEIN")\
            .setOutputCol("LEVENSHTEIN")
    )\
    .link(
        StringSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LEVENSHTEIN_SIM")\
            .setOutputCol("LEVENSHTEIN_SIM")
    )\
    .link(
        StringSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LCS")\
            .setOutputCol("LCS")
    )\
    .link(
        StringSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LCS_SIM")\
            .setOutputCol("LCS_SIM")
    )\
    .link(
        StringSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("JACCARD_SIM")\
            .setOutputCol("JACCARD_SIM")
    )\
    .lazyPrint(-1, "\n## StringSimilarityPairwiseBatchOp ##");

# The same metrics on word-segmented texts.
source\
    .link(
        SegmentBatchOp().setSelectedCol("col1")
    )\
    .link(
        SegmentBatchOp().setSelectedCol("col2")
    )\
    .link(
        TextSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LEVENSHTEIN")\
            .setOutputCol("LEVENSHTEIN")
    )\
    .link(
        TextSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LEVENSHTEIN_SIM")\
            .setOutputCol("LEVENSHTEIN_SIM")
    )\
    .link(
        TextSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LCS")\
            .setOutputCol("LCS")
    )\
    .link(
        TextSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LCS_SIM")\
            .setOutputCol("LCS_SIM")
    )\
    .link(
        TextSimilarityPairwiseBatchOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("JACCARD_SIM")\
            .setOutputCol("JACCARD_SIM")
    )\
    .lazyPrint(-1, "\n## TextSimilarityPairwiseBatchOp ##");

BatchOperator.execute();

# Streaming version of the string similarity computation.
source_stream = StreamOperator.fromDataframe(df, schemaStr='col1 string, col2 string')
source_stream\
    .link(
        StringSimilarityPairwiseStreamOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LEVENSHTEIN")\
            .setOutputCol("LEVENSHTEIN")
    )\
    .link(
        StringSimilarityPairwiseStreamOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LEVENSHTEIN_SIM")\
            .setOutputCol("LEVENSHTEIN_SIM")
    )\
    .link(
        StringSimilarityPairwiseStreamOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LCS")\
            .setOutputCol("LCS")
    )\
    .link(
        StringSimilarityPairwiseStreamOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("LCS_SIM")\
            .setOutputCol("LCS_SIM")
    )\
    .link(
        StringSimilarityPairwiseStreamOp()\
            .setSelectedCols(["col1", "col2"])\
            .setMetric("JACCARD_SIM")\
            .setOutputCol("JACCARD_SIM")
    )\
    .print();

StreamOperator.execute();
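As a cross-check on the LEVENSHTEIN and LEVENSHTEIN_SIM columns produced above, here is a small pure-Python sketch. It assumes the commonly used definition similarity = 1 - distance / max(len(a), len(b)), which is how these two metrics usually relate; treat it as an illustration rather than a statement about Alink's exact implementation.

def levenshtein(a, b):
    # Classic dynamic-programming edit distance.
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1,                  # deletion
                           cur[j - 1] + 1,               # insertion
                           prev[j - 1] + (ca != cb)))    # substitution
        prev = cur
    return prev[-1]

def levenshtein_sim(a, b):
    if not a and not b:
        return 1.0
    return 1.0 - levenshtein(a, b) / max(len(a), len(b))

print(levenshtein("Flink", "Alink"), levenshtein_sim("Flink", "Alink"))  # 1 0.8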
#c_6_2

# Section 21.6.2: for each target title, find the Top-N most similar news titles,
# first with exact nearest-neighbor search, then with approximate methods.
df = pd.DataFrame(
    [
        "林徽因什么理由拒绝了徐志摩而选择梁思成为终身伴侣",
        "发酵床的垫料种类有哪些?哪种更好?",
        "京城最值得你来场文化之旅的博物馆",
        "什么是超写实绘画?"
    ]
)
target = BatchOperator.fromDataframe(df, schemaStr=TXT_COL_NAME + ' string')
source = getSource();

# Exact nearest neighbors on raw strings, with several metrics.
for metric in ["LEVENSHTEIN", "LCS", "SSK", "COSINE"] :
    StringNearestNeighbor()\
        .setMetric(metric)\
        .setSelectedCol(TXT_COL_NAME)\
        .setIdCol(TXT_COL_NAME)\
        .setTopN(5)\
        .setOutputCol("similar_titles")\
        .fit(source)\
        .transform(target)\
        .lazyPrint(-1, "StringNearestNeighbor + " + metric);
BatchOperator.execute();

# Exact nearest neighbors on segmented texts.
for metric in ["LEVENSHTEIN", "LCS", "SSK", "COSINE"] :
    Pipeline()\
        .add(
            Segment()\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol("segmented_title")
        )\
        .add(
            TextNearestNeighbor()\
                .setMetric(metric)\
                .setSelectedCol("segmented_title")\
                .setIdCol(TXT_COL_NAME)\
                .setTopN(5)\
                .setOutputCol("similar_titles")
        )\
        .fit(source)\
        .transform(target)\
        .lazyPrint(-1, "TextNearestNeighbor + " + metric);
BatchOperator.execute();

# Approximate nearest neighbors on raw strings.
for metric in ["JACCARD_SIM", "MINHASH_JACCARD_SIM", "SIMHASH_HAMMING_SIM"] :
    StringApproxNearestNeighbor()\
        .setMetric(metric)\
        .setSelectedCol(TXT_COL_NAME)\
        .setIdCol(TXT_COL_NAME)\
        .setTopN(5)\
        .setOutputCol("similar_titles")\
        .fit(source)\
        .transform(target)\
        .lazyPrint(-1, "StringApproxNearestNeighbor + " + metric);
BatchOperator.execute();

# Approximate nearest neighbors on segmented texts.
for metric in ["JACCARD_SIM", "MINHASH_JACCARD_SIM", "SIMHASH_HAMMING_SIM"] :
    Pipeline()\
        .add(
            Segment()\
                .setSelectedCol(TXT_COL_NAME)\
                .setOutputCol("segmented_title")
        )\
        .add(
            TextApproxNearestNeighbor()\
                .setMetric(metric)\
                .setSelectedCol("segmented_title")\
                .setIdCol(TXT_COL_NAME)\
                .setTopN(5)\
                .setOutputCol("similar_titles")
        )\
        .fit(source)\
        .transform(target)\
        .lazyPrint(-1, "TextApproxNearestNeighbor + " + metric);
BatchOperator.execute();

# Compare training and prediction time of the exact and approximate pipelines.
snn = Pipeline()\
    .add(
        StringNearestNeighbor()\
            .setMetric("LEVENSHTEIN")\
            .setSelectedCol(TXT_COL_NAME)\
            .setIdCol(TXT_COL_NAME)\
            .setTopN(5)\
            .setOutputCol("similar_titles")
    );

approx_snn = Pipeline()\
    .add(
        StringApproxNearestNeighbor()\
            .setMetric("JACCARD_SIM")\
            .setSelectedCol(TXT_COL_NAME)\
            .setIdCol(TXT_COL_NAME)\
            .setTopN(5)\
            .setOutputCol("similar_titles")
    );

sw = Stopwatch();

if not(os.path.exists(DATA_DIR + SNN_MODEL_FILE)) :
    sw.reset();
    sw.start();
    snn.fit(source).save(DATA_DIR + SNN_MODEL_FILE);
    BatchOperator.execute();
    sw.stop();
    print(sw.getElapsedTimeSpan());

if not(os.path.exists(DATA_DIR + APPROX_SNN_MODEL_FILE)) :
    sw.reset();
    sw.start();
    approx_snn.fit(source).save(DATA_DIR + APPROX_SNN_MODEL_FILE);
    BatchOperator.execute();
    sw.stop();
    print(sw.getElapsedTimeSpan());

target_stock = source.filter("category_name = 'stock'");
target_news_story = source.filter("category_name = 'news_story'");

sw.reset();
sw.start();
PipelineModel\
    .load(DATA_DIR + SNN_MODEL_FILE)\
    .transform(target_stock)\
    .lazyPrint(10, "StringNearestNeighbor + LEVENSHTEIN");
BatchOperator.execute();
sw.stop();
print(sw.getElapsedTimeSpan());

sw.reset();
sw.start();
PipelineModel\
    .load(DATA_DIR + APPROX_SNN_MODEL_FILE)\
    .transform(target_stock)\
    .lazyPrint(10, "JACCARD_SIM + stock");
BatchOperator.execute();
sw.stop();
print(sw.getElapsedTimeSpan());

sw.reset();
sw.start();
PipelineModel\
    .load(DATA_DIR + APPROX_SNN_MODEL_FILE)\
    .transform(target_news_story)\
    .lazyPrint(10, "JACCARD_SIM + news_story");
BatchOperator.execute();
sw.stop();
print(sw.getElapsedTimeSpan());

# Streaming prediction with the saved models.
#StreamOperator.setParallelism(1);
stream_target = StreamOperator.fromDataframe(df, schemaStr=TXT_COL_NAME + ' string')
PipelineModel\
    .load(DATA_DIR + SNN_MODEL_FILE)\
    .transform(stream_target)\
    .print();
StreamOperator.execute();

stream_target_stock = getStreamSource().filter("category_name = 'stock'");
sw.reset();
sw.start();
PipelineModel\
    .load(DATA_DIR + APPROX_SNN_MODEL_FILE)\
    .transform(stream_target_stock)\
    .sample(0.02)\
    .print();
StreamOperator.execute();
sw.stop();
print(sw.getElapsedTimeSpan());
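Why is the approximate model so much faster while still returning reasonable neighbors? The MINHASH_JACCARD_SIM metric rests on the fact that, for a random hash function, the probability that two sets share the same minimum hash value equals their Jaccard similarity; averaging over many hash functions therefore estimates the similarity without comparing the full sets. A minimal sketch of that idea (illustrative only, not Alink's implementation):

import random

def jaccard(a, b):
    return len(a & b) / len(a | b)

def minhash_jaccard(a, b, num_hashes=200, seed=0):
    rng = random.Random(seed)
    hits = 0
    for _ in range(num_hashes):
        salt = rng.getrandbits(32)
        h = lambda x: hash((salt, x))
        # The two sets agree on the minimum hash with probability equal to their Jaccard similarity.
        if min(map(h, a)) == min(map(h, b)):
            hits += 1
    return hits / num_hashes

a = set("机器学习与流式计算")
b = set("机器学习与批式计算")
print(jaccard(a, b), minhash_jaccard(a, b))  # exact value 0.8, estimate close to 0.8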
#c_7

# Section 21.7: train an LDA topic model on the segmented news titles, predict a
# topic for each title, and compare the clustering with the original categories.
docs = getSource()\
    .select(LABEL_COL_NAME + ", " + TXT_COL_NAME)\
    .link(SegmentBatchOp().setSelectedCol(TXT_COL_NAME))\
    .link(StopWordsRemoverBatchOp().setSelectedCol(TXT_COL_NAME));

docs.lazyPrint(10);

if not(os.path.exists(DATA_DIR + LDA_MODEL_FILE)) :
    lda = LdaTrainBatchOp()\
        .setTopicNum(10)\
        .setNumIter(200)\
        .setVocabSize(20000)\
        .setSelectedCol(TXT_COL_NAME)\
        .setRandomSeed(123);
    docs.link(lda);
    lda.lazyPrintModelInfo();
    # Save the LDA model and its word-topic probability table p(word | topic).
    lda.link(
        AkSinkBatchOp().setFilePath(DATA_DIR + LDA_MODEL_FILE)
    );
    lda.getSideOutput(0).link(
        AkSinkBatchOp().setFilePath(DATA_DIR + LDA_PWZ_FILE)
    );
    BatchOperator.execute();

# Predict topics and evaluate the clustering against the original category labels.
LdaPredictBatchOp()\
    .setSelectedCol(TXT_COL_NAME)\
    .setPredictionCol(PREDICTION_COL_NAME)\
    .setPredictionDetailCol("predinfo")\
    .linkFrom(
        AkSourceBatchOp().setFilePath(DATA_DIR + LDA_MODEL_FILE),
        docs
    )\
    .lazyPrint(5)\
    .link(
        EvalClusterBatchOp()\
            .setLabelCol(LABEL_COL_NAME)\
            .setPredictionCol(PREDICTION_COL_NAME)\
            .lazyPrintMetrics()
    );

# Inspect the most probable words of each topic.
pwz = AkSourceBatchOp().setFilePath(DATA_DIR + LDA_PWZ_FILE);
pwz.sample(0.001).lazyPrint(10);
for t in range(0, 10) :
    pwz.select("word, topic_" + str(t))\
        .orderBy("topic_" + str(t), 20, order = 'desc')\
        .lazyPrint(-1, "topic" + str(t));

BatchOperator.execute()
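As background for the LdaTrainBatchOp settings above, LDA assumes a simple generative story: each document draws a topic mixture, and each word position first draws a topic from that mixture and then a word from the topic's word distribution. Training inverts this story to recover the topic-word table (the p(word | topic) side output saved to LDA_PWZ_FILE). Below is a minimal sketch of the generative direction only, with a made-up two-topic vocabulary (not related to Alink's implementation):

import numpy as np

rng = np.random.default_rng(0)
n_topics = 2
vocab = ["股票", "基金", "球队", "比赛"]

# Assumed topic-word distributions: topic 0 is "finance", topic 1 is "sports".
phi = np.array([[0.5, 0.5, 0.0, 0.0],
                [0.0, 0.0, 0.5, 0.5]])

theta = rng.dirichlet([0.5] * n_topics)             # topic mixture of one document
words = []
for _ in range(8):                                  # generate 8 words for this document
    z = rng.choice(n_topics, p=theta)               # draw a topic
    words.append(str(rng.choice(vocab, p=phi[z])))  # draw a word from that topic
print(theta, words)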