完成本文步骤有以下要求:
注意:实时计算 Flink 全托管产品可能在使用过程中向您收取费用!
将下面的代码复制,然后保存为本地文件 pyalink_on_vvp.py
,之后用于在 VVP 上执行。
import logging import os import numpy as np import pandas as pd os.environ['ALINK_DEPS_DIR'] = '/flink/usrlib/xxx/' # just set to something, not really used os.environ['ALINK_PLUGINS_DIR'] = '/flink/usrlib/xxx/' # just set to something, not really used for now from pyalink.alink import * getMLEnv() data = np.array([ [1.1, True, "2", "A"], [1.1, False, "2", "B"], [1.1, True, "1", "B"], [2.2, True, "1", "A"] ]) df = pd.DataFrame({"double": data[:, 0], "bool": data[:, 1], "number": data[:, 2], "str": data[:, 3]}) source = BatchOperator.fromDataframe(df, schemaStr='double double, bool boolean, number int, str string') p = "oss://[oss bucket]/path/to/output.csv" sink = CsvSinkBatchOp() \ .setFilePath(FilePath(p, FlinkFileSystem(p))) \ .setOverwriteSink(True) \ .linkFrom(source) BatchOperator.execute()
这段代码简单地从 pandas.DataFrame 创建一个 source,然后写到 OSS 的 csv 文件。代码第 23 行的 “[oss bucket]
”需要替换为 VVP 所绑定的 OSS bucket。后面紧跟这的路径可以根据需要填写。
其中,需要特别注意与一般的 PyAlink 脚本不同的地方:
getMLEnv()
,而不是 useLocalEnv
或者 useRemoteEnv
:这样能通过 PyFlink 来执行 PyAlink 的代码。print
、collectoToDataFrame
等语句:因为 VVP 上执行脚本采用的是 DETACHED 模式,不能使用这些方法。FlinkFileSystem
,从 vvp 直接绑定的 OSS bucket 读写数据:其他形式的数据 source 或者 sink 暂时不支持,以后根据适配情况进行调整。ALINK_DEPS_DIR
和 ALINK_PLUGINS_DIR
:因为 VVP 上安装 whl 包的方式跟一般机器不同,需要设置这两个环境变量避免一些报错,以后可能会根据适配情况进行调整。由于 PyAlink 是以第三方库的形式在 VVP 上运行,因此需要额外添加 PyAlink 包以及 jar 包。官方文档 中介绍了如何添加这些依赖。
pyalink
,单机搜索按钮;pyalink
包;上述步骤完成后,在本机上会有一个 pyalink-1.x.x-py3-non-any.whl
的文件。
这里的步骤参考 官方文档,但略有改变,请注意。
requirements.txt
;pandas>=1.0,<1.2.0 jupyter scipy Rx tqdm deprecation packaging
build.sh
;#!/bin/bash set -e -x yum install -y zip PYBIN=/opt/python/cp37-cp37m/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh
deps.zip
的文件。mkdir jars cd jars tar xvf /path/to/pyalink-1.x.x-py3-none-any.whl
pyalink/lib
下的两个 jar 包备用。所有文件上传完之后的效果如下:
pipeline.classpaths: >- file:///flink/usrlib/alink_core_flink_xxx.jar;file:///flink/usrlib/alink_python_xxx.jar
所有配置填写完之后的效果如下:
运行成功后的页面显示大致如下: