pyalink
包对应为 Alink 所支持的最新 Flink 版本,当前为 1.13,而 pyalink-flink-***
为旧版本的 Flink 版本,当前提供 pyalink-flink-1.12
, pyalink-flink-1.11
, pyalink-flink-1.10
和 pyalink-flink-1.9
。1.4.0
。pip install pyalink
、pip install pyalink-flink-1.12
、pip install pyalink-flink-1.11
、pip install pyalink-flink-1.10
或者 pip install pyalink-flink-1.9
。更多内容参见:
pyalink
和 pyalink-flink-***
不能同时安装,也不能与旧版本同时安装。pyalink
或者 pyalink-flink-***
,请使用pip uninstall pyalink
或者 pip uninstall pyalink-flink-***
卸载之前的版本。pip
安装缓慢或不成功的情况,可以参考这篇文章修改pip源,或者直接使用下面的链接下载 whl 包,然后使用 pip
安装:
pip
,比如 pip3
;如果使用 Anaconda,则需要在 Anaconda 命令行中进行安装。安装 PyAlink 之后,可以直接运行 download_pyalink_dep_jars
命令,下载支持文件系统功能所需要的 jar 包。
(如果提示找不到这个命令,可以尝试直接运行脚本: python3 -c 'from pyalink.alink.download_pyalink_dep_jars import main;main()'
。)
运行这个命令后,将提问是否安装某种文件系统对应的 jar 包,并选择合适的版本。 当前支持的文件系统包括:
这些 jar 包将被下载到 PyAlink 安装路径的 lib/plugins
目录下,所以要求运行命令时有 PyAlink 安装目录的权限。
运行命令时,也可以增加参数:download_pyalink_dep_jars -d
,将自动下载所有的 jar 包。
可以通过 Jupyter Notebook 来开始使用 PyAlink,能获得更好的使用体验。
使用步骤:
jupyter notebook
,并新建 Python 3 的 Notebook 。from pyalink.alink import *
。useLocalEnv(parallism, flinkHome=None, config=None)
。parallism
表示执行所使用的并行度;flinkHome
为 flink 的完整路径,一般情况不需要设置;config
为Flink所接受的配置参数。运行后出现如下所示的输出,表示初始化运行环境成功:JVM listening on ***
source = CsvSourceBatchOp()\ .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\ .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv") res = source.select(["sepal_length", "sepal_width"]) df = res.collectToDataframe() print(df)
在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 setXXX
设置参数,通过 link/linkTo/linkFrom
与其他组件相连。
这里利用 Jupyter Notebook 的自动补全机制可以提供书写便利。
对于批式作业,可以通过批式组件的 print/collectToDataframe/collectToDataframes
等方法或者 BatchOperator.execute()
来触发执行;对于流式作业,则通过 StreamOperator.execute()
来启动作业。