本章包括下面各节:
6.1 用户定义标量函数(UDF)
6.1.1 示例数据及问题
6.1.2 UDF的定义
6.1.3 使用UDF处理批式数据
6.1.4 使用UDF处理流式数据
6.2 用户定义表值函数(UDTF)
6.2.1 示例数据及问题
6.2.2 UDTF的定义
6.2.3 使用UDTF处理批式数据
6.2.4 使用UDTF处理流式数据
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。
from pyalink.alink import *
useLocalEnv(1)
from utils import *
import os
import pandas as pd
# Directory of the MovieLens ml-100k files, located under ROOT_DIR.
# ROOT_DIR is not defined in this file (presumably provided by `utils`).
# The empty last element makes the path end with a separator, so file
# names can be appended to DATA_DIR directly.
DATA_DIR = ROOT_DIR + os.sep.join(["movielens", "ml-100k", ""])
# File names that are appended to DATA_DIR by the source builders.
RATING_FILE = "u.data"
ITEM_FILE = "u.item"

# Schema string handed to the ratings source operators.
RATING_SCHEMA_STRING = "user_id long, item_id long, rating float, ts long"

# Schema string handed to the items source operators: five descriptive
# columns followed by nineteen int columns (apparently per-genre flags).
ITEM_SCHEMA_STRING = (
    "item_id long, title string, "
    "release_date string, video_release_date string, imdb_url string, "
    "unknown int, action int, adventure int, animation int, "
    "children int, comedy int, crime int, documentary int, drama int, "
    "fantasy int, film_noir int, horror int, musical int, mystery int, "
    "romance int, sci_fi int, thriller int, war int, western int"
)
def getSourceRatings():
    """Build the batch source operator for the ratings file.

    Returns:
        The result of configuring a ``TsvSourceBatchOp`` with the file path
        ``DATA_DIR + RATING_FILE`` and the schema ``RATING_SCHEMA_STRING``.
    """
    source = TsvSourceBatchOp()
    source = source.setFilePath(DATA_DIR + RATING_FILE)
    return source.setSchemaStr(RATING_SCHEMA_STRING)
def getStreamSourceRatings():
    """Build the stream source operator for the ratings file.

    Returns:
        The result of configuring a ``TsvSourceStreamOp`` with the file path
        ``DATA_DIR + RATING_FILE`` and the schema ``RATING_SCHEMA_STRING``.
    """
    source = TsvSourceStreamOp()
    source = source.setFilePath(DATA_DIR + RATING_FILE)
    return source.setSchemaStr(RATING_SCHEMA_STRING)
def getSourceItems():
    """Build the batch source operator for the items file.

    The file is read as CSV with ``"|"`` as the field delimiter.

    Returns:
        The result of configuring a ``CsvSourceBatchOp`` with the delimiter,
        the file path ``DATA_DIR + ITEM_FILE`` and ``ITEM_SCHEMA_STRING``.
    """
    source = CsvSourceBatchOp()
    source = source.setFieldDelimiter("|")
    source = source.setFilePath(DATA_DIR + ITEM_FILE)
    return source.setSchemaStr(ITEM_SCHEMA_STRING)
def getStreamSourceItems():
    """Build the stream source operator for the items file.

    The file is read as CSV with ``"|"`` as the field delimiter.

    Returns:
        The result of configuring a ``CsvSourceStreamOp`` with the delimiter,
        the file path ``DATA_DIR + ITEM_FILE`` and ``ITEM_SCHEMA_STRING``.
    """
    source = CsvSourceStreamOp()
    source = source.setFieldDelimiter("|")
    source = source.setFilePath(DATA_DIR + ITEM_FILE)
    return source.setSchemaStr(ITEM_SCHEMA_STRING)
import datetime
@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.TIMESTAMP(3))
def from_unix_timestamp(ts):
    """Convert a Unix timestamp into a ``datetime.datetime``.

    Declared through ``@udf`` with one BIGINT input and a TIMESTAMP(3) result.

    Args:
        ts: Value passed straight to ``datetime.datetime.fromtimestamp``
            (presumably seconds since the epoch, as in the ratings ``ts``
            column).

    Returns:
        The ``datetime.datetime`` produced by ``fromtimestamp(ts)``; no
        ``tz`` argument is given, so the result is a naive local-time value.
    """
    converted = datetime.datetime.fromtimestamp(ts)
    return converted
#c_1_3
# Batch usage of the from_unix_timestamp UDF, shown three ways.
ratings = getSourceRatings()
ratings.firstN(5).print()

# 1) Link a UDFBatchOp that takes column "ts" as input and names its
#    output column "ts" as well.
ts_batch_op = (
    UDFBatchOp()
    .setFunc(from_unix_timestamp)
    .setSelectedCols(["ts"])
    .setOutputCol("ts")
)
ratings.link(ts_batch_op).firstN(5).print()

# 2) Register the function by name and call it inside a select clause.
BatchOperator.registerFunction("from_unix_timestamp", from_unix_timestamp)
ratings.select(
    "user_id, item_id, rating, from_unix_timestamp(ts) AS ts"
).firstN(5).print()

# 3) Register the operator as table "ratings" and call the function from SQL.
ratings.registerTableName("ratings")
batch_sql_result = BatchOperator.sqlQuery(
    "SELECT user_id, item_id, rating, from_unix_timestamp(ts) AS ts FROM ratings"
)
batch_sql_result.firstN(5).print()
#c_1_4
# Stream usage of the from_unix_timestamp UDF, shown three ways. Every
# print() is followed by its own StreamOperator.execute() call.
ratings = getStreamSourceRatings()
# Restrict the stream to a few rows so the printed output stays short.
ratings = ratings.filter("user_id=1 AND item_id<5")
ratings.print()
StreamOperator.execute()

# 1) Link a UDFStreamOp that takes column "ts" as input and names its
#    output column "ts" as well.
ts_stream_op = (
    UDFStreamOp()
    .setFunc(from_unix_timestamp)
    .setSelectedCols(["ts"])
    .setOutputCol("ts")
)
ratings.link(ts_stream_op).print()
StreamOperator.execute()

# 2) Register the function by name and call it inside a select clause.
StreamOperator.registerFunction("from_unix_timestamp", from_unix_timestamp)
ratings.select(
    "user_id, item_id, rating, from_unix_timestamp(ts) AS ts"
).print()
StreamOperator.execute()

# 3) Register the operator as table "ratings" and call the function from SQL.
ratings.registerTableName("ratings")
stream_sql_result = StreamOperator.sqlQuery(
    "SELECT user_id, item_id, rating, from_unix_timestamp(ts) AS ts FROM ratings"
)
stream_sql_result.print()
StreamOperator.execute()
@udtf(input_types=[DataTypes.STRING()], result_types=[DataTypes.STRING(), DataTypes.INT()])
def doc_word_count(s):
    """Yield one ``(word, count)`` row per distinct token of ``s``.

    Declared through ``@udtf`` with one STRING input and (STRING, INT) output.

    Args:
        s: Text that is tokenized with ``str.split()`` (whitespace-separated).
            ``None`` produces no rows instead of raising ``AttributeError``.

    Yields:
        ``(token, occurrences)`` tuples, in order of each token's first
        appearance in ``s``.
    """
    # Local import keeps the function self-contained; the file's import
    # block does not provide Counter.
    from collections import Counter

    if s is None:
        return
    # Counter replaces the hand-written tally and avoids a local named
    # `dict`, which shadowed the builtin. It keeps insertion order, so rows
    # come out in the same order as before.
    yield from Counter(s.split()).items()
#c_2_3
# Batch usage of the doc_word_count UDTF: as a linked operator, then in SQL.
items = getSourceItems()
items.select("item_id, title").lazyPrint(10, "<- original data ->")

# 1) Link a UDTFBatchOp: "title" is the input column, the two outputs are
#    named "word" and "cnt", and "item_id" is set as a reserved column.
word_count_batch_op = (
    UDTFBatchOp()
    .setFunc(doc_word_count)
    .setSelectedCols(["title"])
    .setOutputCols(["word", "cnt"])
    .setReservedCols(["item_id"])
)
words = items.link(word_count_batch_op)
words.lazyPrint(20, "<- after word count ->")

# Sum the counts per word and print 20 rows ordered by "cnt" descending.
word_totals = words.groupBy("word", "word, SUM(cnt) AS cnt")
word_totals.orderBy("cnt", 20, order='desc').print()

# 2) Register the function and the table, then call the UDTF through
#    LATERAL TABLE in a SQL query.
BatchOperator.registerFunction("doc_word_count", doc_word_count)
items.registerTableName("items")
batch_word_rows = BatchOperator.sqlQuery(
    "SELECT item_id, word, cnt FROM items, "
    "LATERAL TABLE(doc_word_count(title)) as T(word, cnt)"
)
batch_word_rows.firstN(20).print()
#c_2_4
# Stream usage of the doc_word_count UDTF: as a linked operator, then in
# SQL. Every print() is followed by its own StreamOperator.execute() call.
items = getStreamSourceItems()
# Keep only two columns and a few rows so the printed output stays short.
items = items.select("item_id, title").filter("item_id<4")
items.print()
StreamOperator.execute()

# 1) Link a UDTFStreamOp: "title" is the input column, the two outputs are
#    named "word" and "cnt", and "item_id" is set as a reserved column.
word_count_stream_op = (
    UDTFStreamOp()
    .setFunc(doc_word_count)
    .setSelectedCols(["title"])
    .setOutputCols(["word", "cnt"])
    .setReservedCols(["item_id"])
)
words = items.link(word_count_stream_op)
words.print()
StreamOperator.execute()

# 2) Register the function and the table, then call the UDTF through
#    LATERAL TABLE in a SQL query.
StreamOperator.registerFunction("doc_word_count", doc_word_count)
items.registerTableName("items")
stream_word_rows = StreamOperator.sqlQuery(
    "SELECT item_id, word, cnt FROM items, "
    "LATERAL TABLE(doc_word_count(title)) as T(word, cnt)"
)
stream_word_rows.print()
StreamOperator.execute()
```python
```