本章包括下面各节:
6.1 用户定义标量函数(UDF)
6.1.1 示例数据及问题
6.1.2 UDF的定义
6.1.3 使用UDF处理批式数据
6.1.4 使用UDF处理流式数据
6.2 用户定义表值函数(UDTF)
6.2.1 示例数据及问题
6.2.2 UDTF的定义
6.2.3 使用UDTF处理批式数据
6.2.4 使用UDTF处理流式数据
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Java)》,这里为本章对应的示例代码。
package com.alibaba.alink;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.utils.UDFBatchOp;
import com.alibaba.alink.operator.batch.utils.UDTFBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.utils.UDFStreamOp;
import com.alibaba.alink.operator.stream.utils.UDTFStreamOp;

import java.util.HashMap;
import java.util.Map.Entry;

/**
 * Example code for chapter 6: user-defined scalar functions (UDF) and
 * user-defined table functions (UDTF), each applied to batch and stream data.
 *
 * <p>The input operators come from {@code Chap24}; their schemas are not visible
 * here. The examples reference the columns {@code user_id}, {@code item_id},
 * {@code rating}, {@code ts} (ratings) and {@code item_id}, {@code title} (items).
 */
public class Chap06 {

    /**
     * Runs the four examples in order: UDF on batch, UDF on stream,
     * UDTF on batch, UDTF on stream.
     *
     * @param args unused
     * @throws Exception if any of the example jobs fails
     */
    public static void main(String[] args) throws Exception {
        BatchOperator.setParallelism(1);

        c_1_3();
        c_1_4();
        c_2_3();
        c_2_4();
    }

    /**
     * Scalar function turning a Unix timestamp into a {@link java.sql.Timestamp}.
     * The input is multiplied by 1000 before being handed to the
     * millisecond-based {@code Timestamp} constructor, i.e. it is treated as seconds.
     */
    public static class FromUnixTimestamp extends ScalarFunction {

        /**
         * Converts one timestamp value.
         *
         * @param ts Unix timestamp in seconds; may be {@code null}
         * @return the corresponding timestamp, or {@code null} when {@code ts} is {@code null}
         */
        public java.sql.Timestamp eval(Long ts) {
            // A null input must not be unboxed: propagate it as null instead of
            // failing with a NullPointerException.
            if (ts == null) {
                return null;
            }
            return new java.sql.Timestamp(ts * 1000L);
        }

        /**
         * Declares the result type of {@link #eval(Long)}.
         *
         * @param signature parameter classes of the evaluated method (unused)
         * @return {@code Types.SQL_TIMESTAMP}
         */
        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.SQL_TIMESTAMP;
        }
    }

    /**
     * Applies {@link FromUnixTimestamp} to the batch ratings data in three ways:
     * by linking a {@code UDFBatchOp}, by calling the registered function inside
     * {@code select}, and by calling it inside a SQL query on a registered table.
     *
     * @throws Exception if a batch job fails
     */
    static void c_1_3() throws Exception {
        BatchOperator<?> ratings = Chap24.getSourceRatings();

        ratings.firstN(5).print();

        // Variant 1: dedicated UDF operator; the output column reuses the name "ts".
        ratings
            .link(
                new UDFBatchOp()
                    .setFunc(new FromUnixTimestamp())
                    .setSelectedCols("ts")
                    .setOutputCol("ts")
            )
            .firstN(5)
            .print();

        // Variant 2: register the function by name and use it in a select clause.
        BatchOperator.registerFunction("from_unix_timestamp", new FromUnixTimestamp());

        ratings
            .select("user_id, item_id, rating, from_unix_timestamp(ts) AS ts")
            .firstN(5)
            .print();

        // Variant 3: register the data as a table and use the function in SQL.
        ratings.registerTableName("ratings");

        BatchOperator
            .sqlQuery("SELECT user_id, item_id, rating, from_unix_timestamp(ts) AS ts FROM ratings")
            .firstN(5)
            .print();
    }

    /**
     * Stream counterpart of {@link #c_1_3()}: applies {@link FromUnixTimestamp}
     * to the filtered stream ratings data via {@code UDFStreamOp}, via
     * {@code select}, and via a SQL query. Each variant is followed by its own
     * {@code StreamOperator.execute()} call.
     *
     * @throws Exception if a stream job fails
     */
    static void c_1_4() throws Exception {
        StreamOperator<?> ratings = Chap24.getStreamSourceRatings();

        ratings = ratings.filter("user_id=1 AND item_id<5");

        ratings.print();
        StreamOperator.execute();

        // Variant 1: dedicated UDF operator; the output column reuses the name "ts".
        ratings
            .link(
                new UDFStreamOp()
                    .setFunc(new FromUnixTimestamp())
                    .setSelectedCols("ts")
                    .setOutputCol("ts")
            )
            .print();
        StreamOperator.execute();

        // Variant 2: register the function by name and use it in a select clause.
        StreamOperator.registerFunction("from_unix_timestamp", new FromUnixTimestamp());

        ratings
            .select("user_id, item_id, rating, from_unix_timestamp(ts) AS ts")
            .print();
        StreamOperator.execute();

        // Variant 3: register the data as a table and use the function in SQL.
        ratings.registerTableName("ratings");

        StreamOperator
            .sqlQuery("SELECT user_id, item_id, rating, from_unix_timestamp(ts) AS ts FROM ratings")
            .print();
        StreamOperator.execute();
    }

    /**
     * Table function that splits a string on single spaces and emits one
     * {@code (word, count)} row per distinct word of that string.
     */
    public static class WordCount extends TableFunction<Row> {

        /**
         * Counts the words of one input string and collects a row per distinct word.
         * Nothing is emitted for a {@code null} or empty input.
         *
         * @param str text to split; may be {@code null}
         */
        public void eval(String str) {
            if (null == str || str.isEmpty()) {
                return;
            }

            // Per-call map: no state survives between invocations, even if
            // collect() throws part-way through.
            HashMap<String, Integer> counts = new HashMap<>();
            for (String token : str.split(" ")) {
                // Leading or consecutive spaces yield empty tokens; those are not words.
                if (token.isEmpty()) {
                    continue;
                }
                counts.merge(token, 1, Integer::sum);
            }

            for (Entry<String, Integer> wordAndCount : counts.entrySet()) {
                collect(Row.of(wordAndCount.getKey(), wordAndCount.getValue()));
            }
        }

        /**
         * Declares the emitted row type.
         *
         * @return a row of {@code (STRING, INT)}
         */
        @Override
        public TypeInformation<Row> getResultType() {
            return Types.ROW(Types.STRING, Types.INT);
        }
    }

    /**
     * Applies {@link WordCount} to the {@code title} column of the batch items
     * data, first through a {@code UDTFBatchOp} (followed by a per-word sum and
     * ordering by count), then through a {@code LATERAL TABLE} SQL query.
     *
     * @throws Exception if a batch job fails
     */
    public static void c_2_3() throws Exception {
        BatchOperator<?> items = Chap24.getSourceItems();

        items.select("item_id, title").lazyPrint(10, "<- original data ->");

        BatchOperator<?> words = items.link(
            new UDTFBatchOp()
                .setFunc(new WordCount())
                .setSelectedCols("title")
                .setOutputCols("word", "cnt")
                .setReservedCols("item_id")
        );

        words.lazyPrint(20, "<- after word count ->");

        // Aggregate the per-title counts into per-word totals.
        words.groupBy("word", "word, SUM(cnt) AS cnt")
            .orderBy("cnt", 20, false)
            .print();

        BatchOperator.registerFunction("word_count", new WordCount());

        items.registerTableName("items");

        BatchOperator
            .sqlQuery("SELECT item_id, word, cnt FROM items, "
                + "LATERAL TABLE(word_count(title)) as T(word, cnt)")
            .firstN(20)
            .print();
    }

    /**
     * Stream counterpart of {@link #c_2_3()}: applies {@link WordCount} to the
     * {@code title} column of the filtered stream items data, through a
     * {@code UDTFStreamOp} and through a {@code LATERAL TABLE} SQL query.
     *
     * @throws Exception if a stream job fails
     */
    public static void c_2_4() throws Exception {
        StreamOperator<?> items = Chap24.getStreamSourceItems();

        items = items.select("item_id, title").filter("item_id<4");

        items.print();
        StreamOperator.execute();

        StreamOperator<?> words = items.link(
            new UDTFStreamOp()
                .setFunc(new WordCount())
                .setSelectedCols("title")
                .setOutputCols("word", "cnt")
                .setReservedCols("item_id")
        );

        words.print();
        StreamOperator.execute();

        StreamOperator.registerFunction("word_count", new WordCount());

        items.registerTableName("items");

        StreamOperator.sqlQuery("SELECT item_id, word, cnt FROM items, "
            + "LATERAL TABLE(word_count(title)) as T(word, cnt)")
            .print();
        StreamOperator.execute();
    }
}