本章包括下面各节:
24.1 与推荐相关的组件介绍
24.2 常用的推荐算法
24.2.1 协同过滤
24.2.2 交替最小二乘法
24.3 数据探索
24.4 评分预测
24.5 根据用户推荐影片
24.6 计算相似影片
24.7 根据影片推荐用户
24.8 计算相似用户
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，这里为本章对应的示例代码。
# Shared setup for the recommendation examples in this chapter.
# The import order is kept exactly as in the original script: the two wildcard
# imports may define overlapping names, so reordering them could change which
# definition wins.
from pyalink.alink import *
useLocalEnv(1)  # NOTE(review): argument is presumably the local parallelism -- confirm

from utils import *
import os
import pandas as pd

# Widen pandas' display limits so printed result tables are not truncated.
pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.max_rows', 100)

# Directory of the MovieLens 100k data set. ROOT_DIR is not defined in this
# file; presumably it is supplied by one of the wildcard imports above and
# already ends with a path separator.
DATA_DIR = ROOT_DIR + "movielens" + os.sep + "ml-100k" + os.sep

# Data file names inside DATA_DIR.
RATING_FILE = "u.data"
USER_FILE = "u.user"
ITEM_FILE = "u.item"
RATING_TRAIN_FILE = "ua.base"
RATING_TEST_FILE = "ua.test"

# Column names shared by the trainers and recommenders below.
USER_COL = "user_id"
ITEM_COL = "item_id"
RATING_COL = "rating"
RECOMM_COL = "recomm"

# File names (inside DATA_DIR) under which trained models are persisted.
ALS_MODEL_FILE = "als_model.ak"
ITEMCF_MODEL_FILE = "itemcf_model.ak"
USERCF_MODEL_FILE = "usercf_model.ak"

# Schema strings describing the columns of each data file.
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"

USER_SCHEMA_STRING = (
    "user_id long, age int, gender string, occupation string, zip_code string"
)

ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    "unknown int, action int, adventure int, animation int, "
    "children int, comedy int, crime int, documentary int, drama int, "
    "fantasy int, film_noir int, horror int, musical int, mystery int, "
    "romance int, sci_fi int, thriller int, war int, western int"
)


def getSourceRatings():
    """Return a TSV batch source reading RATING_FILE with the rating schema."""
    source = TsvSourceBatchOp()
    return source\
        .setFilePath(DATA_DIR + RATING_FILE)\
        .setSchemaStr(RATING_SCHEMA_STRING)


def getStreamSourceRatings():
    """Return a TSV stream source reading RATING_FILE with the rating schema."""
    source = TsvSourceStreamOp()
    return source\
        .setFilePath(DATA_DIR + RATING_FILE)\
        .setSchemaStr(RATING_SCHEMA_STRING)


def getSourceUsers():
    """Return a '|'-delimited CSV batch source reading USER_FILE."""
    source = CsvSourceBatchOp()
    return source\
        .setFieldDelimiter("|")\
        .setFilePath(DATA_DIR + USER_FILE)\
        .setSchemaStr(USER_SCHEMA_STRING)


def getSourceItems():
    """Return a '|'-delimited CSV batch source reading ITEM_FILE."""
    source = CsvSourceBatchOp()
    return source\
        .setFieldDelimiter("|")\
        .setFilePath(DATA_DIR + ITEM_FILE)\
        .setSchemaStr(ITEM_SCHEMA_STRING)


def getStreamSourceItems():
    """Return a '|'-delimited CSV stream source reading ITEM_FILE."""
    source = CsvSourceStreamOp()
    return source\
        .setFieldDelimiter("|")\
        .setFilePath(DATA_DIR + ITEM_FILE)\
        .setSchemaStr(ITEM_SCHEMA_STRING)
#c_4
# Rating prediction with ALS (appears to correspond to section 24.4 of the
# chapter outline).

# Train / test splits of the rating data, both using the rating schema.
train_set = (
    TsvSourceBatchOp()
    .setFilePath(DATA_DIR + RATING_TRAIN_FILE)
    .setSchemaStr(RATING_SCHEMA_STRING)
)
test_set = (
    TsvSourceBatchOp()
    .setFilePath(DATA_DIR + RATING_TEST_FILE)
    .setSchemaStr(RATING_SCHEMA_STRING)
)

# Train and persist the ALS model only when no model file exists yet.
if not os.path.exists(DATA_DIR + ALS_MODEL_FILE):
    als_trained = train_set.link(
        AlsTrainBatchOp()
        .setUserCol(USER_COL)
        .setItemCol(ITEM_COL)
        .setRateCol(RATING_COL)
        .setLambda(0.1)
        .setRank(10)
        .setNumIter(10)
    )
    als_trained.link(
        AkSinkBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE)
    )
    BatchOperator.execute()

# Predict ratings for user 1's test rows and attach the movie title, so the
# actual rating and the prediction can be compared side by side.
rating_with_title = PipelineModel(
    AlsRateRecommender()
    .setUserCol(USER_COL)
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE)),
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
)
rating_with_title\
    .transform(test_set.filter("user_id=1"))\
    .select("user_id, rating, recomm, item_name")\
    .orderBy("rating, recomm", 1000)\
    .lazyPrint(-1)  # NOTE(review): -1 presumably means "print all rows" -- confirm
BatchOperator.execute()

# Score the whole test set and print regression metrics of the predicted
# rating (RECOMM_COL) against the actual rating (RATING_COL).
als_predictions = (
    AlsRateRecommender()
    .setUserCol(USER_COL)
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE))
    .transform(test_set)
)
als_predictions.link(
    EvalRegressionBatchOp()
    .setLabelCol(RATING_COL)
    .setPredictionCol(RECOMM_COL)
    .lazyPrintMetrics()
)
BatchOperator.execute()
#c_5
# Recommend movies for a user with ItemCF (appears to correspond to section
# 24.5 of the chapter outline).
import json


def _print_recommended_titles(recomm_json, title_predictor):
    """Print the title-lookup result for every item id in a recommendation.

    Args:
        recomm_json: JSON string produced by a recommender's local predictor.
            Its 'item_id' entry is itself a string holding a list of ids
            (the original code passed that string to ``eval``).
        title_predictor: local predictor whose ``map([item_id])`` returns the
            looked-up row for one item id.

    The inner list string is decoded with ``json.loads`` instead of ``eval``:
    it only ever needs to be parsed as data, and ``eval`` would execute
    whatever expression the string happened to contain.
    """
    item_ids = json.loads(json.loads(recomm_json).get('item_id'))
    # `item_id` rather than `id`, so the builtin `id` is not rebound at
    # module scope for the rest of the script.
    for item_id in item_ids:
        print(title_predictor.map([item_id]))


# Train and persist the ItemCF model only when no model file exists yet.
if not os.path.exists(DATA_DIR + ITEMCF_MODEL_FILE):
    getSourceRatings()\
        .link(
            ItemCfTrainBatchOp()
            .setUserCol(USER_COL)
            .setItemCol(ITEM_COL)
            .setRateCol(RATING_COL)
        )\
        .link(
            AkSinkBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE)
        )
    BatchOperator.execute()

# One-row batch table holding the user to recommend for (user 1).
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[1]]),
    schemaStr='user_id long'
)

# Batch recommendation with the recommender's default settings.
ItemCfItemsPerUserRecommender()\
    .setUserCol(USER_COL)\
    .setRecommCol(RECOMM_COL)\
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))\
    .transform(test_data)\
    .print()

# Local (in-process) predictor: up to 20 recommendations per user.
recomm_predictor = ItemCfItemsPerUserRecommender()\
    .setUserCol(USER_COL)\
    .setRecommCol(RECOMM_COL)\
    .setK(20)\
    .setModelData(
        AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE)
    )\
    .collectLocalPredictor("user_id long")

print(recomm_predictor.getOutputColNames())

# Local predictor mapping an item id to its title ("item_name").
kv_predictor = Lookup()\
    .setSelectedCols([ITEM_COL])\
    .setOutputCols(["item_name"])\
    .setModelData(getSourceItems())\
    .setMapKeyCols(["item_id"])\
    .setMapValueCols(["title"])\
    .collectLocalPredictor("item_id long")

print(kv_predictor.getOutputColNames())

# Element [1] of the mapped row is used as the recommendation JSON
# (presumably the RECOMM_COL value that follows user_id -- confirm against
# the output column names printed above).
recommResultStr = recomm_predictor.map([1])[1]
print(recommResultStr)
_print_recommended_titles(recommResultStr, kv_predictor)

# For comparison: titles of the movies user 1 rated above 4.
Lookup()\
    .setSelectedCols([ITEM_COL])\
    .setOutputCols(["item_name"])\
    .setModelData(getSourceItems())\
    .setMapKeyCols(["item_id"])\
    .setMapValueCols(["title"])\
    .transform(getSourceRatings().filter("user_id=1 AND rating>4"))\
    .select("item_name")\
    .orderBy("item_name", 1000)\
    .print()

# Same recommendation again, this time with setExcludeKnown(True).
recomm_predictor_2 = ItemCfItemsPerUserRecommender()\
    .setUserCol(USER_COL)\
    .setRecommCol(RECOMM_COL)\
    .setK(20)\
    .setExcludeKnown(True)\
    .setModelData(
        AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE)
    )\
    .collectLocalPredictor("user_id long")

recommResultStr = recomm_predictor_2.map([1])[1]
print(recommResultStr)
_print_recommended_titles(recommResultStr, kv_predictor)
#c_6
# Find movies similar to a given movie with ItemCF (appears to correspond to
# section 24.6 of the chapter outline). Reads the ItemCF model file; this
# section does not train it.

# Imported here so the section does not depend on an earlier section having
# imported json already.
import json

# One-row batch table holding the query item (item 50).
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[50]]),
    schemaStr=ITEM_COL + ' long'
)

# Batch query with the recommender's default settings.
ItemCfSimilarItemsRecommender()\
    .setItemCol(ITEM_COL)\
    .setRecommCol(RECOMM_COL)\
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))\
    .transform(test_data)\
    .print()

# Local (in-process) predictor: up to 10 similar items per query item.
recomm_predictor = ItemCfSimilarItemsRecommender()\
    .setItemCol(ITEM_COL)\
    .setRecommCol(RECOMM_COL)\
    .setK(10)\
    .setModelData(
        AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE)
    )\
    .collectLocalPredictor("item_id long")

# Local predictor mapping an item id to its title ("item_name").
kv_predictor = Lookup()\
    .setSelectedCols([ITEM_COL])\
    .setOutputCols(["item_name"])\
    .setModelData(getSourceItems())\
    .setMapKeyCols(["item_id"])\
    .setMapValueCols(["title"])\
    .collectLocalPredictor("item_id long")

# Element [1] of the mapped row is used as the recommendation JSON
# (presumably the RECOMM_COL value that follows item_id -- confirm).
recommResultStr = recomm_predictor.map([50])[1]

# The 'item_id' entry is itself a string holding a list of ids. It is decoded
# with json.loads rather than eval: the string only needs to be parsed as
# data, and eval would execute whatever expression it happened to contain.
similar_item_ids = json.loads(json.loads(recommResultStr).get('item_id'))

# `item_id` rather than `id`, so the builtin `id` is not rebound at module
# scope for the rest of the script.
for item_id in similar_item_ids:
    print(kv_predictor.map([item_id]))
#c_7
# Recommend users for a movie with UserCF (appears to correspond to section
# 24.7 of the chapter outline).

# Train and persist the UserCF model only when no model file exists yet.
if not os.path.exists(DATA_DIR + USERCF_MODEL_FILE):
    usercf_trained = getSourceRatings().link(
        UserCfTrainBatchOp()
        .setUserCol(USER_COL)
        .setItemCol(ITEM_COL)
        .setRateCol(RATING_COL)
    )
    usercf_trained.link(
        AkSinkBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE)
    )
    BatchOperator.execute()

# One-row batch table holding the query item (item 50).
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[50]]),
    schemaStr=ITEM_COL + ' long'
)

# Users recommended for item 50 with the recommender's default settings.
(
    UserCfUsersPerItemRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Existing ratings of item 50 by a fixed list of users (presumably the users
# returned by the recommendation above -- confirm against its output).
(
    getSourceRatings()
    .filter("user_id IN (276,429,222,864,194,650,896,303,749,301) AND item_id=50")
    .print()
)

# Same recommendation again, this time with setExcludeKnown(True).
(
    UserCfUsersPerItemRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setExcludeKnown(True)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)
#c_8
# Find users similar to a given user with UserCF (appears to correspond to
# section 24.8 of the chapter outline). Reads the UserCF model file; this
# section does not train it.

# One-row batch table holding the query user (user 1).
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[1]]),
    schemaStr=USER_COL + ' long'
)

# Users similar to user 1, with the recommender's default settings.
(
    UserCfSimilarUsersRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Profile rows for user 1 and a fixed list of other users (presumably the
# similar users returned above -- confirm against that output).
(
    getSourceUsers()
    .filter("user_id IN (1, 916,864,268,92,435,457,738,429,303,276)")
    .print()
)