Alink Tutorial (Java Edition)

Chapter 4: Databases and Data Tables

This chapter covers the following sections:
4.1 Introduction
4.1.1 Basic Catalog Operations
4.1.2 Source and Sink Components
4.2 Hive Example
4.3 Derby Example
4.4 MySQL Example

For the full discussion, please read the print book 《Alink权威指南:基于Flink的机器学习实例入门(Java)》; this file contains the companion example code for the chapter.

package com.alibaba.alink;

import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.AlinkGlobalConfiguration;
import com.alibaba.alink.common.io.catalog.DerbyCatalog;
import com.alibaba.alink.common.io.catalog.HiveCatalog;
import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.plugin.PluginDownloader;
import com.alibaba.alink.common.utils.JsonConverter;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.sink.CatalogSinkStreamOp;
import com.alibaba.alink.operator.stream.source.CatalogSourceStreamOp;
import com.alibaba.alink.operator.stream.source.CsvSourceStreamOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;

import java.io.File;
import java.util.HashMap;

public class Chap04 {

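	// Local working directory for this chapter's data; Utils.ROOT_DIR is defined
	// elsewhere in this package.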
	static final String DATA_DIR = Utils.ROOT_DIR + "db" + File.separator;

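	// Directory for downloaded Alink plugins; change this to a path on your machine,
	// or set it to null to skip all of the examples in this chapter.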
	static final String ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin";

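	// Iris dataset from the UCI Machine Learning Repository, and its schema string.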
	static final String IRIS_URL =
		"http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data";
	static final String IRIS_SCHEMA_STR =
		"sepal_length double, sepal_width double, petal_length double, petal_width double, category string";

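	// Database and table names shared by the Hive, Derby and MySQL examples.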
	static final String DB_NAME = "test_db";
	static final String BATCH_TABLE_NAME = "batch_table";
	static final String STREAM_TABLE_NAME = "stream_table";

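	// Hive plugin version; set HIVE_CONF_DIR to the directory containing hive-site.xml
	// to run the Hive example (leaving it null skips that example).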
	static final String HIVE_VERSION = "2.3.4";
	static final String HIVE_CONF_DIR = null;

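	// Derby plugin version; the embedded Derby database is created under DATA_DIR.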
	static final String DERBY_VERSION = "10.6.1.0";
	static final String DERBY_DIR = "derby";

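	// MySQL plugin version and connection settings; fill in the URL (host), port,
	// user name and password to run the MySQL example (a null URL skips it).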
	static final String MYSQL_VERSION = "5.1.27";
	static final String MYSQL_URL = null;
	static final String MYSQL_PORT = null;
	static final String MYSQL_USER_NAME = null;
	static final String MYSQL_PASSWORD = null;

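	// Downloader used to fetch the catalog plugins into ALINK_PLUGIN_DIR.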
	static PluginDownloader DOWNLOADER;

	public static void main(String[] args) throws Exception {

		if (null != ALINK_PLUGIN_DIR) {

			AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR);

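			// Print plugin download progress, then fetch the plugins used by the examples below.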
			AlinkGlobalConfiguration.setPrintProcessInfo(true);
			DOWNLOADER = AlinkGlobalConfiguration.getPluginDownloader();
			DOWNLOADER.downloadPlugin("hive", HIVE_VERSION);
			DOWNLOADER.downloadPlugin("derby", DERBY_VERSION);
			DOWNLOADER.downloadPlugin("mysql", MYSQL_VERSION);

			c_2();    // Section 4.2: Hive example

			c_3();    // Section 4.3: Derby example

			c_4();    // Section 4.4: MySQL example
		}

	}

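	// Section 4.2: Hive example. Writes the Iris data to Hive through the catalog sink
	// components, reads it back through the catalog sources, then cleans up.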
	static void c_2() throws Exception {

		if (null != HIVE_CONF_DIR) {
			HiveCatalog hive = new HiveCatalog("hive_catalog", null, HIVE_VERSION, HIVE_CONF_DIR);

			hive.open();

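			// Create the test database if absent and drop tables left over from earlier runs.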
			hive.createDatabase(DB_NAME, new CatalogDatabaseImpl(new HashMap<>(), ""), true);

			hive.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), true);
			hive.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);

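			// Batch: read the Iris CSV, print summary statistics, and sink it to a Hive table.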
			new CsvSourceBatchOp()
				.setFilePath(IRIS_URL)
				.setSchemaStr(IRIS_SCHEMA_STR)
				.lazyPrintStatistics("< origin data >")
				.link(
					new CatalogSinkBatchOp()
						.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
				);
			BatchOperator.execute();

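			// Stream: write the same data to another Hive table with the stream sink.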
			new CsvSourceStreamOp()
				.setFilePath(IRIS_URL)
				.setSchemaStr(IRIS_SCHEMA_STR)
				.link(
					new CatalogSinkStreamOp()
						.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
				);
			StreamOperator.execute();

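			// Read the batch table back through the catalog source and print its statistics.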
			new CatalogSourceBatchOp()
				.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
				.lazyPrintStatistics("< batch catalog source >");
			BatchOperator.execute();

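			// Read the stream table back, sample about 2% of the rows, and print them.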
			new CatalogSourceStreamOp()
				.setCatalogObject(new CatalogObject(hive, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
				.sample(0.02)
				.print();
			StreamOperator.execute();

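			// List the tables, drop them, and list again to confirm the drop.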
			System.out.println("< tables before drop >");
			System.out.println(JsonConverter.toJson(hive.listTables(DB_NAME)));

			if (hive.tableExists(new ObjectPath(DB_NAME, BATCH_TABLE_NAME))) {
				hive.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), false);
			}
			hive.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);

			System.out.println("< tables after drop >");
			System.out.println(JsonConverter.toJson(hive.listTables(DB_NAME)));

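			// Finally, drop the test database and close the catalog connection.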
			hive.dropDatabase(DB_NAME, true);
			hive.close();
		}
	}

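	// Section 4.3: Derby example. Same flow as the Hive example above, but against an
	// embedded Derby database created under DATA_DIR, so no external service is needed.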
	static void c_3() throws Exception {

		DerbyCatalog derby = new DerbyCatalog("derby_catalog", null, DERBY_VERSION, DATA_DIR + DERBY_DIR);

		derby.open();

		derby.createDatabase(DB_NAME, new CatalogDatabaseImpl(new HashMap<>(), ""), true);
		derby.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), true);
		derby.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);
		new CsvSourceBatchOp()
			.setFilePath(IRIS_URL)
			.setSchemaStr(IRIS_SCHEMA_STR)
			.lazyPrintStatistics("< origin data >")
			.link(
				new CatalogSinkBatchOp()
					.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
			);
		BatchOperator.execute();

		new CsvSourceStreamOp()
			.setFilePath(IRIS_URL)
			.setSchemaStr(IRIS_SCHEMA_STR)
			.link(
				new CatalogSinkStreamOp()
					.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
			);
		StreamOperator.execute();

		new CatalogSourceBatchOp()
			.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
			.lazyPrintStatistics("< batch catalog source >");
		BatchOperator.execute();

		new CatalogSourceStreamOp()
			.setCatalogObject(new CatalogObject(derby, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
			.sample(0.02)
			.print();
		StreamOperator.execute();

		System.out.println("< tables before drop >");
		System.out.println(JsonConverter.toJson(derby.listTables(DB_NAME)));

		if (derby.tableExists(new ObjectPath(DB_NAME, BATCH_TABLE_NAME))) {
			derby.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), false);
		}
		derby.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);

		System.out.println("< tables after drop >");
		System.out.println(JsonConverter.toJson(derby.listTables(DB_NAME)));

		derby.dropDatabase(DB_NAME, true);
		derby.close();
	}

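	// Section 4.4: MySQL example. Same flow again; it runs only when the MySQL
	// connection settings above have been filled in.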
	static void c_4() throws Exception {

		if (null != MYSQL_URL) {
			MySqlCatalog mySql = new MySqlCatalog("mysql_catalog", null, MYSQL_VERSION,
				MYSQL_URL, MYSQL_PORT, MYSQL_USER_NAME, MYSQL_PASSWORD);

			mySql.open();

			mySql.createDatabase(DB_NAME, new CatalogDatabaseImpl(new HashMap<>(), ""), true);

			// Drop tables left over from earlier runs, mirroring the Hive and Derby examples.
			mySql.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), true);
			mySql.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);

			new CsvSourceBatchOp()
				.setFilePath(IRIS_URL)
				.setSchemaStr(IRIS_SCHEMA_STR)
				.lazyPrintStatistics("< origin data >")
				.link(
					new CatalogSinkBatchOp()
						.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
				);
			BatchOperator.execute();

			new CsvSourceStreamOp()
				.setFilePath(IRIS_URL)
				.setSchemaStr(IRIS_SCHEMA_STR)
				.link(
					new CatalogSinkStreamOp()
						.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
				);
			StreamOperator.execute();

			new CatalogSourceBatchOp()
				.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
				.lazyPrintStatistics("< batch catalog source >");
			BatchOperator.execute();

			new CatalogSourceStreamOp()
				.setCatalogObject(new CatalogObject(mySql, new ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
				.sample(0.02)
				.print();
			StreamOperator.execute();

			System.out.println("< tables before drop >");
			System.out.println(JsonConverter.toJson(mySql.listTables(DB_NAME)));

			if (mySql.tableExists(new ObjectPath(DB_NAME, BATCH_TABLE_NAME))) {
				mySql.dropTable(new ObjectPath(DB_NAME, BATCH_TABLE_NAME), false);
			}
			mySql.dropTable(new ObjectPath(DB_NAME, STREAM_TABLE_NAME), true);

			System.out.println("< tables after drop >");
			System.out.println(JsonConverter.toJson(mySql.listTables(DB_NAME)));

			mySql.dropDatabase(DB_NAME, true);
			mySql.close();
		}
	}

}