本章包括以下各节：
5.1 基本操作
5.1.1 注册
5.1.2 运行
5.1.3 内置函数
5.1.4 用户定义函数
5.2 简化操作
5.2.1 单表操作
5.2.2 两表的连接(JOIN)操作
5.2.3 两表的集合操作
5.3 深入介绍Table Environment
5.3.1 注册数据表名
5.3.2 撤销数据表名
5.3.3 扫描已注册的表
详细内容请阅读纸质书《Alink权威指南：基于Flink的机器学习实例入门（Java）》，以下为本章对应的示例代码。
package com.alibaba.alink; import com.alibaba.alink.common.MLEnvironment; import com.alibaba.alink.common.MLEnvironmentFactory; import com.alibaba.alink.common.type.AlinkTypes; import com.alibaba.alink.common.utils.Stopwatch; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sql.*; import com.alibaba.alink.operator.stream.StreamOperator; import com.alibaba.alink.operator.stream.source.TableSourceStreamOp; import com.alibaba.alink.operator.stream.utils.DataStreamConversionUtil; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.table.api.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types.Row; import java.util.HashMap; import java.util.Map; public class Chap05 { public static void main(String[] args) throws Exception { Stopwatch sw = new Stopwatch(); sw.start(); BatchOperator.setParallelism(1); c_1(); c_2_1(); c_2_2(); c_2_3(); c_3(); c_3_4(); sw.stop(); System.out.println(sw.getElapsedTimeSpan()); } static void c_1() throws Exception { BatchOperator<?> ratings = Chap24.getSourceRatings(); BatchOperator<?> users = Chap24.getSourceUsers(); BatchOperator<?> items = Chap24.getSourceItems(); ratings.registerTableName("ratings"); items.registerTableName("items"); users.registerTableName("users"); BatchOperator.sqlQuery( "SELECT title, cnt, avg_rating" + " FROM ( SELECT item_id, COUNT(*) AS cnt, AVG(rating) AS avg_rating" + " FROM ratings " + " GROUP BY item_id " + " ORDER BY cnt DESC LIMIT 10 " + " ) 
AS t" + " JOIN items" + " ON t.item_id=items.item_id" + " ORDER BY cnt DESC" ).print(); BatchOperator.registerFunction("from_unix_timestamp", new FromUnixTimestamp()); BatchOperator.sqlQuery( "SELECT MIN(dt) AS min_dt, MAX(dt) AS max_dt " + " FROM ( SELECT from_unix_timestamp(ts) AS dt, 1 AS grp FROM ratings) " + " GROUP BY grp " ).print(); ratings .select("from_unix_timestamp(ts) AS dt, 1 AS grp") .groupBy("grp", "MIN(dt) AS min_dt, MAX(dt) AS max_dt") .print(); BatchOperator.sqlQuery( "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating" + " FROM ( SELECT item_id, COUNT(rating) AS cnt, " + " AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, " + " AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating " + " FROM (SELECT item_id, rating, gender FROM ratings " + " JOIN users ON ratings.user_id=users.user_id)" + " GROUP BY item_id " + " ) AS t" + " JOIN items" + " ON t.item_id=items.item_id" + " ORDER BY diff_rating DESC LIMIT 10" ).print(); BatchOperator.sqlQuery( "SELECT title, cnt, m_rating, f_rating, ABS(m_rating - f_rating) AS diff_rating" + " FROM ( SELECT item_id, COUNT(rating) AS cnt, " + " AVG(CASE WHEN gender='M' THEN rating ELSE NULL END) AS m_rating, " + " AVG(CASE WHEN gender='F' THEN rating ELSE NULL END) AS f_rating " + " FROM (SELECT item_id, rating, gender FROM ratings " + " JOIN users ON ratings.user_id=users.user_id)" + " GROUP BY item_id " + " HAVING COUNT(rating)>=50 " + " ) AS t" + " JOIN items" + " ON t.item_id=items.item_id" + " ORDER BY diff_rating DESC LIMIT 10" ).print(); } static void c_2_1() throws Exception { BatchOperator<?> ratings = Chap24.getSourceRatings(); BatchOperator<?> users = Chap24.getSourceUsers(); BatchOperator ratings_select = ratings.select("user_id, item_id AS movie_id"); ratings_select.firstN(5).print(); ratings.select("user_id, item_id AS movie_id").firstN(5).print(); ratings_select = ratings.select("*"); ratings_select.firstN(5).print(); 
ratings.as("f1,f2,f3,f4").firstN(5).print(); ratings.filter("rating > 3").firstN(5).print(); ratings.where("rating > 3").firstN(5).print(); users.select("gender").distinct().print(); users.groupBy("gender", "gender, COUNT(*) AS cnt").print(); users.orderBy("age", 5).print(); users.orderBy("age", 1, 3).print(); users.orderBy("age", 5, false).print(); users.orderBy("age", 1, 3, false).print(); } static void c_2_2() throws Exception { BatchOperator<?> ratings = Chap24.getSourceRatings(); BatchOperator<?> items = Chap24.getSourceItems(); BatchOperator left_ratings = ratings .filter("user_id<3 AND item_id<4") .select("user_id, item_id, rating"); BatchOperator right_movies = items .select("item_id AS movie_id, title") .filter("movie_id < 6 AND MOD(movie_id, 2) = 1"); System.out.println("# left_ratings #"); left_ratings.print(); System.out.println("\n# right_movies #"); right_movies.print(); System.out.println("# JOIN #"); new JoinBatchOp() .setJoinPredicate("item_id = movie_id") .setSelectClause("user_id, item_id, title, rating") .linkFrom(left_ratings, right_movies) .print(); System.out.println("\n# LEFT OUTER JOIN #"); new LeftOuterJoinBatchOp() .setJoinPredicate("item_id = movie_id") .setSelectClause("user_id, item_id, title, rating") .linkFrom(left_ratings, right_movies) .print(); System.out.println("\n# RIGHT OUTER JOIN #"); new RightOuterJoinBatchOp() .setJoinPredicate("item_id = movie_id") .setSelectClause("user_id, item_id, title, rating") .linkFrom(left_ratings, right_movies) .print(); System.out.println("\n# FULL OUTER JOIN #"); new FullOuterJoinBatchOp() .setJoinPredicate("item_id = movie_id") .setSelectClause("user_id, item_id, title, rating") .linkFrom(left_ratings, right_movies) .print(); } static void c_2_3() throws Exception { BatchOperator<?> users = Chap24.getSourceUsers(); BatchOperator users_1_4 = users.filter("user_id<5"); System.out.println("# users_1_4 #"); users_1_4.print(); BatchOperator users_3_6 = users.filter("user_id>2 AND user_id<7"); 
System.out.println("\n# users_3_6 #"); users_3_6.print(); new UnionAllBatchOp().linkFrom(users_1_4, users_3_6).print(); new UnionBatchOp().linkFrom(users_1_4, users_3_6).print(); new IntersectBatchOp().linkFrom(users_1_4, users_3_6).print(); new IntersectAllBatchOp() .linkFrom( new UnionAllBatchOp().linkFrom(users_1_4, users_1_4), new UnionAllBatchOp().linkFrom(users_1_4, users_3_6) ) .print(); new MinusBatchOp().linkFrom(users_1_4, users_3_6).print(); new MinusAllBatchOp() .linkFrom( new UnionAllBatchOp().linkFrom(users_1_4, users_1_4), new UnionAllBatchOp().linkFrom(users_1_4, users_3_6) ) .print(); } static void c_3() throws Exception { BatchTableEnvironment benv = MLEnvironmentFactory.getDefault().getBatchTableEnvironment(); for (String name : benv.listTables()) { benv.sqlUpdate("DROP TABLE IF EXISTS " + name); } BatchOperator<?> ratings = Chap24.getSourceRatings(); BatchOperator<?> users = Chap24.getSourceUsers(); BatchOperator<?> items = Chap24.getSourceItems(); ratings.registerTableName("ratings"); items.registerTableName("items"); users.registerTableName("users"); String[] tableNames = MLEnvironmentFactory.getDefault().getBatchTableEnvironment().listTables(); System.out.println("Table Names : "); for (String name : tableNames) { System.out.println(name); } BatchTableEnvironment batchTableEnvironment = MLEnvironmentFactory.getDefault().getBatchTableEnvironment(); System.out.println("Table Names : "); for (String name : batchTableEnvironment.listTables()) { System.out.println(name); } batchTableEnvironment.sqlUpdate("DROP TABLE IF EXISTS users"); System.out.println("\nTable Names After DROP : "); for (String name : batchTableEnvironment.listTables()) { System.out.println(name); } BatchOperator ratings_scan = BatchOperator.fromTable(batchTableEnvironment.scan("ratings")); ratings_scan.firstN(5).print(); for (String name : benv.listTables()) { benv.sqlUpdate("DROP TABLE IF EXISTS " + name); } } public static class FromUnixTimestamp extends ScalarFunction { 
public java.sql.Timestamp eval(Long ts) { return new java.sql.Timestamp(ts * 1000); } } static void c_3_4() throws Exception { MLEnvironment mlEnv = MLEnvironmentFactory.getDefault(); StreamExecutionEnvironment env = mlEnv.getStreamExecutionEnvironment(); StreamTableEnvironment tenv = mlEnv.getStreamTableEnvironment(); DataStreamSource<Map<String, Object>> inputDataStreamMap = env.addSource( new SourceFunction<Map<String, Object>>() { @Override public void run(SourceContext<Map<String, Object>> out) throws Exception { Map<String, Object> item = new HashMap<>(); item.put("name", "a"); item.put("val", 110); out.collect(item); Map<String, Object> item1 = new HashMap<>(); item1.put("name", "b"); item1.put("val", 111); out.collect(item1); Map<String, Object> item2 = new HashMap<>(); item2.put("name", "c"); item2.put("val", 113); out.collect(item2); } @Override public void cancel() { } }); inputDataStreamMap.print(); DataStream<Row> inputDataStreamRow = inputDataStreamMap.map(new MapFunction<Map<String, Object>, Row>() { @Override public Row map(Map<String, Object> value) throws Exception { return Row.of(value.get("name"), value.get("val")); } }); inputDataStreamRow.print(); Table inputTable = DataStreamConversionUtil.toTable(mlEnv, inputDataStreamRow, new String[]{"name", "val"}, new TypeInformation<?>[]{AlinkTypes.STRING, AlinkTypes.INT}); inputTable.printSchema(); TableSourceStreamOp inputStreamOp = new TableSourceStreamOp(inputTable); StreamOperator<?> outputStreamOp = inputStreamOp .select("name, val + 1 AS val, 'output' AS type"); outputStreamOp.print(); Table outputTable = outputStreamOp.getOutputTable(); outputTable.printSchema(); StreamOperator.execute(); } }