This chapter contains the following sections:
4.1 Introduction
4.1.1 Basic operations on a Catalog
4.1.2 Source and Sink components
4.2 Hive example
4.3 Derby example
4.4 MySQL example
For the full discussion, please refer to the printed book 《Alink权威指南:基于Flink的机器学习实例入门(Python)》; this page collects the chapter's sample code. All three examples follow the same pattern: open a catalog, write the iris data through CatalogSinkBatchOp / CatalogSinkStreamOp, read it back through CatalogSourceBatchOp / CatalogSourceStreamOp, and finish with the catalog's metadata operations (list_tables, table_exists, drop_table, drop_database).
from pyalink.alink import *
useLocalEnv(1)
from utils import *
import os
import pandas as pd
from pyflink.table.catalog import CatalogDatabase
from pyflink.table.catalog import ObjectPath
DATA_DIR = ROOT_DIR + "db" + os.sep
ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin"
IRIS_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
IRIS_SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
DB_NAME = "test_db"
BATCH_TABLE_NAME = "batch_table"
STREAM_TABLE_NAME = "stream_table"
# Hive settings: the Hive example only runs when HIVE_CONF_DIR points to a
# Hive configuration directory; leaving it as None skips that example.
HIVE_VERSION = "2.3.4"
HIVE_CONF_DIR = None
# Derby settings: the embedded Derby database is created under DATA_DIR + DERBY_DIR.
DERBY_VERSION = "10.6.1.0"
DERBY_DIR = "derby"
# MySQL settings: the MySQL example only runs when the connection settings are filled in.
MYSQL_VERSION = "5.1.27"
MYSQL_URL = None
MYSQL_PORT = None
MYSQL_USER_NAME = None
MYSQL_PASSWORD = None
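The pandas import above is handy for a quick local look at the source data. As an optional sanity check (an addition here, not part of the book's example, and assuming PyAlink's collectToDataframe is available as in recent versions), the iris CSV defined by IRIS_URL and IRIS_SCHEMA_STR can be collected into a DataFrame before any catalog is involved:
# Optional preview of the iris source; requires network access to IRIS_URL.
iris_df = CsvSourceBatchOp()\
    .setFilePath(IRIS_URL)\
    .setSchemaStr(IRIS_SCHEMA_STR)\
    .collectToDataframe()
print(iris_df.head())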
# Configure the Alink plugin directory and download the catalog plugins
# needed by the examples that are enabled below.
if ALINK_PLUGIN_DIR is not None:
    AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR)
    AlinkGlobalConfiguration.setPrintProcessInfo(True)
    DOWNLOADER = AlinkGlobalConfiguration.getPluginDownloader()
    if HIVE_CONF_DIR is not None:
        DOWNLOADER.downloadPlugin("hive", HIVE_VERSION)
    if DERBY_DIR is not None:
        DOWNLOADER.downloadPlugin("derby", DERBY_VERSION)
    if MYSQL_URL is not None:
        DOWNLOADER.downloadPlugin("mysql", MYSQL_VERSION)
#c_2
# 4.2 Hive example: only runs when both the plugin directory and a Hive
# configuration directory have been set above.
if ALINK_PLUGIN_DIR is not None and HIVE_CONF_DIR is not None:
    hive = HiveCatalog("hive_catalog", None, HIVE_VERSION, HIVE_CONF_DIR)
    hive.open()

    # Build a Java CatalogDatabaseImpl through the Py4J gateway and use it to
    # create the test database (the call is ignored if the database exists).
    gateway = get_java_gateway()
    database_properties = gateway.jvm.java.util.HashMap()
    database_comment = None
    j_database = gateway.jvm.org.apache.flink.table.catalog.CatalogDatabaseImpl(
        database_properties, database_comment)
    hive.create_database(DB_NAME, CatalogDatabase(j_database), True)
    hive.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    hive.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # batch sink
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp()
                .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
        )
    BatchOperator.execute()

    # stream sink
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp()
                .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )
    StreamOperator.execute()

    # batch source
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")
    BatchOperator.execute()

    # stream source
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()
    StreamOperator.execute()

    # metadata operations
    print("< tables before drop >")
    print(hive.list_tables(DB_NAME))
    if hive.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)):
        hive.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), False)
    hive.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)
    print("< tables after drop >")
    print(hive.list_tables(DB_NAME))
    hive.drop_database(DB_NAME, True)
    hive.close()
#c_3
# 4.3 Derby example: uses a local embedded Derby database stored under DATA_DIR.
if ALINK_PLUGIN_DIR is not None and DERBY_DIR is not None:
    derby = DerbyCatalog("derby_catalog", None, DERBY_VERSION, DATA_DIR + DERBY_DIR)
    derby.open()
    derby.create_database(DB_NAME, CatalogDatabase(None), True)
    derby.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    derby.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # batch sink
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp()
                .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
        )
    BatchOperator.execute()

    # stream sink
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp()
                .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )
    StreamOperator.execute()

    # batch source
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")
    BatchOperator.execute()

    # stream source
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()
    StreamOperator.execute()

    # metadata operations
    print("< tables before drop >")
    print(derby.list_tables(DB_NAME))
    if derby.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)):
        derby.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), False)
    derby.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)
    print("< tables after drop >")
    print(derby.list_tables(DB_NAME))
    derby.drop_database(DB_NAME, True)
    derby.close()
#c_4
# 4.4 MySQL example: only runs when the MySQL connection settings above are filled in.
if ALINK_PLUGIN_DIR is not None and MYSQL_URL is not None:
    mysql = MySqlCatalog("mysql_catalog", "mysql", MYSQL_VERSION,
                         MYSQL_URL, MYSQL_PORT, MYSQL_USER_NAME, MYSQL_PASSWORD)
    mysql.open()
    mysql.create_database(DB_NAME, CatalogDatabase(None), True)
    mysql.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    mysql.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # batch sink
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp()
                .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))
        )
    BatchOperator.execute()

    # stream sink
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp()
                .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )
    StreamOperator.execute()

    # batch source
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")
    BatchOperator.execute()

    # stream source
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()
    StreamOperator.execute()

    # metadata operations
    print("< tables before drop >")
    print(mysql.list_tables(DB_NAME))
    if mysql.table_exists(ObjectPath(DB_NAME, BATCH_TABLE_NAME)):
        mysql.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), False)
    mysql.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)
    print("< tables after drop >")
    print(mysql.list_tables(DB_NAME))
    mysql.drop_database(DB_NAME, True)
    mysql.close()