Alink教程(Python版)

第5章 支持Flink SQL

本章包括下面各节:
5.1 基本操作
5.1.1 注册
5.1.2 运行
5.1.3 内置函数
5.1.4 用户定义函数
5.2 简化操作
5.2.1 单表操作
5.2.2 两表的连接(JOIN)操作
5.2.3 两表的集合操作

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

# Star-import the PyAlink API; the operator classes, `udf` and `DataTypes`
# used further down are presumably provided by this import.
from pyalink.alink import *
# NOTE(review): presumably starts a local execution environment with
# parallelism 4 -- confirm against the PyAlink documentation.
useLocalEnv(4)

# NOTE(review): ROOT_DIR (used when building DATA_DIR) is not defined in the
# visible part of this file; presumably it comes from this project-local module.
from utils import *
import os
import pandas as pd

# Directory prefix for the data files: ROOT_DIR followed by
# "movielens<sep>ml-100k<sep>". The empty last element yields the trailing
# separator, so file names can be appended directly.
DATA_DIR = ROOT_DIR + os.sep.join(("movielens", "ml-100k", ""))

# File names, appended to DATA_DIR by the source helpers.
RATING_FILE = "u.data"
USER_FILE = "u.user"
ITEM_FILE = "u.item"
RATING_TRAIN_FILE = "ua.base"
RATING_TEST_FILE = "ua.test"

# Column names shared across the examples.
USER_COL = "user_id"
ITEM_COL = "item_id"
RATING_COL = "rating"
RECOMM_COL = "recomm"

# Schema strings in "name type, name type, ..." form.
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"

USER_SCHEMA_STRING = (
    "user_id long, age int, gender string, occupation string, zip_code string"
)

# Trailing columns of the item schema, all typed int
# (presumably per-genre indicator flags -- confirm against the dataset README).
_ITEM_INT_COLS = (
    "unknown", "action", "adventure", "animation",
    "children", "comedy", "crime", "documentary", "drama",
    "fantasy", "film_noir", "horror", "musical", "mystery",
    "romance", "sci_fi", "thriller", "war", "western",
)

ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    + ", ".join(col + " int" for col in _ITEM_INT_COLS)
)


def getSourceRatings():
    """Build a batch TSV source over the ratings file.

    Returns:
        The TsvSourceBatchOp setter chain result, configured with
        DATA_DIR + RATING_FILE as file path and RATING_SCHEMA_STRING as schema.
    """
    return (
        TsvSourceBatchOp()
        .setFilePath(DATA_DIR + RATING_FILE)
        .setSchemaStr(RATING_SCHEMA_STRING)
    )


def getStreamSourceRatings():
    """Build a stream TSV source over the ratings file.

    Returns:
        The TsvSourceStreamOp setter chain result, configured with
        DATA_DIR + RATING_FILE as file path and RATING_SCHEMA_STRING as schema.
    """
    return (
        TsvSourceStreamOp()
        .setFilePath(DATA_DIR + RATING_FILE)
        .setSchemaStr(RATING_SCHEMA_STRING)
    )


def getSourceUsers():
    """Build a batch CSV source over the users file, using "|" as delimiter.

    Returns:
        The CsvSourceBatchOp setter chain result, configured with
        DATA_DIR + USER_FILE as file path and USER_SCHEMA_STRING as schema.
    """
    return (
        CsvSourceBatchOp()
        .setFieldDelimiter("|")
        .setFilePath(DATA_DIR + USER_FILE)
        .setSchemaStr(USER_SCHEMA_STRING)
    )


def getSourceItems():
    """Build a batch CSV source over the items file, using "|" as delimiter.

    Returns:
        The CsvSourceBatchOp setter chain result, configured with
        DATA_DIR + ITEM_FILE as file path and ITEM_SCHEMA_STRING as schema.
    """
    return (
        CsvSourceBatchOp()
        .setFieldDelimiter("|")
        .setFilePath(DATA_DIR + ITEM_FILE)
        .setSchemaStr(ITEM_SCHEMA_STRING)
    )


def getStreamSourceItems():
    """Build a stream CSV source over the items file, using "|" as delimiter.

    Returns:
        The CsvSourceStreamOp setter chain result, configured with
        DATA_DIR + ITEM_FILE as file path and ITEM_SCHEMA_STRING as schema.
    """
    return (
        CsvSourceStreamOp()
        .setFieldDelimiter("|")
        .setFilePath(DATA_DIR + ITEM_FILE)
        .setSchemaStr(ITEM_SCHEMA_STRING)
    )

#c_1

# Create the three batch sources.
ratings = getSourceRatings()
users = getSourceUsers()
items = getSourceItems()

# Register each source under the table name the SQL queries below refer to.
ratings.registerTableName("ratings")
items.registerTableName("items")
users.registerTableName("users")

# The 10 items with the most ratings, with their rating count and average
# rating, joined against `items` to show the title.
most_rated_sql = (
    "SELECT title, cnt, avg_rating"
    " FROM ( SELECT item_id, COUNT(*) AS cnt, AVG(rating) AS avg_rating"
    "        FROM ratings "
    "        GROUP BY item_id "
    "        ORDER BY cnt DESC LIMIT 10 "
    "      ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY cnt DESC"
)
BatchOperator.sqlQuery(most_rated_sql).print()


# Per item: rating count, average rating where gender='M', average rating
# where gender='F', and the absolute difference between the two averages.
# Shows the 10 items with the largest difference.
gender_gap_sql = (
    "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating"
    " FROM ( SELECT item_id, COUNT(rating) AS cnt, "
    "               AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, "
    "               AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating "
    "        FROM (SELECT item_id, rating, gender FROM ratings "
    "                     JOIN users ON ratings.user_id=users.user_id)"
    "        GROUP BY item_id "
    "      ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY diff_rating DESC LIMIT 10"
)
BatchOperator.sqlQuery(gender_gap_sql).print()

# Same per-item M/F average-rating comparison, but the HAVING clause keeps
# only items with at least 50 ratings before taking the top 10 by difference.
gender_gap_min50_sql = (
    "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating"
    " FROM ( SELECT item_id, COUNT(rating) AS cnt, "
    "               AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, "
    "               AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating "
    "        FROM (SELECT item_id, rating, gender FROM ratings "
    "                     JOIN users ON ratings.user_id=users.user_id)"
    "        GROUP BY item_id "
    "        HAVING COUNT(rating)>=50 "
    "      ) AS t"
    " JOIN items"
    " ON t.item_id=items.item_id"
    " ORDER BY diff_rating DESC LIMIT 10"
)
BatchOperator.sqlQuery(gender_gap_min50_sql).print()


import datetime


@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.TIMESTAMP(3))
def from_unix_timestamp(ts):
    """Convert a POSIX timestamp to a datetime.

    Args:
        ts: The timestamp value (declared as BIGINT in the UDF signature).

    Returns:
        The naive local-time ``datetime.datetime`` for ``ts``, as produced by
        ``datetime.datetime.fromtimestamp`` without a ``tz`` argument.
    """
    converted = datetime.datetime.fromtimestamp(ts)
    return converted


# Expose the UDF under the name used by the SQL and select() expressions below.
BatchOperator.registerFunction("from_unix_timestamp", from_unix_timestamp)

# Earliest and latest rating time via SQL: every row gets the constant
# grp = 1, so GROUP BY grp aggregates the whole table into one row.
min_max_dt_sql = (
    "SELECT MIN(dt) AS min_dt, MAX(dt) AS max_dt "
    " FROM ( SELECT from_unix_timestamp(ts) AS dt, 1 AS grp FROM ratings) "
    " GROUP BY grp "
)
BatchOperator.sqlQuery(min_max_dt_sql).print()

# The same computation expressed with the operator's select/groupBy methods.
(
    ratings
    .select("from_unix_timestamp(ts) AS dt, 1 AS grp")
    .groupBy("grp", "MIN(dt) AS min_dt, MAX(dt) AS max_dt")
    .print()
)
#c_2_1

# Fresh sources for the single-table examples.
ratings = getSourceRatings()
users = getSourceUsers()

# Projection with a renamed column, through an intermediate variable ...
ratings_select = ratings.select("user_id, item_id AS movie_id")
ratings_select.firstN(5).print()

# ... and the same projection written as a single chain.
(
    ratings
    .select("user_id, item_id AS movie_id")
    .firstN(5)
    .print()
)

# Select every column.
ratings_select = ratings.select("*")
ratings_select.firstN(5).print()

# Rename all four columns.
ratings.alias("f1,f2,f3,f4").firstN(5).print()

# Row filtering, once with filter() and once with where(), same predicate.
ratings.filter("rating > 3").firstN(5).print()
ratings.where("rating > 3").firstN(5).print()

# Distinct values of one column.
users.select("gender").distinct().print()

# Row count per gender.
users.groupBy("gender", "gender, COUNT(*) AS cnt").print()

# Ordering by age: with a limit, then with offset/fetch.
users.orderBy("age", 5).print()
users.orderBy("age", offset=1, fetch=3).print()

# The same two calls with order='desc'.
users.orderBy("age", 5, order='desc').print()
users.orderBy("age", offset=1, fetch=3, order='desc').print()

#c_2_2

# Fresh sources for the join examples.
ratings = getSourceRatings()
items = getSourceItems()

# Left input: a small slice of the ratings, keeping three columns.
left_ratings = (
    ratings
    .filter("user_id<3 AND item_id<4")
    .select("user_id, item_id, rating")
)

# Right input: item id (renamed to movie_id) and title for odd ids below 6.
right_movies = (
    items
    .select("item_id AS movie_id, title")
    .filter("movie_id < 6 AND MOD(movie_id, 2) = 1")
)

# Show both inputs before joining them.
print("# left_ratings #")
left_ratings.print()
print("\n# right_movies #")
right_movies.print()

# Each join flavour is run with the same predicate, select clause and inputs;
# only the banner and the operator class differ, so drive them from a table.
join_variants = (
    ("# JOIN #", JoinBatchOp),
    ("\n# LEFT OUTER JOIN #", LeftOuterJoinBatchOp),
    ("\n# RIGHT OUTER JOIN #", RightOuterJoinBatchOp),
    ("\n# FULL OUTER JOIN #", FullOuterJoinBatchOp),
)

for join_banner, join_op_cls in join_variants:
    print(join_banner)
    (
        join_op_cls()
        .setJoinPredicate("item_id = movie_id")
        .setSelectClause("user_id, item_id, title, rating")
        .linkFrom(left_ratings, right_movies)
        .print()
    )

#c_2_3

# Fresh users source for the set-operation examples.
users = getSourceUsers()

# Two overlapping subsets: ids below 5, and ids strictly between 2 and 7.
users_1_4 = users.filter("user_id<5")
print("# users_1_4 #")
users_1_4.print()

users_3_6 = users.filter("user_id>2 AND user_id<7")
print("\n# users_3_6 #")
users_3_6.print()

# Union, with the ALL variant first.
UnionAllBatchOp().linkFrom(users_1_4, users_3_6).print()

UnionBatchOp().linkFrom(users_1_4, users_3_6).print()

# Intersection.
IntersectBatchOp().linkFrom(users_1_4, users_3_6).print()

# The ALL variants are fed inputs that contain repeated rows, each built by a
# fresh UnionAllBatchOp: users_1_4 twice on the left, both subsets on the right.
(
    IntersectAllBatchOp()
    .linkFrom(
        UnionAllBatchOp().linkFrom(users_1_4, users_1_4),
        UnionAllBatchOp().linkFrom(users_1_4, users_3_6),
    )
    .print()
)

# Difference.
MinusBatchOp().linkFrom(users_1_4, users_3_6).print()

(
    MinusAllBatchOp()
    .linkFrom(
        UnionAllBatchOp().linkFrom(users_1_4, users_1_4),
        UnionAllBatchOp().linkFrom(users_1_4, users_3_6),
    )
    .print()
)