本章包括下面各节：
5.1 基本操作
5.1.1 注册
5.1.2 运行
5.1.3 内置函数
5.1.4 用户定义函数
5.2 简化操作
5.2.1 单表操作
5.2.2 两表的连接(JOIN)操作
5.2.3 两表的集合操作
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，以下为本章对应的示例代码。
from pyalink.alink import *
useLocalEnv(4)

from utils import *
import os
import pandas as pd

# Root of the MovieLens 100K data set. ROOT_DIR comes from utils and is
# concatenated directly, so it presumably ends with a path separator —
# NOTE(review): confirm in utils.
DATA_DIR = ROOT_DIR + "movielens" + os.sep + "ml-100k" + os.sep

# File names relative to DATA_DIR.
RATING_FILE, USER_FILE, ITEM_FILE = "u.data", "u.user", "u.item"
RATING_TRAIN_FILE, RATING_TEST_FILE = "ua.base", "ua.test"

# Column names shared by the examples.
USER_COL, ITEM_COL, RATING_COL, RECOMM_COL = "user_id", "item_id", "rating", "recomm"

# Schema strings ("name type, name type, ...") handed to setSchemaStr().
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"
USER_SCHEMA_STRING = "user_id long, age int, gender string, occupation string, zip_code string"
ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    "unknown int, action int, adventure int, animation int, "
    "children int, comedy int, crime int, documentary int, drama int, "
    "fantasy int, film_noir int, horror int, musical int, mystery int, "
    "romance int, sci_fi int, thriller int, war int, western int"
)


def _ratings_source(source_op_class):
    # Shared by the batch and stream ratings readers: instantiate the given
    # TSV source class and point it at the ratings file with its schema.
    return (source_op_class()
            .setFilePath(DATA_DIR + RATING_FILE)
            .setSchemaStr(RATING_SCHEMA_STRING))


def _pipe_delimited_source(source_op_class, file_name, schema_str):
    # Shared by the users/items readers: instantiate the given CSV source
    # class with "|" as field delimiter, reading DATA_DIR + file_name.
    return (source_op_class()
            .setFieldDelimiter("|")
            .setFilePath(DATA_DIR + file_name)
            .setSchemaStr(schema_str))


def getSourceRatings():
    """Return a TsvSourceBatchOp reading RATING_FILE with RATING_SCHEMA_STRING."""
    return _ratings_source(TsvSourceBatchOp)


def getStreamSourceRatings():
    """Return a TsvSourceStreamOp reading RATING_FILE with RATING_SCHEMA_STRING."""
    return _ratings_source(TsvSourceStreamOp)


def getSourceUsers():
    """Return a "|"-delimited CsvSourceBatchOp reading USER_FILE."""
    return _pipe_delimited_source(CsvSourceBatchOp, USER_FILE, USER_SCHEMA_STRING)


def getSourceItems():
    """Return a "|"-delimited CsvSourceBatchOp reading ITEM_FILE."""
    return _pipe_delimited_source(CsvSourceBatchOp, ITEM_FILE, ITEM_SCHEMA_STRING)


def getStreamSourceItems():
    """Return a "|"-delimited CsvSourceStreamOp reading ITEM_FILE."""
    return _pipe_delimited_source(CsvSourceStreamOp, ITEM_FILE, ITEM_SCHEMA_STRING)
#c_1
ratings = getSourceRatings()
users = getSourceUsers()
items = getSourceItems()

# Expose the three batch sources to SQL under these table names.
for table_name, source_op in (("ratings", ratings), ("items", items), ("users", users)):
    source_op.registerTableName(table_name)

# The 10 items with the most ratings, with rating count and average rating.
most_rated_sql = (
    "SELECT title, cnt, avg_rating"
    " FROM ( SELECT item_id, COUNT(*) AS cnt, AVG(rating) AS avg_rating"
    " FROM ratings "
    " GROUP BY item_id "
    " ORDER BY cnt DESC LIMIT 10 "
    " ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY cnt DESC"
)
BatchOperator.sqlQuery(most_rated_sql).print()

# Per-item average rating split by gender ('M' / 'F'). The text is split into
# head and tail so the second run can insert a HAVING clause in between.
gender_gap_head = (
    "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating"
    " FROM ( SELECT item_id, COUNT(rating) AS cnt, "
    " AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, "
    " AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating "
    " FROM (SELECT item_id, rating, gender FROM ratings "
    " JOIN users ON ratings.user_id=users.user_id)"
    " GROUP BY item_id "
)
gender_gap_tail = (
    " ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY diff_rating DESC LIMIT 10"
)

# Top 10 items by |male average - female average|, over all items.
BatchOperator.sqlQuery(gender_gap_head + gender_gap_tail).print()

# Same ranking, restricted to items with at least 50 ratings.
BatchOperator.sqlQuery(
    gender_gap_head + " HAVING COUNT(rating)>=50 " + gender_gap_tail
).print()

import datetime


@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.TIMESTAMP(3))
def from_unix_timestamp(ts):
    """Convert epoch seconds to a naive datetime in the machine's local time zone."""
    return datetime.datetime.fromtimestamp(ts)


BatchOperator.registerFunction("from_unix_timestamp", from_unix_timestamp)

# Earliest and latest rating time via SQL; the constant grp column places
# every row in a single group.
rating_time_range_sql = (
    "SELECT MIN(dt) AS min_dt, MAX(dt) AS max_dt "
    " FROM ( SELECT from_unix_timestamp(ts) AS dt, 1 AS grp FROM ratings) "
    " GROUP BY grp "
)
BatchOperator.sqlQuery(rating_time_range_sql).print()

# The same time range computed with the operator API instead of SQL.
(ratings
    .select("from_unix_timestamp(ts) AS dt, 1 AS grp")
    .groupBy("grp", "MIN(dt) AS min_dt, MAX(dt) AS max_dt")
    .print())
#c_2_1
ratings = getSourceRatings()
users = getSourceUsers()

# select: project and rename columns, keeping the result in a variable ...
ratings_select = ratings.select("user_id, item_id AS movie_id")
ratings_select.firstN(5).print()

# ... or as one chained expression.
ratings.select("user_id, item_id AS movie_id").firstN(5).print()

# "*" selects every column.
ratings_select = ratings.select("*")
ratings_select.firstN(5).print()

# alias: rename the four ratings columns to f1..f4.
ratings.alias("f1,f2,f3,f4").firstN(5).print()

# filter and where, called with the same predicate.
for row_filter in (ratings.filter, ratings.where):
    row_filter("rating > 3").firstN(5).print()

# distinct and groupBy on the users table.
users.select("gender").distinct().print()
users.groupBy("gender", "gender, COUNT(*) AS cnt").print()

# orderBy with a limit, then with offset/fetch; first without and then
# with order='desc'.
for order_kwargs in ({}, {"order": 'desc'}):
    users.orderBy("age", 5, **order_kwargs).print()
    users.orderBy("age", offset=1, fetch=3, **order_kwargs).print()
#c_2_2
# Demonstrates the four two-table join operators on small slices of the
# ratings and items tables.
ratings = getSourceRatings()
items = getSourceItems()

# Left side: ratings of users 1-2 on items 1-3.
left_ratings = (ratings
                .filter("user_id<3 AND item_id<4")
                .select("user_id, item_id, rating"))

# Right side: items renamed to movie_id, keeping odd ids below 6.
right_movies = (items
                .select("item_id AS movie_id, title")
                .filter("movie_id < 6 AND MOD(movie_id, 2) = 1"))

print("# left_ratings #")
left_ratings.print()

print("\n# right_movies #")
right_movies.print()

# Every join variant uses the same predicate and select clause; only the
# operator class differs. Each header is preceded by a blank line so it is
# separated from the previous table (the original "# JOIN #" header lacked it).
for join_title, join_op_class in (
        ("JOIN", JoinBatchOp),
        ("LEFT OUTER JOIN", LeftOuterJoinBatchOp),
        ("RIGHT OUTER JOIN", RightOuterJoinBatchOp),
        ("FULL OUTER JOIN", FullOuterJoinBatchOp),
):
    print("\n# " + join_title + " #")
    (join_op_class()
        .setJoinPredicate("item_id = movie_id")
        .setSelectClause("user_id, item_id, title, rating")
        .linkFrom(left_ratings, right_movies)
        .print())
#c_2_3
users = getSourceUsers()

# Two overlapping slices of the users table: ids 1-4 and ids 3-6.
users_1_4 = users.filter("user_id<5")
print("# users_1_4 #")
users_1_4.print()

users_3_6 = users.filter("user_id>2 AND user_id<7")
print("\n# users_3_6 #")
users_3_6.print()

# Union operators applied to the two slices.
for union_op_class in (UnionAllBatchOp, UnionBatchOp):
    union_op_class().linkFrom(users_1_4, users_3_6).print()

# Intersect / minus: each set variant runs on the two slices. Each "All"
# variant then runs on inputs containing duplicate rows: users_1_4 unioned
# with itself, against users_1_4 unioned with users_3_6.
for set_op_class, all_op_class in (
        (IntersectBatchOp, IntersectAllBatchOp),
        (MinusBatchOp, MinusAllBatchOp),
):
    set_op_class().linkFrom(users_1_4, users_3_6).print()
    all_op_class().linkFrom(
        UnionAllBatchOp().linkFrom(users_1_4, users_1_4),
        UnionAllBatchOp().linkFrom(users_1_4, users_3_6),
    ).print()