本章包括下面各节:
4.1 简介
4.1.1 Catalog的基本操作
4.1.2 Source和Sink组件
4.2 Hive示例
4.3 Derby示例
4.4 MySQL示例
详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Java)》,这里为本章对应的示例代码。
package com.alibaba.alink;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;
import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.io.catalog.DerbyCatalog;
import com.alibaba.alink.common.io.catalog.HiveCatalog;
import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.plugin.PluginDownloader;
import com.alibaba.alink.common.utils.JsonConverter;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.sink.CatalogSinkStreamOp;
import com.alibaba.alink.operator.stream.source.CatalogSourceStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import java.io.File;
import java.util.HashMap;
import java.util.List;
public class Chap04 {
static final String DATA_DIR = Utils.ROOT_DIR + "db" + File.separator;
static final String ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin";
static final String IRIS_URL =
"http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data";
static final String IRIS_SCHEMA_STR =
"sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
static final String DB_NAME = "test_db";
static final String BATCH_TABLE_NAME = "batch_table";
static final String STREAM_TABLE_NAME = "stream_table";
static final String HIVE_VERSION = "2.3.4";
static final String HIVE_CONF_DIR = null;
static final String DERBY_VERSION = "10.6.1.0";
static final String DERBY_DIR = "derby";
static final String MYSQL_VERSION = "5.1.27";
static final String MYSQL_URL = null;
static final String MYSQL_PORT = null;
static final String MYSQL_USER_NAME = null;
static final String MYSQL_PASSWORD = null;
static PluginDownloader DOWNLOADER;
public static void main(String[] args) throws Exception {
if (null != ALINK_PLUGIN_DIR) {
AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR);
AlinkGlobalConfiguration.setPrintProcessInfo(true);
DOWNLOADER = AlinkGlobalConfiguration.getPluginDownloader();
DOWNLOADER.downloadPlugin("hive", HIVE_VERSION);
DOWNLOADER.downloadPlugin("derby", DERBY_VERSION);
DOWNLOADER.downloadPlugin("mysql", MYSQL_VERSION);
c_2();
c_3();
c_4();
}
}
static void c_2() throws Exception {
if (null != HIVE_CONF_DIR) {
HiveCatalog hive = new HiveCatalog("hive_catalog", null, HIVE_VERSION, HIVE_CONF_DIR);
hive.open();
hive.createDatabase(DB_NAME, new CatalogDatabaseImpl(new HashMap <>(), ""), true);
hive.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), true);
hive.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);
new CsvSourceBatchOp()
.setFilePath(IRIS_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.lazyPrintStatistics("< origin data >")
.link(
new CatalogSinkBatchOp()
.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
);
BatchOperator.execute();
new CsvSourceStreamOp()
.setFilePath(IRIS_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new CatalogSinkStreamOp()
.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
);
StreamOperator.execute();
new CatalogSourceBatchOp()
.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
.lazyPrintStatistics("< batch catalog source >");
BatchOperator.execute();
new CatalogSourceStreamOp()
.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
.sample(0.02)
.print();
StreamOperator.execute();
System.out.println("< tables before drop >");
System.out.println(JsonConverter.toJson(hive.listTables(DB_NAME)));
if (hive.tableExists(new ObjectPath(DB_NAME, BATCH_TABLE_NAME))) {
hive.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), false);
}
hive.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);
System.out.println("< tables after drop >");
System.out.println(JsonConverter.toJson(hive.listTables(DB_NAME)));
hive.dropDatabase(DB_NAME, true);
hive.close();
}
}
static void c_3() throws Exception {
DerbyCatalog derby = new DerbyCatalog("derby_catalog", null, DERBY_VERSION, DATA_DIR + DERBY_DIR);
derby.open();
derby.createDatabase(DB_NAME, new CatalogDatabaseImpl(new HashMap<>(), ""), true);
derby.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), true);
derby.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);
new CsvSourceBatchOp()
.setFilePath(IRIS_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.lazyPrintStatistics("< origin data >")
.link(
new CatalogSinkBatchOp()
.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
);
BatchOperator.execute();
new CsvSourceStreamOp()
.setFilePath(IRIS_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new CatalogSinkStreamOp()
.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
);
StreamOperator.execute();
new CatalogSourceBatchOp()
.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
.lazyPrintStatistics("< batch catalog source >");
BatchOperator.execute();
new CatalogSourceStreamOp()
.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
.sample(0.02)
.print();
StreamOperator.execute();
System.out.println("< tables before drop >");
System.out.println(JsonConverter.toJson(derby.listTables(DB_NAME)));
if (derby.tableExists(new ObjectPath(DB_NAME, BATCH_TABLE_NAME))) {
derby.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), false);
}
derby.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);
System.out.println("< tables after drop >");
System.out.println(JsonConverter.toJson(derby.listTables(DB_NAME)));
derby.dropDatabase(DB_NAME, true);
derby.close();
}
static void c_4() throws Exception {
if (null != MYSQL_URL) {
MySqlCatalog mySql = new MySqlCatalog("mysql_catalog", null, MYSQL_VERSION,
MYSQL_URL, MYSQL_PORT, MYSQL_USER_NAME, MYSQL_PASSWORD);
mySql.open();
mySql.createDatabase(DB_NAME, new CatalogDatabaseImpl(new HashMap <>(), ""), true);
new CsvSourceBatchOp()
.setFilePath(IRIS_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.lazyPrintStatistics("< origin data >")
.link(
new CatalogSinkBatchOp()
.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
);
BatchOperator.execute();
new CsvSourceStreamOp()
.setFilePath(IRIS_URL)
.setSchemaStr(IRIS_SCHEMA_STR)
.link(
new CatalogSinkStreamOp()
.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
);
StreamOperator.execute();
new CatalogSourceBatchOp()
.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
.lazyPrintStatistics("< batch catalog source >");
BatchOperator.execute();
new CatalogSourceStreamOp()
.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
.sample(0.02)
.print();
StreamOperator.execute();
System.out.println("< tables before drop >");
System.out.println(JsonConverter.toJson(mySql.listTables(DB_NAME)));
if (mySql.tableExists(new ObjectPath(DB_NAME, BATCH_TABLE_NAME))) {
mySql.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), false);
}
mySql.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);
System.out.println("< tables after drop >");
System.out.println(JsonConverter.toJson(mySql.listTables(DB_NAME)));
mySql.dropDatabase(DB_NAME, true);
mySql.close();
}
}
}