在 PyAlink 中,算法组件提供的接口基本与 Java API 一致,即通过默认构造方法创建一个算法组件,然后通过 setXXX 设置参数,通过 link/linkTo/linkFrom 与其他组件相连。代码一般有以下几个基本组成部分
下面分别简述每部分如何编写。
运行环境 分为本地(以多线程方式运行),集群(在flink集群运行),通过以下命令创建
# 1. 本地运行,并发度为2 useLocalEnv(2) # 2. flink集群运行,参数为地址、端口和并发度 useRemoteEnv("127.0.0.1", 8081, 2) # 3. flink集群运行,没有参数,运行前需将 # .../site-packages/pyalink/lib下的jar包拷贝到 pyalink/lib getMLEnv()
创建命令与运行脚本方式相关
用 “python” 命令运行,useRemoteEnv会将 PyAlink 任务会自动发送到集群运行
Alink支持多种数据源,可以参考以下文档。
数据源场景 | 参考文档 |
python数组 | |
pandas DataFrame | |
Alink Java提供的数据导入组件 |
下面的代码片段读取了一个csv文件,setSchemaStr 提供了表信息,如何写schema请参考《Schema简介》。
source = CsvSourceBatchOp()\ .setSchemaStr("sepal_length double, sepal_width double, petal_length double, petal_width double, category string")\ .setFilePath("https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv")
下面的代码将一个算法组件与数据源连接
oneHotLabelOp = OneHotTrainBatchOp().setSelectedCols(["category"]) model = source.link(oneHotLabelOp) # 或 model = oneHotLabel.linkFrom(source)
💡 此代码片段中的算法组件是一个Operator,这类组件通过link或linkFrom方法使用,此外还有Pipeline类组件,通过fit和transform方法使用,详见各组件的文档《Alink组件文档》
Alink支持多种数据导出方法,可以参考Alink组件文档,导出组件与算法组件使用方法类似,通过link或linkFrom方法导出。
Alink的执行命令有以下三种
# 1. 通过打印命令启动 model.print() # 2. 通过execute方法启动 BatchOperator.execute() StreamOperator.execute()
💡 使用execute启动程序时,如果也想看到屏幕输出可以使用lazyPrint方法,例如 model.lazyPrint(3)
如何在集群上使用Alink插件参见 https://www.yuque.com/pinshu/alink_guide/gayyvf#RPdbV。