Alink教程(Python版)
该文档涉及的组件

第1.2.1节 PyAlink快速开始


使用前准备


包名和版本说明:


  • PyAlink 根据 Alink 所支持的 Flink 版本提供不同的 Python 包:
    其中,pyalink 包对应为 Alink 所支持的最新 Flink 版本,当前为 1.13,而 pyalink-flink-*** 为旧版本的 Flink 版本,当前提供 pyalink-flink-1.12, pyalink-flink-1.11, pyalink-flink-1.10pyalink-flink-1.9
  • Python 包的版本号与 Alink 的版本号一致,例如1.4.0


安装步骤:

  1. 确保使用环境中有Python3,版本限于 3.6,3.7 和 3.8。
  2. 确保使用环境中安装有 Java 8。
  3. 使用 pip 命令进行安装:
    pip install pyalinkpip install pyalink-flink-1.12pip install pyalink-flink-1.11pip install pyalink-flink-1.10 或者 pip install pyalink-flink-1.9


更多内容参见:

  • 第1.2.1.1节 PyAlink安装准备——MacOS
  • 第1.2.1.2节 PyAlink安装准备——阿里云服务器
  • 第1.2.1.3节 如何安装最新版本PyAlink?
  • 第1.2.1.4节 PyAlink的版本查询、卸载旧版本

安装注意事项:


  1. pyalinkpyalink-flink-*** 不能同时安装,也不能与旧版本同时安装。
    如果之前安装过 pyalink 或者 pyalink-flink-***,请使用pip uninstall pyalink 或者 pip uninstall pyalink-flink-*** 卸载之前的版本。
  2. 出现pip安装缓慢或不成功的情况,可以参考这篇文章修改pip源,或者直接使用下面的链接下载 whl 包,然后使用 pip 安装:
    • Flink 1.13:链接 (MD5: 94751cc1e31f174b446142455293fb30)
    • Flink 1.12:链接 (MD5: 4cbafe08b24b3467d9096f8a8a07321f)
    • Flink 1.11:链接 (MD5: 1810f6769bd2d2d77358f4e51948a937)
    • Flink 1.10:链接 (MD5: 79ceea2788ad2159ae23ad3e9e83a261)
    • Flink 1.9: 链接 (MD5: fc143df37d15d6bd3b0733fef8969ef1)

  1. 如果有多个版本的 Python,可能需要使用特定版本的 pip,比如 pip3;如果使用 Anaconda,则需要在 Anaconda 命令行中进行安装。


下载安装文件系统或 Catalog 依赖 jar 包:


安装 PyAlink 之后,可以直接运行 download_pyalink_dep_jars 命令,下载支持文件系统功能所需要的 jar 包。
(如果提示找不到这个命令,可以尝试直接运行脚本: python3 -c 'from pyalink.alink.download_pyalink_dep_jars import main;main()'。)


运行这个命令后,将提问是否安装某种文件系统对应的 jar 包,并选择合适的版本。 当前支持的文件系统包括:


  • OSS:3.4.1
  • Hadoop:2.8.3
  • Hive:2.3.4
  • MySQL: 5.1.27
  • Derby: 10.6.1.0
  • SQLite: 3.19.3
  • S3-hadoop: 1.11.788
  • S3-presto: 1.11.788
  • odps: 0.36.4-public


这些 jar 包将被下载到 PyAlink 安装路径的 lib/plugins 目录下,所以要求运行命令时有 PyAlink 安装目录的权限。


运行命令时,也可以增加参数:download_pyalink_dep_jars -d,将自动下载所有的 jar 包。


开始使用


可以通过 Jupyter Notebook 来开始使用 PyAlink,能获得更好的使用体验。


使用步骤:


  1. 在命令行中启动Jupyter:jupyter notebook,并新建 Python 3 的 Notebook 。
  2. 导入 pyalink 包:from pyalink.alink import *
  3. 使用方法创建本地运行环境:
    useLocalEnv(parallism, flinkHome=None, config=None)
    其中,参数 parallism 表示执行所使用的并行度;flinkHome 为 flink 的完整路径,一般情况不需要设置;config为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功:


JVM listening on ***


  1. 开始编写 PyAlink 代码,例如:


source = CsvSourceBatchOp()\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
res = source.select(["sepal_length", "sepal_width"])
df = res.collectToDataframe()
print(df)


编写代码


在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 setXXX 设置参数,通过 link/linkTo/linkFrom 与其他组件相连。
这里利用 Jupyter Notebook 的自动补全机制可以提供书写便利。


对于批式作业,可以通过批式组件的 print/collectToDataframe/collectToDataframes 等方法或者 BatchOperator.execute() 来触发执行;对于流式作业,则通过 StreamOperator.execute() 来启动作业。