本章包括下面各节：
5.1 基本操作
5.1.1 注册
5.1.2 运行
5.1.3 内置函数
5.1.4 用户定义函数
5.2 简化操作
5.2.1 单表操作
5.2.2 两表的连接（JOIN）操作
5.2.3 两表的集合操作
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Python）》，此处为本章对应的示例代码。
from pyalink.alink import *
# Start a local Alink execution environment; it is called before any operator
# in this script is created. The argument 4 is presumably the local
# parallelism — NOTE(review): confirm against the pyalink useLocalEnv docs.
useLocalEnv(4)
from utils import *
import os
import pandas as pd
# Directory of the MovieLens 100K data set under ROOT_DIR (ROOT_DIR comes from
# the `utils` star import). os.path.join only inserts a separator where one is
# missing, so this gives the same result as plain concatenation when ROOT_DIR
# already ends with a separator and still yields a valid path when it does not.
# The final "" keeps the trailing separator that `DATA_DIR + <file name>`
# concatenations in the source helpers rely on.
DATA_DIR = os.path.join(ROOT_DIR, "movielens", "ml-100k", "")

# File names inside DATA_DIR.
RATING_FILE = "u.data"
USER_FILE = "u.user"
ITEM_FILE = "u.item"
# Presumably a train/test split of the ratings; not read by the code in this
# chapter's visible examples.
RATING_TRAIN_FILE = "ua.base"
RATING_TEST_FILE = "ua.test"

# Column names shared by the examples.
USER_COL = "user_id"
ITEM_COL = "item_id"
RATING_COL = "rating"
RECOMM_COL = "recomm"

# Alink schema strings ("name type, ...") for the three source files.
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"
USER_SCHEMA_STRING = "user_id long, age int, gender string, occupation string, zip_code string"
# One int column per genre flag follows the descriptive columns.
ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    "unknown int, action int, adventure int, animation int, "
    "children int, comedy int, crime int, documentary int, drama int, "
    "fantasy int, film_noir int, horror int, musical int, mystery int, "
    "romance int, sci_fi int, thriller int, war int, western int"
)
def getSourceRatings():
    """Return a batch source operator over the MovieLens ratings file.

    Reads ``DATA_DIR + RATING_FILE`` with ``TsvSourceBatchOp`` (tab-separated
    text) using ``RATING_SCHEMA_STRING`` as the column schema.
    """
    return (
        TsvSourceBatchOp()
        .setFilePath(DATA_DIR + RATING_FILE)
        .setSchemaStr(RATING_SCHEMA_STRING)
    )
def getStreamSourceRatings():
    """Return a stream source operator over the MovieLens ratings file.

    Streaming counterpart of ``getSourceRatings``: same file
    (``DATA_DIR + RATING_FILE``) and schema (``RATING_SCHEMA_STRING``), read
    with ``TsvSourceStreamOp``.
    """
    return (
        TsvSourceStreamOp()
        .setFilePath(DATA_DIR + RATING_FILE)
        .setSchemaStr(RATING_SCHEMA_STRING)
    )
def getSourceUsers():
    """Return a batch source operator over the MovieLens users file.

    Reads ``DATA_DIR + USER_FILE`` as '|'-delimited text with
    ``CsvSourceBatchOp`` using ``USER_SCHEMA_STRING`` as the column schema.
    """
    return (
        CsvSourceBatchOp()
        .setFieldDelimiter("|")
        .setFilePath(DATA_DIR + USER_FILE)
        .setSchemaStr(USER_SCHEMA_STRING)
    )
def getSourceItems():
    """Return a batch source operator over the MovieLens items (movies) file.

    Reads ``DATA_DIR + ITEM_FILE`` as '|'-delimited text with
    ``CsvSourceBatchOp`` using ``ITEM_SCHEMA_STRING`` as the column schema.
    """
    return (
        CsvSourceBatchOp()
        .setFieldDelimiter("|")
        .setFilePath(DATA_DIR + ITEM_FILE)
        .setSchemaStr(ITEM_SCHEMA_STRING)
    )
def getStreamSourceItems():
    """Return a stream source operator over the MovieLens items (movies) file.

    Streaming counterpart of ``getSourceItems``: same '|'-delimited file
    (``DATA_DIR + ITEM_FILE``) and schema (``ITEM_SCHEMA_STRING``), read with
    ``CsvSourceStreamOp``.
    """
    return (
        CsvSourceStreamOp()
        .setFieldDelimiter("|")
        .setFilePath(DATA_DIR + ITEM_FILE)
        .setSchemaStr(ITEM_SCHEMA_STRING)
    )
#c_1
# Build the three MovieLens sources and register each under a table name so
# the SQL statements that follow can refer to ratings / items / users.
ratings = getSourceRatings()
users = getSourceUsers()
items = getSourceItems()
for source_op, table_name in (
    (ratings, "ratings"),
    (items, "items"),
    (users, "users"),
):
    source_op.registerTableName(table_name)
# Ten most-rated movies. The derived table t keeps the 10 item_ids with the
# largest rating count (plus their average rating); joining with items adds
# the title. The outer ORDER BY re-sorts because join output order is not
# guaranteed to follow the subquery's order.
BatchOperator.sqlQuery(
    "SELECT title, cnt, avg_rating"
    " FROM ( SELECT item_id, COUNT(*) AS cnt, AVG(rating) AS avg_rating"
    " FROM ratings "
    " GROUP BY item_id "
    " ORDER BY cnt DESC LIMIT 10 "
    " ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY cnt DESC"
).print()
# Movies whose average rating differs most between male ('M') and female ('F')
# raters. Each CASE yields NULL for the other gender, so AVG only averages the
# matching gender's ratings. A movie rated by only one gender gets a NULL
# m_rating or f_rating, and therefore a NULL diff_rating.
BatchOperator.sqlQuery(
    "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating"
    " FROM ( SELECT item_id, COUNT(rating) AS cnt, "
    " AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, "
    " AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating "
    " FROM (SELECT item_id, rating, gender FROM ratings "
    " JOIN users ON ratings.user_id=users.user_id)"
    " GROUP BY item_id "
    " ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY diff_rating DESC LIMIT 10"
).print()
# Same male/female rating-difference ranking as the previous query, but the
# HAVING clause keeps only movies with at least 50 ratings. This way the
# ranking is not driven by movies with very few ratings.
BatchOperator.sqlQuery(
    "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating"
    " FROM ( SELECT item_id, COUNT(rating) AS cnt, "
    " AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, "
    " AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating "
    " FROM (SELECT item_id, rating, gender FROM ratings "
    " JOIN users ON ratings.user_id=users.user_id)"
    " GROUP BY item_id "
    " HAVING COUNT(rating)>=50 "
    " ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY diff_rating DESC LIMIT 10"
).print()
import datetime


@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.TIMESTAMP(3))
def from_unix_timestamp(ts):
    """Convert Unix epoch seconds (BIGINT) to a naive ``datetime`` (TIMESTAMP(3)).

    A SQL NULL input arrives as ``None``. It is returned as ``None`` (NULL)
    instead of letting ``datetime.fromtimestamp`` raise ``TypeError`` and fail
    the job.

    NOTE: ``fromtimestamp`` without a ``tz`` argument converts to the local
    time zone of the process running the UDF. The resulting values therefore
    depend on that machine's time-zone setting.
    """
    if ts is None:
        return None
    return datetime.datetime.fromtimestamp(ts)


# Expose the UDF to SQL / expression strings under the same name.
BatchOperator.registerFunction("from_unix_timestamp", from_unix_timestamp)
# Earliest and latest rating time, computed twice with the registered
# from_unix_timestamp function: once in SQL and once with the operator API.
# The constant column grp puts every row into one group, so each query yields
# a single min/max row.
BatchOperator.sqlQuery(
    "SELECT MIN(dt) AS min_dt, MAX(dt) AS max_dt "
    " FROM ( SELECT from_unix_timestamp(ts) AS dt, 1 AS grp FROM ratings) "
    " GROUP BY grp "
).print()
(
    ratings
    .select("from_unix_timestamp(ts) AS dt, 1 AS grp")
    .groupBy("grp", "MIN(dt) AS min_dt, MAX(dt) AS max_dt")
    .print()
)
#c_2_1
# Single-table operations on freshly created ratings / users sources.
ratings = getSourceRatings()
users = getSourceUsers()

# Projection with a column rename: first through an intermediate operator,
# then as one chained expression.
ratings_select = ratings.select("user_id, item_id AS movie_id")
ratings_select.firstN(5).print()
(
    ratings
    .select("user_id, item_id AS movie_id")
    .firstN(5)
    .print()
)

# "*" selects every column.
ratings_select = ratings.select("*")
ratings_select.firstN(5).print()

# Alias the four columns as f1..f4.
ratings.alias("f1,f2,f3,f4").firstN(5).print()

# filter and where, shown with the same predicate (filter first, then where).
for row_predicate_op in (ratings.filter, ratings.where):
    row_predicate_op("rating > 3").firstN(5).print()

# Distinct gender values, then a per-gender row count.
users.select("gender").distinct().print()
users.groupBy("gender", "gender, COUNT(*) AS cnt").print()

# Ordering by age: a positional count (presumably a row limit of 5) versus
# offset/fetch paging, each in default and then descending order.
users.orderBy("age", 5).print()
users.orderBy("age", offset=1, fetch=3).print()
users.orderBy("age", 5, order='desc').print()
users.orderBy("age", offset=1, fetch=3, order='desc').print()
#c_2_2
# Two-table joins on small slices so the different join types are easy to
# compare.
ratings = getSourceRatings()
items = getSourceItems()

# Left side: ratings with user_id < 3 and item_id < 4.
left_ratings = (
    ratings
    .filter("user_id<3 AND item_id<4")
    .select("user_id, item_id, rating")
)
# Right side: movies renamed item_id -> movie_id, keeping odd ids below 6.
# The filter refers to movie_id, so it must come after the select.
right_movies = (
    items
    .select("item_id AS movie_id, title")
    .filter("movie_id < 6 AND MOD(movie_id, 2) = 1")
)

print("# left_ratings #")
left_ratings.print()
print("\n# right_movies #")
right_movies.print()

# Run the same predicate and select clause through each join operator.
for join_title, join_op_cls in (
    ("# JOIN #", JoinBatchOp),
    ("\n# LEFT OUTER JOIN #", LeftOuterJoinBatchOp),
    ("\n# RIGHT OUTER JOIN #", RightOuterJoinBatchOp),
    ("\n# FULL OUTER JOIN #", FullOuterJoinBatchOp),
):
    print(join_title)
    (
        join_op_cls()
        .setJoinPredicate("item_id = movie_id")
        .setSelectClause("user_id, item_id, title, rating")
        .linkFrom(left_ratings, right_movies)
        .print()
    )
#c_2_3
# Set operations on two overlapping slices of the users table:
# users_1_4 keeps user_id < 5, users_3_6 keeps 2 < user_id < 7.
users = getSourceUsers()
users_1_4 = users.filter("user_id<5")
print("# users_1_4 #")
users_1_4.print()
users_3_6 = users.filter("user_id>2 AND user_id<7")
print("\n# users_3_6 #")
users_3_6.print()

# Label every result, as the join section does. Without headers the six
# tables printed below cannot be told apart.
print("\n# UNION ALL #")
UnionAllBatchOp().linkFrom(users_1_4, users_3_6).print()
print("\n# UNION #")
UnionBatchOp().linkFrom(users_1_4, users_3_6).print()
print("\n# INTERSECT #")
IntersectBatchOp().linkFrom(users_1_4, users_3_6).print()

# Inputs that contain duplicate rows, used to contrast the *All operators
# with their counterparts (presumably SQL INTERSECT ALL / EXCEPT ALL bag
# semantics). Built once and shared by both *All operators instead of
# constructing identical unions twice.
users_1_4_twice = UnionAllBatchOp().linkFrom(users_1_4, users_1_4)
users_1_4_and_3_6 = UnionAllBatchOp().linkFrom(users_1_4, users_3_6)

print("\n# INTERSECT ALL #")
IntersectAllBatchOp().linkFrom(users_1_4_twice, users_1_4_and_3_6).print()
print("\n# MINUS #")
MinusBatchOp().linkFrom(users_1_4, users_3_6).print()
print("\n# MINUS ALL #")
MinusAllBatchOp().linkFrom(users_1_4_twice, users_1_4_and_3_6).print()
```python
```