Alink教程(Python版)
该文档涉及的组件

第1.2.2节 在 Flink 集群上运行 PyAlink 任务

编写代码

在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 setXXX 设置参数,通过 link/linkTo/linkFrom 与其他组件相连。代码一般有以下几个基本组成部分

  1. 创建运行环境
  2. 创建数据源
  3. 连接算法组件
  4. 指定结果输出方式
  5. 执行命令

下面分别简述每部分如何编写。

运行环境 分为本地(以多线程方式运行),集群(在flink集群运行),通过以下命令创建

# 1. 本地运行,并发度为2
useLocalEnv(2)
# 2. flink集群运行,参数为地址、端口和并发度
useRemoteEnv("127.0.0.1", 8081, 2)
# 3. flink集群运行,没有参数,运行前需将
# .../site-packages/pyalink/lib下的jar包拷贝到 pyalink/lib
getMLEnv()

创建命令与运行脚本方式相关

useLocalEnv 与 useRemoteEnv

用 “python” 命令运行,useRemoteEnv会将 PyAlink 任务会自动发送到集群运行

getMLEnv

  1. pyalink/lib下的 jar包拷贝到 flink/lib下
  2. “flink/bin/start-cluster.sh” 命令启动flink
  3. “flink/bin/flink run -m 集群地址:端口 -py PyAlink脚本 -p 并行度”命令运行任务

Alink支持多种数据源,可以参考以下文档。

数据源场景

参考文档

python数组

《python数组如何转换为Alink数据源》

pandas DataFrame

《DataFrame转批式数据》

DataFrame转流式数据》

Alink Java提供的数据导入组件

Alink组件文档

下面的代码片段读取了一个csv文件,setSchemaStr 提供了表信息,如何写schema请参考《Schema简介》

source = CsvSourceBatchOp()\
    .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\
    .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")

下面的代码将一个算法组件与数据源连接

oneHotLabelOp = OneHotTrainBatchOp().setSelectedCols(["category"])
model = source.link(oneHotLabelOp)
# 或 model = oneHotLabel.linkFrom(source)

💡 此代码片段中的算法组件是一个Operator,这类组件通过link或linkFrom方法使用,此外还有Pipeline类组件,通过fit和transform方法使用,详见各组件的文档《Alink组件文档》


Alink支持多种数据导出方法,可以参考Alink组件文档,导出组件与算法组件使用方法类似,通过link或linkFrom方法导出。


Alink的执行命令有以下三种

# 1. 通过打印命令启动
model.print()
# 2. 通过execute方法启动
BatchOperator.execute()
StreamOperator.execute()

💡 使用execute启动程序时,如果也想看到屏幕输出可以使用lazyPrint方法,例如 model.lazyPrint(3)


额外步骤

如何在集群上使用Alink插件参见 https://www.yuque.com/pinshu/alink_guide/gayyvf#RPdbV