本章包括下面各节:
24.1 与推荐相关的组件介绍
24.2 常用的推荐算法
24.2.1 协同过滤
24.2.2 交替最小二乘法
24.3 数据探索
24.4 评分预测
24.5 根据用户推荐影片
24.6 计算相似影片
24.7 根据影片推荐用户
24.8 计算相似用户
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，以下为本章对应的示例代码。
# Star-import PyAlink; the operator classes used below (sources, sinks,
# trainers, recommenders, ...) are expected to come from this import.
from pyalink.alink import *
# Start a local Alink environment. The argument is presumably the
# parallelism -- confirm against the PyAlink documentation.
useLocalEnv(1)
# Project-local helpers; ROOT_DIR used below is presumably defined here
# (the module is not visible in this file).
from utils import *
import os
import pandas as pd
# Raise pandas display limits so wide cell values and up to 100 rows are
# shown when tables are printed.
pd.set_option('display.max_colwidth', 1000)
pd.set_option('display.max_rows', 100)
# Directory holding the MovieLens 100k files (always ends with a separator).
DATA_DIR = ROOT_DIR + os.sep.join(("movielens", "ml-100k", ""))

# Data file names inside DATA_DIR.
RATING_FILE = "u.data"
USER_FILE = "u.user"
ITEM_FILE = "u.item"
RATING_TRAIN_FILE = "ua.base"
RATING_TEST_FILE = "ua.test"

# Column names shared by the trainers and recommenders below.
USER_COL = "user_id"
ITEM_COL = "item_id"
RATING_COL = "rating"
RECOMM_COL = "recomm"

# File names (inside DATA_DIR) under which trained models are saved.
ALS_MODEL_FILE = "als_model.ak"
ITEMCF_MODEL_FILE = "itemcf_model.ak"
USERCF_MODEL_FILE = "usercf_model.ak"

# Schema strings passed to the source operators.
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"
USER_SCHEMA_STRING = (
    "user_id long, age int, gender string, occupation string, zip_code string"
)
ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    "unknown int, action int, adventure int, animation int, "
    "children int, comedy int, crime int, documentary int, drama int, "
    "fantasy int, film_noir int, horror int, musical int, mystery int, "
    "romance int, sci_fi int, thriller int, war int, western int"
)
def getSourceRatings():
    """Return a batch source over the full ratings file.

    The file at DATA_DIR + RATING_FILE is read with TsvSourceBatchOp using
    RATING_SCHEMA_STRING as its schema.
    """
    ratings_path = DATA_DIR + RATING_FILE
    return (
        TsvSourceBatchOp()
        .setFilePath(ratings_path)
        .setSchemaStr(RATING_SCHEMA_STRING)
    )
def getStreamSourceRatings():
    """Return a stream source over the full ratings file.

    Same file and schema as getSourceRatings(), but built on
    TsvSourceStreamOp instead of the batch operator.
    """
    ratings_path = DATA_DIR + RATING_FILE
    return (
        TsvSourceStreamOp()
        .setFilePath(ratings_path)
        .setSchemaStr(RATING_SCHEMA_STRING)
    )
def getSourceUsers():
    """Return a batch source over the user file.

    The file at DATA_DIR + USER_FILE is read with CsvSourceBatchOp using
    "|" as the field delimiter and USER_SCHEMA_STRING as its schema.
    """
    users_path = DATA_DIR + USER_FILE
    return (
        CsvSourceBatchOp()
        .setFieldDelimiter("|")
        .setFilePath(users_path)
        .setSchemaStr(USER_SCHEMA_STRING)
    )
def getSourceItems():
    """Return a batch source over the item (movie) file.

    The file at DATA_DIR + ITEM_FILE is read with CsvSourceBatchOp using
    "|" as the field delimiter and ITEM_SCHEMA_STRING as its schema.
    """
    items_path = DATA_DIR + ITEM_FILE
    return (
        CsvSourceBatchOp()
        .setFieldDelimiter("|")
        .setFilePath(items_path)
        .setSchemaStr(ITEM_SCHEMA_STRING)
    )
def getStreamSourceItems():
    """Return a stream source over the item (movie) file.

    Same file, delimiter and schema as getSourceItems(), but built on
    CsvSourceStreamOp instead of the batch operator.
    """
    items_path = DATA_DIR + ITEM_FILE
    return (
        CsvSourceStreamOp()
        .setFieldDelimiter("|")
        .setFilePath(items_path)
        .setSchemaStr(ITEM_SCHEMA_STRING)
    )
#c_4
# Rating prediction: train an ALS model on the training split, then predict
# and evaluate ratings on the test split.
train_set = (
    TsvSourceBatchOp()
    .setFilePath(DATA_DIR + RATING_TRAIN_FILE)
    .setSchemaStr(RATING_SCHEMA_STRING)
)
test_set = (
    TsvSourceBatchOp()
    .setFilePath(DATA_DIR + RATING_TEST_FILE)
    .setSchemaStr(RATING_SCHEMA_STRING)
)

# Train and save the ALS model only when no saved model file exists yet.
if not os.path.exists(DATA_DIR + ALS_MODEL_FILE):
    als_trainer = (
        AlsTrainBatchOp()
        .setUserCol(USER_COL)
        .setItemCol(ITEM_COL)
        .setRateCol(RATING_COL)
        .setLambda(0.1)
        .setRank(10)
        .setNumIter(10)
    )
    train_set.link(als_trainer).link(
        AkSinkBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE)
    )
    BatchOperator.execute()

# Predict ratings for user 1's test rows and attach the movie title
# (item_id -> title, written to "item_name") for readability.
als_rater = (
    AlsRateRecommender()
    .setUserCol(USER_COL)
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE))
)
title_lookup = (
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
)
(
    PipelineModel(als_rater, title_lookup)
    .transform(test_set.filter("user_id=1"))
    .select("user_id, rating, recomm, item_name")
    .orderBy("rating, recomm", 1000)
    .lazyPrint(-1)
)
BatchOperator.execute()

# Score the whole test split and print regression metrics comparing the
# predicted rating column with the true rating column.
als_predictions = (
    AlsRateRecommender()
    .setUserCol(USER_COL)
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE))
    .transform(test_set)
)
als_predictions.link(
    EvalRegressionBatchOp()
    .setLabelCol(RATING_COL)
    .setPredictionCol(RECOMM_COL)
    .lazyPrintMetrics()
)
BatchOperator.execute()
#c_5
# Recommend movies for a user with an item-based collaborative filtering
# (ItemCF) model.
import ast
import json

# Train and save the ItemCF model only when no saved model file exists yet.
if not os.path.exists(DATA_DIR + ITEMCF_MODEL_FILE):
    (
        getSourceRatings()
        .link(
            ItemCfTrainBatchOp()
            .setUserCol(USER_COL)
            .setItemCol(ITEM_COL)
            .setRateCol(RATING_COL)
        )
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    )
    BatchOperator.execute()

# One-row input table containing user_id 1.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[1]]), schemaStr='user_id long'
)

# Batch recommendation for that user with the recommender's default settings.
(
    ItemCfItemsPerUserRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Local predictor returning 20 recommendations per user.
recomm_predictor = (
    ItemCfItemsPerUserRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setK(20)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .collectLocalPredictor("user_id long")
)
print(recomm_predictor.getOutputColNames())

# Local predictor mapping an item_id to its title (output column "item_name").
kv_predictor = (
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
    .collectLocalPredictor("item_id long")
)
print(kv_predictor.getOutputColNames())

# Field 1 of the predictor's output row is used as the recommendation JSON
# string (field order is shown by the getOutputColNames() print above).
recommResultStr = recomm_predictor.map([1])[1]
print(recommResultStr)

# The 'item_id' entry is itself a string holding a list literal. Parse it
# with ast.literal_eval, which accepts only literals, instead of eval(),
# which would execute arbitrary expressions found in the data.
for item_id in ast.literal_eval(json.loads(recommResultStr).get('item_id')):
    print(kv_predictor.map([item_id]))

# For comparison, list the titles user 1 rated above 4.
(
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
    .transform(getSourceRatings().filter("user_id=1 AND rating>4"))
    .select("item_name")
    .orderBy("item_name", 1000)
    .print()
)

# Same local predictor as above but with setExcludeKnown(True); presumably
# this drops items the user has already rated -- confirm in the Alink docs.
recomm_predictor_2 = (
    ItemCfItemsPerUserRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setK(20)
    .setExcludeKnown(True)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .collectLocalPredictor("user_id long")
)
recommResultStr = recomm_predictor_2.map([1])[1]
print(recommResultStr)
for item_id in ast.literal_eval(json.loads(recommResultStr).get('item_id')):
    print(kv_predictor.map([item_id]))
#c_6
# Find movies similar to a given movie with the saved ItemCF model.
# Imported here so this section does not depend on an earlier section
# having imported json.
import ast
import json

# One-row input table containing item_id 50.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[50]]),
    schemaStr=ITEM_COL + ' long',
)

# Batch query with the recommender's default settings.
(
    ItemCfSimilarItemsRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Local predictor returning 10 similar items per item.
recomm_predictor = (
    ItemCfSimilarItemsRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setK(10)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .collectLocalPredictor("item_id long")
)

# Local predictor mapping an item_id to its title (output column "item_name").
kv_predictor = (
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
    .collectLocalPredictor("item_id long")
)

# Field 1 of the predictor's output row is used as the recommendation JSON.
recommResultStr = recomm_predictor.map([50])[1]

# The 'item_id' entry is itself a string holding a list literal. Parse it
# with ast.literal_eval, which accepts only literals, instead of eval(),
# which would execute arbitrary expressions found in the data.
for item_id in ast.literal_eval(json.loads(recommResultStr).get('item_id')):
    print(kv_predictor.map([item_id]))
#c_7
# Recommend users for a movie with a user-based collaborative filtering
# (UserCF) model.
usercf_model_path = DATA_DIR + USERCF_MODEL_FILE

# Train and save the UserCF model only when no saved model file exists yet.
if not os.path.exists(usercf_model_path):
    (
        getSourceRatings()
        .link(
            UserCfTrainBatchOp()
            .setUserCol(USER_COL)
            .setItemCol(ITEM_COL)
            .setRateCol(RATING_COL)
        )
        .link(AkSinkBatchOp().setFilePath(usercf_model_path))
    )
    BatchOperator.execute()

# One-row input table containing item_id 50.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[50]]),
    schemaStr=ITEM_COL + ' long',
)

# Users recommended for item 50 with the recommender's default settings.
(
    UserCfUsersPerItemRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(usercf_model_path))
    .transform(test_data)
    .print()
)

# Print the ratings the ten listed users gave item 50 (presumably the users
# returned by the query above -- verify against its output).
(
    getSourceRatings()
    .filter("user_id IN (276,429,222,864,194,650,896,303,749,301) AND item_id=50")
    .print()
)

# Same query with setExcludeKnown(True).
(
    UserCfUsersPerItemRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setExcludeKnown(True)
    .setModelData(AkSourceBatchOp().setFilePath(usercf_model_path))
    .transform(test_data)
    .print()
)
#c_8
# Find users similar to a given user with the saved UserCF model.

# One-row input table containing user_id 1.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[1]]),
    schemaStr=USER_COL + ' long',
)

# Similar users for user 1 with the recommender's default settings.
(
    UserCfSimilarUsersRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Print the profile rows of user 1 and ten other listed users (presumably
# the users returned by the query above -- verify against its output).
(
    getSourceUsers()
    .filter("user_id IN (1, 916,864,268,92,435,457,738,429,303,276)")
    .print()
)
```python
```