Alink教程(Python版)

第2.7节 文件系统与数据库

我们处理的数据主要存储在文件系统和数据库中。本书的第3章将专门介绍文件系统,第4章会详细介绍与数据库、数据表相关的操作。Alink采用插件的方式来管理各种文件系统、数据库以及各个版本所需的函数库。本节还会专门介绍如何利用Alink的插件(Plugin)工具自动下载相关的函数库。

下图为Alink文件系统(File System)与数据库(Catalog)的架构图,具体说明如下:


  • Alink通过定义统一的File System,规范常用的文件操作;Alink定义统一的Catalog,抽象常用的数据库(Database)和表(Table)操作。
  • 我们在实际应用中,不会同时用到所有文件系统和数据库。另外,我们还要考虑版本的问题。使用插件机制,方便大家选择适合自己的方式与版本。
  • 在数据源和导出方面,数据文件和数据库均利用统一定义的File System和Catalog来统一操作流程,避免逐个定义带来大量的类似接口。
  • 数据库方面的操作比较直接。确定好Catalog以及数据表的路径信息(数据库→表)。此外,还要统一定义数据表的批式/流式数据源组件(CatalogSourceBatchOp/Catalog- SourceStreamOp),以及定义数据表的批式/流式导出组件(CatalogSinkBatchOp/Catalog- SinkStreamOp)。
  • 对于数据文件,需要考虑两个方面:文件的格式和所在文件系统的路径。由于每种文件格式所需的参数不同,因此Alink按文件格式定义相应的数据源和导出组件。对于XX文件格式,定义批式/流式数据源组件(XXSourceBatchOp/ XXSourceStreamOp),同样定义数据表的批式/流式导出组件(XXSinkBatchOp/ XXSinkStreamOp)。
  • 各组件的相同之处是,文件路径参数的设置方式相同;基本形式是使用FilePath类进行设置,包括两类信息:所在文件系统的信息和文件路径的信息。另外,对于本地路径,可以直接设置路径字符串。


插件下载

Alink能够支持不同第三方库(例如OSSHiveDerbyMySQL等)的不同版本(例如Hive2.3.4版本、2.3.6版本等)。为了更好管理插件(外部的第三方库),我们提供了插件下载器(PluginDownloader)来管理不同插件的多个版本。

插件下载器封装了插件的常见功能,如下所示

    • 枚举仓库中的所有插件。
    • 枚举某个插件的所有版本。
    • 下载某个插件的特定版本/默认版本。
    • 下载所有插件的默认版本。
    • 升级所有的插件。

在Python代码中可以这样使用插件下载器:

# 初始化 PyAlink
from pyalink.alink import *
useLocalEnv(1)

# 设置插件下载的位置。当路径不存在时,会自行创建路径
AlinkGlobalConfiguration.setPluginDir("/Users/xxx/alink_plugins/")

# 获得Alink插件下载器
pluginDownloader = AlinkGlobalConfiguration.getPluginDownloader()

# 从远程加载插件的配置项
pluginDownloader.loadConfig()

# 展示所有可用的插件名称
plugins = pluginDownloader.listAvailablePlugins()
print(plugins)
# 输出结果:['oss', 'hive', 'derby', 'mysql', 'hadoop', 'sqlite']

# 显示第0个插件的所有版本
pluginName = plugins[0] # oss
availableVersions = pluginDownloader.listAvailablePluginVersions(pluginName)
print(availableVersions)
# 输出结果:[3.4.1]

# 下载某个插件的特定版本
pluginVersion = availableVersions[0]
pluginDownloader.downloadPlugin(pluginName, pluginVersion)
# 运行结束后,插件会被下载到"/Users/xxx/alink_plugins/"目录中

# 下载某个插件的默认版本
pluginDownloader.downloadPlugin(pluginName)
# 运行结束后,插件会被下载到"/Users/xxx/alink_plugins/"目录中

# 下载配置文件中所有插件的默认版本
pluginDownloader.downloadAll()

# 插件升级
# 在升级的过程中,会先对旧的插件进行备份,备份文件名称的后缀为.old。等到插件更新完毕后,
# 会统一删除旧的插件包
# 若插件更新中断,则用户可以从.old文件中恢复旧版插件
pluginDownloader.upgrade()