Alink教程(Python版)

第4章 数据库与数据表

本章包括下面各节:
4.1 简介
4.1.1 Catalog的基本操作
4.1.2 Source组件和Sink组件
4.2 Hive示例
4.3 Derby示例
4.4 MySQL示例

详细内容请阅读纸质书《Alink权威指南:基于Flink的机器学习实例入门(Python)》,这里为本章对应的示例代码。

from pyalink.alink import *
useLocalEnv(1)

from utils import *
import os
import pandas as pd
from pyflink.table.catalog import CatalogDatabase
from pyflink.table.catalog import ObjectPath

# Working directory for this chapter's database files (ROOT_DIR comes from utils).
DATA_DIR = ROOT_DIR + "db" + os.sep

# Local directory holding downloaded Alink plugins; set to None to disable
# every example in this chapter (all sections are gated on this value).
ALINK_PLUGIN_DIR = "/Users/yangxu/Downloads/alink_plugin"

# Iris dataset: remote CSV source and its Alink schema string.
IRIS_URL = "http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
IRIS_SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"

# Database and table names shared by the Hive / Derby / MySQL examples below.
DB_NAME = "test_db"
BATCH_TABLE_NAME = "batch_table"
STREAM_TABLE_NAME = "stream_table"

# Hive: set HIVE_CONF_DIR to a hive-site.xml directory to enable section 4.2.
HIVE_VERSION = "2.3.4"
HIVE_CONF_DIR = None

# Derby: embedded database stored under DATA_DIR; enables section 4.3.
DERBY_VERSION = "10.6.1.0"
DERBY_DIR = "derby"

# MySQL: fill in connection settings to enable section 4.4.
MYSQL_VERSION = "5.1.27"
MYSQL_URL = None
MYSQL_PORT = None
MYSQL_USER_NAME = None
MYSQL_PASSWORD = None


# Download the database plugins needed by the enabled examples below.
# Everything is gated on ALINK_PLUGIN_DIR: plugins need a writable directory.
if ALINK_PLUGIN_DIR is not None:
    AlinkGlobalConfiguration.setPluginDir(ALINK_PLUGIN_DIR)

    # Show download progress so long-running plugin fetches are visible.
    AlinkGlobalConfiguration.setPrintProcessInfo(True)
    DOWNLOADER = AlinkGlobalConfiguration.getPluginDownloader()

    # Fetch only the plugins for examples the user has actually configured.
    if HIVE_CONF_DIR is not None:
        DOWNLOADER.downloadPlugin("hive", HIVE_VERSION)

    if DERBY_DIR is not None:
        DOWNLOADER.downloadPlugin("derby", DERBY_VERSION)

    if MYSQL_URL is not None:
        DOWNLOADER.downloadPlugin("mysql", MYSQL_VERSION)

#c_2

# Hive example (section 4.2): write the iris data into Hive tables through
# the catalog API, read them back as batch and stream sources, then list
# and drop the tables and the database.
if ALINK_PLUGIN_DIR is not None and HIVE_CONF_DIR is not None:
    hive = HiveCatalog("hive_catalog", None, HIVE_VERSION, HIVE_CONF_DIR)
    hive.open()

    # pyflink's CatalogDatabase only wraps an existing Java object, so build
    # the Java-side CatalogDatabaseImpl (empty properties, no comment) via
    # the py4j gateway before creating the database (ignore if it exists).
    gateway = get_java_gateway()
    database_properties = gateway.jvm.java.util.HashMap()
    database_comment = None
    j_database = gateway.jvm.org.apache.flink.table.catalog.CatalogDatabaseImpl(
        database_properties, database_comment)
    hive.create_database(DB_NAME, CatalogDatabase(j_database), True)

    # Remove leftovers from earlier runs (second arg: ignore if not exists).
    hive.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    hive.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # Batch sink: download iris from UCI and write it into the catalog table.
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp().setCatalogObject(
                CatalogObject(hive, ObjectPath(DB_NAME, BATCH_TABLE_NAME))
            )
        )

    BatchOperator.execute()

    # Stream sink: the same data written through the streaming pipeline.
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp()
                .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )

    StreamOperator.execute()

    # Batch source: read the table back and print summary statistics.
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")

    BatchOperator.execute()

    # Stream source: sample ~2% of rows so the printout stays readable.
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(hive, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()

    StreamOperator.execute()

    # Metadata operations: list, drop tables, drop database.

    print("< tables before drop >")
    print(hive.list_tables(DB_NAME))

    # drop_table with ignore-if-not-exists subsumes the exists-then-drop
    # pattern, so use the same form for both tables.
    hive.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    hive.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    print("< tables after drop >")
    print(hive.list_tables(DB_NAME))

    hive.drop_database(DB_NAME, True)

    hive.close()
#c_3

# Derby example (section 4.3): same batch/stream sink-and-source round trip
# as the Hive example, against an embedded Derby database under DATA_DIR.
if ALINK_PLUGIN_DIR is not None and DERBY_DIR is not None:
    derby = DerbyCatalog("derby_catalog", None, DERBY_VERSION, DATA_DIR + DERBY_DIR)

    derby.open()

    # NOTE(review): CatalogDatabase(None) wraps no Java object; the JDBC
    # catalogs appear to ignore the database spec — confirm against pyalink.
    derby.create_database(DB_NAME, CatalogDatabase(None), True)

    # Remove leftovers from earlier runs (second arg: ignore if not exists).
    derby.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    derby.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # Batch sink: download iris from UCI and write it into the catalog table.
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp().setCatalogObject(
                CatalogObject(derby, ObjectPath(DB_NAME, BATCH_TABLE_NAME))
            )
        )

    BatchOperator.execute()

    # Stream sink: the same data written through the streaming pipeline.
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp()
                .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )

    StreamOperator.execute()

    # Batch source: read the table back and print summary statistics.
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")

    BatchOperator.execute()

    # Stream source: sample ~2% of rows so the printout stays readable.
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(derby, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()

    StreamOperator.execute()

    # Metadata operations: list, drop tables, drop database.

    print("< tables before drop >")
    print(derby.list_tables(DB_NAME))

    # drop_table with ignore-if-not-exists subsumes the exists-then-drop
    # pattern, so use the same form for both tables.
    derby.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    derby.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    print("< tables after drop >")
    print(derby.list_tables(DB_NAME))

    derby.drop_database(DB_NAME, True)

    derby.close()
#c_3

# MySQL example (section 4.4): same batch/stream sink-and-source round trip
# as the Hive example, against an external MySQL server.
if ALINK_PLUGIN_DIR is not None and MYSQL_URL is not None:
    mysql = MySqlCatalog("mysql_catalog", "mysql", MYSQL_VERSION, MYSQL_URL, MYSQL_PORT, MYSQL_USER_NAME, MYSQL_PASSWORD)

    mysql.open()

    # NOTE(review): CatalogDatabase(None) wraps no Java object; the JDBC
    # catalogs appear to ignore the database spec — confirm against pyalink.
    mysql.create_database(DB_NAME, CatalogDatabase(None), True)

    # Remove leftovers from earlier runs (second arg: ignore if not exists).
    mysql.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    mysql.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    # Batch sink: download iris from UCI and write it into the catalog table.
    CsvSourceBatchOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .lazyPrintStatistics("< origin data >")\
        .link(
            CatalogSinkBatchOp().setCatalogObject(
                CatalogObject(mysql, ObjectPath(DB_NAME, BATCH_TABLE_NAME))
            )
        )

    BatchOperator.execute()

    # Stream sink: the same data written through the streaming pipeline.
    CsvSourceStreamOp()\
        .setFilePath(IRIS_URL)\
        .setSchemaStr(IRIS_SCHEMA_STR)\
        .link(
            CatalogSinkStreamOp()
                .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))
        )

    StreamOperator.execute()

    # Batch source: read the table back and print summary statistics.
    CatalogSourceBatchOp()\
        .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, BATCH_TABLE_NAME)))\
        .lazyPrintStatistics("< batch catalog source >")

    BatchOperator.execute()

    # Stream source: sample ~2% of rows so the printout stays readable.
    CatalogSourceStreamOp()\
        .setCatalogObject(CatalogObject(mysql, ObjectPath(DB_NAME, STREAM_TABLE_NAME)))\
        .sample(0.02)\
        .print()

    StreamOperator.execute()

    # Metadata operations: list, drop tables, drop database.

    print("< tables before drop >")
    print(mysql.list_tables(DB_NAME))

    # drop_table with ignore-if-not-exists subsumes the exists-then-drop
    # pattern, so use the same form for both tables.
    mysql.drop_table(ObjectPath(DB_NAME, BATCH_TABLE_NAME), True)
    mysql.drop_table(ObjectPath(DB_NAME, STREAM_TABLE_NAME), True)

    print("< tables after drop >")
    print(mysql.list_tables(DB_NAME))

    mysql.drop_database(DB_NAME, True)

    mysql.close()