This chapter includes the following sections:
4.1 Introduction
4.1.1 Basic Catalog Operations
4.1.2 Source and Sink Components
4.2 Hive Example
4.3 Derby Example
4.4 MySQL Example
For the full discussion, please refer to the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; this page provides the example code for the chapter. A short sketch of the basic Catalog operations covered in Section 4.1.1 follows the setup code below.
from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd

from pyflink.table.catalog import CatalogDatabase
from pyflink.table.catalog import ObjectPath

# directory for the local (Derby) database files
DATA_DIR = ROOT_DIR + "db" + os.sep

# local directory for Alink plugins; set to None to skip plugin setup
ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin"

# iris data set used by all examples in this chapter
IRIS_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
IRIS_SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"

DB_NAME = "test_db"
BATCH_TABLE_NAME = "batch_table"
STREAM_TABLE_NAME = "stream_table"

# Hive example: set HIVE_CONF_DIR to the directory containing hive-site.xml to enable it
HIVE_VERSION = "2.3.4"
HIVE_CONF_DIR = None

# Derby example: an embedded database stored under DATA_DIR
DERBY_VERSION = "10.6.1.0"
DERBY_DIR = "derby"

# MySQL example: fill in the connection information to enable it
MYSQL_VERSION = "5.1.27"
MYSQL_URL = None
MYSQL_PORT = None
MYSQL_USER_NAME = None
MYSQL_PASSWORD = None

if ALINK_PLUGIN_DIR is not None:
    AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR)
    AlinkGlobalConfiguration.setPrintProcessInfo(True)
    DOWNLOADER = AlinkGlobalConfiguration.getPluginDownloader()
    # download only the plugins that will actually be used
    if HIVE_CONF_DIR is not None:
        DOWNLOADER.downloadPlugin("hive", HIVE_VERSION)
    if DERBY_DIR is not None:
        DOWNLOADER.downloadPlugin("derby", DERBY_VERSION)
    if MYSQL_URL is not None:
        DOWNLOADER.downloadPlugin("mysql", MYSQL_VERSION)
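Before the full examples, here is a minimal sketch of the basic Catalog operations from Section 4.1.1 (open, create_database, list_tables, table_exists, drop_table, drop_database, close). It reuses the constants defined above and the same DerbyCatalog constructor as the Derby example; treat it as an orientation sketch, assuming the derby plugin has been downloaded and DATA_DIR is writable, not as code from the book.

# Minimal sketch of basic Catalog operations, assuming the derby plugin from the
# setup cell above has been downloaded and DATA_DIR is writable.
if ALINK_PLUGIN_DIR is not None and DERBY_DIR is not None:
    catalog = DerbyCatalog("derby_catalog", None, DERBY_VERSION, DATA_DIR + DERBY_DIR)
    catalog.open()

    # create the database; the trailing True means "ignore if it already exists"
    catalog.create_database(DB_NAME, CatalogDatabase(None), True)

    # inspect metadata
    print(catalog.list_tables(DB_NAME))
    print(catalog.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)))

    # clean up; the trailing True means "ignore if it does not exist"
    catalog.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    catalog.drop_database(DB_NAME, True)

    catalog.close()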
#c_2

if ALINK_PLUGIN_DIR is not None and HIVE_CONF_DIR is not None:

    hive = HiveCatalog("hive_catalog", None, HIVE_VERSION, HIVE_CONF_DIR)
    hive.open()

    # build a CatalogDatabase through the Java gateway and create the test database
    gateway = get_java_gateway()
    database_properties = gateway.jvm.java.util.HashMap()
    database_comment = None
    j_database = gateway.jvm.org.apache.flink.table.catalog.CatalogDatabaseImpl(
        database_properties, database_comment)
    hive.create_database(DB_NAME, CatalogDatabase(j_database), True)

    # start from a clean state
    hive.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    hive.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # batch sink
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp().setCatalogObject(
                CatalogObject(hive, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
        )
    BatchOperator.execute()

    # stream sink
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp().setCatalogObject(
                CatalogObject(hive, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )
    StreamOperator.execute()

    # batch source
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")
    BatchOperator.execute()

    # stream source
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()
    StreamOperator.execute()

    # meta operation
    print("< tables before drop >")
    print(hive.list_tables(DB_NAME))

    if hive.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)):
        hive.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), False)
    hive.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    print("< tables after drop >")
    print(hive.list_tables(DB_NAME))

    hive.drop_database(DB_NAME, True)

    hive.close()
#c_3

if ALINK_PLUGIN_DIR is not None and DERBY_DIR is not None:

    derby = DerbyCatalog("derby_catalog", None, DERBY_VERSION, DATA_DIR + DERBY_DIR)
    derby.open()

    derby.create_database(DB_NAME, CatalogDatabase(None), True)

    derby.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    derby.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # batch sink
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp().setCatalogObject(
                CatalogObject(derby, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
        )
    BatchOperator.execute()

    # stream sink
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp().setCatalogObject(
                CatalogObject(derby, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )
    StreamOperator.execute()

    # batch source
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")
    BatchOperator.execute()

    # stream source
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()
    StreamOperator.execute()

    # meta operation
    print("< tables before drop >")
    print(derby.list_tables(DB_NAME))

    if derby.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)):
        derby.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), False)
    derby.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    print("< tables after drop >")
    print(derby.list_tables(DB_NAME))

    derby.drop_database(DB_NAME, True)

    derby.close()
#c_4

if ALINK_PLUGIN_DIR is not None and MYSQL_URL is not None:

    mysql = MySqlCatalog("mysql_catalog", "mysql", MYSQL_VERSION,
                         MYSQL_URL, MYSQL_PORT, MYSQL_USER_NAME, MYSQL_PASSWORD)
    mysql.open()

    mysql.create_database(DB_NAME, CatalogDatabase(None), True)

    mysql.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    mysql.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # batch sink
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp().setCatalogObject(
                CatalogObject(mysql, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
        )
    BatchOperator.execute()

    # stream sink
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp().setCatalogObject(
                CatalogObject(mysql, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )
    StreamOperator.execute()

    # batch source
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")
    BatchOperator.execute()

    # stream source
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()
    StreamOperator.execute()

    # meta operation
    print("< tables before drop >")
    print(mysql.list_tables(DB_NAME))

    if mysql.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)):
        mysql.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), False)
    mysql.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    print("< tables after drop >")
    print(mysql.list_tables(DB_NAME))

    mysql.drop_database(DB_NAME, True)

    mysql.close()