Alink教程(Python版)

第24章 构建推荐系统

本章包括下面各节:
24.1 与推荐相关的组件介绍
24.2 常用的推荐算法
24.2.1 协同过滤
24.2.2 交替最小二乘法
24.3 数据探索
24.4 评分预测
24.5 根据用户推荐影片
24.6 计算相似影片
24.7 根据影片推荐用户
24.8 计算相似用户

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

# Widen pandas' console output so that long cell values and larger result
# tables are not truncated when printed.
pd.set_option("display.max_colwidth", 1000)
pd.set_option("display.max_rows", 100)

# Directory holding the data files, built as
# ROOT_DIR + "movielens" + <sep> + "ml-100k" + <sep>.  The trailing separator
# lets file names be appended to DATA_DIR directly.
DATA_DIR = ROOT_DIR + os.sep.join(("movielens", "ml-100k", ""))

# Names of the data files located in DATA_DIR.
RATING_FILE = "u.data"
USER_FILE = "u.user"
ITEM_FILE = "u.item"
RATING_TRAIN_FILE = "ua.base"
RATING_TEST_FILE = "ua.test"

# Column names passed to the training and recommendation operators.
USER_COL = "user_id"
ITEM_COL = "item_id"
RATING_COL = "rating"
RECOMM_COL = "recomm"

# File names (relative to DATA_DIR) under which trained models are saved.
ALS_MODEL_FILE = "als_model.ak"
ITEMCF_MODEL_FILE = "itemcf_model.ak"
USERCF_MODEL_FILE = "usercf_model.ak"

# Schema strings ("<column> <type>, ...") used when reading the data files.
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"

USER_SCHEMA_STRING = (
    "user_id long, age int, gender string, occupation string, zip_code string"
)

# Five descriptive columns followed by nineteen int flag columns.
ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    "unknown int, action int, adventure int, animation int, "
    "children int, comedy int, crime int, documentary int, drama int, "
    "fantasy int, film_noir int, horror int, musical int, mystery int, "
    "romance int, sci_fi int, thriller int, war int, western int"
)


def getSourceRatings():
    """Build the batch source for the complete rating file.

    Returns:
        A ``TsvSourceBatchOp`` configured to read ``DATA_DIR + RATING_FILE``
        with ``RATING_SCHEMA_STRING``.
    """
    source = TsvSourceBatchOp().setFilePath(DATA_DIR + RATING_FILE)
    return source.setSchemaStr(RATING_SCHEMA_STRING)


def getStreamSourceRatings():
    """Build the stream source for the complete rating file.

    Returns:
        A ``TsvSourceStreamOp`` configured to read ``DATA_DIR + RATING_FILE``
        with ``RATING_SCHEMA_STRING``.
    """
    source = TsvSourceStreamOp().setFilePath(DATA_DIR + RATING_FILE)
    return source.setSchemaStr(RATING_SCHEMA_STRING)


def getSourceUsers():
    """Build the batch source for the user file.

    Returns:
        A ``CsvSourceBatchOp`` that reads ``DATA_DIR + USER_FILE`` using "|"
        as the field delimiter and ``USER_SCHEMA_STRING`` as the schema.
    """
    source = CsvSourceBatchOp().setFieldDelimiter("|")
    source = source.setFilePath(DATA_DIR + USER_FILE)
    return source.setSchemaStr(USER_SCHEMA_STRING)


def getSourceItems():
    """Build the batch source for the item (movie) file.

    Returns:
        A ``CsvSourceBatchOp`` that reads ``DATA_DIR + ITEM_FILE`` using "|"
        as the field delimiter and ``ITEM_SCHEMA_STRING`` as the schema.
    """
    source = CsvSourceBatchOp().setFieldDelimiter("|")
    source = source.setFilePath(DATA_DIR + ITEM_FILE)
    return source.setSchemaStr(ITEM_SCHEMA_STRING)


def getStreamSourceItems():
    """Build the stream source for the item (movie) file.

    Returns:
        A ``CsvSourceStreamOp`` that reads ``DATA_DIR + ITEM_FILE`` using "|"
        as the field delimiter and ``ITEM_SCHEMA_STRING`` as the schema.
    """
    source = CsvSourceStreamOp().setFieldDelimiter("|")
    source = source.setFilePath(DATA_DIR + ITEM_FILE)
    return source.setSchemaStr(ITEM_SCHEMA_STRING)

#c_4

# Training and test rating files, both read with the rating schema.
train_set = (
    TsvSourceBatchOp()
    .setFilePath(DATA_DIR + RATING_TRAIN_FILE)
    .setSchemaStr(RATING_SCHEMA_STRING)
)

test_set = (
    TsvSourceBatchOp()
    .setFilePath(DATA_DIR + RATING_TEST_FILE)
    .setSchemaStr(RATING_SCHEMA_STRING)
)

# Train the ALS model and save it only when no model file exists yet;
# subsequent runs load the saved file instead of retraining.
if not os.path.exists(DATA_DIR + ALS_MODEL_FILE):
    (
        train_set
        .link(
            AlsTrainBatchOp()
            .setUserCol(USER_COL)
            .setItemCol(ITEM_COL)
            .setRateCol(RATING_COL)
            .setLambda(0.1)
            .setRank(10)
            .setNumIter(10)
        )
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE))
    )
    BatchOperator.execute()


# Predict ratings for the test records of user 1 and look up each item's
# title, so the actual rating and the prediction can be compared by title.
(
    PipelineModel(
        AlsRateRecommender()
        .setUserCol(USER_COL)
        .setItemCol(ITEM_COL)
        .setRecommCol(RECOMM_COL)
        .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE)),
        Lookup()
        .setSelectedCols([ITEM_COL])
        .setOutputCols(["item_name"])
        .setModelData(getSourceItems())
        .setMapKeyCols(["item_id"])
        .setMapValueCols(["title"]),
    )
    .transform(test_set.filter("user_id=1"))
    .select("user_id, rating, recomm, item_name")
    .orderBy("rating, recomm", 1000)
    .lazyPrint(-1)
)
BatchOperator.execute()

# Predict a rating for every test record and evaluate the predictions
# against the actual ratings with regression metrics.
(
    AlsRateRecommender()
    .setUserCol(USER_COL)
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ALS_MODEL_FILE))
    .transform(test_set)
    .link(
        EvalRegressionBatchOp()
        .setLabelCol(RATING_COL)
        .setPredictionCol(RECOMM_COL)
        .lazyPrintMetrics()
    )
)
BatchOperator.execute()
#c_5

# Train the ItemCF model on the full rating data and save it only when no
# model file exists yet; subsequent runs reuse the saved file.
if not os.path.exists(DATA_DIR + ITEMCF_MODEL_FILE):
    (
        getSourceRatings()
        .link(
            ItemCfTrainBatchOp()
            .setUserCol(USER_COL)
            .setItemCol(ITEM_COL)
            .setRateCol(RATING_COL)
        )
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    )
    BatchOperator.execute()

# One-row input table containing only user_id = 1.
test_data = BatchOperator.fromDataframe(pd.DataFrame([[1]]), schemaStr='user_id long')

# Batch recommendation of items for that user.
(
    ItemCfItemsPerUserRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# The same recommender as a local predictor, with K set to 20.
recomm_predictor = (
    ItemCfItemsPerUserRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setK(20)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .collectLocalPredictor("user_id long")
)

print(recomm_predictor.getOutputColNames())

# Local predictor mapping an item_id to the item's title.
kv_predictor = (
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
    .collectLocalPredictor("item_id long")
)

print(kv_predictor.getOutputColNames())

# Field 1 of the predictor's output row is used as the recommendation
# result string (field 0 presumably echoes the input user_id - confirm
# against the printed output column names).
recommResultStr = recomm_predictor.map([1])[1]

print(recommResultStr)


import json

# The result string is JSON whose 'item_id' value is itself a string holding
# a list of ids.  It is parsed with json.loads instead of eval() so that the
# recommender output is never executed as Python code.  The loop variable is
# named item_id so the builtin id() is not shadowed for the rest of the
# script.
for item_id in json.loads(json.loads(recommResultStr).get('item_id')):
    print(kv_predictor.map([item_id]))


# Titles of the items user 1 rated above 4, for comparison with the
# recommendations printed above.
(
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
    .transform(getSourceRatings().filter("user_id=1 AND rating>4"))
    .select("item_name")
    .orderBy("item_name", 1000)
    .print()
)

# Second local predictor with setExcludeKnown(True); presumably this drops
# items the user has already rated from the result - confirm in the Alink
# documentation.
recomm_predictor_2 = (
    ItemCfItemsPerUserRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setK(20)
    .setExcludeKnown(True)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .collectLocalPredictor("user_id long")
)

recommResultStr = recomm_predictor_2.map([1])[1]

print(recommResultStr)

# Same parsing as above: json.loads instead of eval(), no builtin shadowing.
for item_id in json.loads(json.loads(recommResultStr).get('item_id')):
    print(kv_predictor.map([item_id]))

#c_6

# One-row input table containing only item_id = 50.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[50]]),
    schemaStr=ITEM_COL + ' long',
)

# Batch computation of the items similar to that item.
(
    ItemCfSimilarItemsRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# The same recommender as a local predictor, with K set to 10.
recomm_predictor = (
    ItemCfSimilarItemsRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setK(10)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + ITEMCF_MODEL_FILE))
    .collectLocalPredictor("item_id long")
)

# Local predictor mapping an item_id to the item's title.
kv_predictor = (
    Lookup()
    .setSelectedCols([ITEM_COL])
    .setOutputCols(["item_name"])
    .setModelData(getSourceItems())
    .setMapKeyCols(["item_id"])
    .setMapValueCols(["title"])
    .collectLocalPredictor("item_id long")
)

# Field 1 of the predictor's output row is used as the recommendation
# result string.
recommResultStr = recomm_predictor.map([50])[1]

# The result string is JSON whose 'item_id' value is itself a string holding
# a list of ids.  It is parsed with json.loads instead of eval() so that the
# recommender output is never executed as Python code.  The loop variable is
# named item_id so the builtin id() is not shadowed.
for item_id in json.loads(json.loads(recommResultStr).get('item_id')):
    print(kv_predictor.map([item_id]))

#c_7

# Train the UserCF model on the full rating data and save it only when no
# model file exists yet; subsequent runs reuse the saved file.
if not os.path.exists(DATA_DIR + USERCF_MODEL_FILE):
    (
        getSourceRatings()
        .link(
            UserCfTrainBatchOp()
            .setUserCol(USER_COL)
            .setItemCol(ITEM_COL)
            .setRateCol(RATING_COL)
        )
        .link(AkSinkBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    )
    BatchOperator.execute()


# One-row input table containing only item_id = 50.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[50]]),
    schemaStr=ITEM_COL + ' long',
)

# Recommend users for that item.
(
    UserCfUsersPerItemRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Show the existing ratings of item 50 by a fixed list of ten user ids.
(
    getSourceRatings()
    .filter("user_id IN (276,429,222,864,194,650,896,303,749,301) AND item_id=50")
    .print()
)

# Same recommendation again, this time with setExcludeKnown(True).
(
    UserCfUsersPerItemRecommender()
    .setItemCol(ITEM_COL)
    .setRecommCol(RECOMM_COL)
    .setExcludeKnown(True)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)
#c_8

# One-row input table containing only user_id = 1.
test_data = BatchOperator.fromDataframe(
    pd.DataFrame([[1]]),
    schemaStr=USER_COL + ' long',
)

# Compute the users similar to that user from the saved UserCF model.
(
    UserCfSimilarUsersRecommender()
    .setUserCol(USER_COL)
    .setRecommCol(RECOMM_COL)
    .setModelData(AkSourceBatchOp().setFilePath(DATA_DIR + USERCF_MODEL_FILE))
    .transform(test_data)
    .print()
)

# Print the user-file records of user 1 and a fixed list of ten other user
# ids.
(
    getSourceUsers()
    .filter("user_id IN (1, 916,864,268,92,435,457,738,429,303,276)")
    .print()
)