完成本文步骤有以下要求:
注意:实时计算 Flink 全托管产品可能在使用过程中向您收取费用!
将下面的代码复制,然后保存为本地文件 pyalink_on_vvp.py,之后用于在 VVP 上执行。
import logging
import os
import numpy as np
import pandas as pd
os.environ['ALINK_DEPS_DIR'] = '/flink/usrlib/xxx/' # just set to something, not really used
os.environ['ALINK_PLUGINS_DIR'] = '/flink/usrlib/xxx/' # just set to something, not really used for now
from pyalink.alink import *
getMLEnv()
data = np.array([
[1.1, True, "2", "A"],
[1.1, False, "2", "B"],
[1.1, True, "1", "B"],
[2.2, True, "1", "A"]
])
df = pd.DataFrame({"double": data[:, 0], "bool": data[:, 1], "number": data[:, 2], "str": data[:, 3]})
source = BatchOperator.fromDataframe(df, schemaStr='double double, bool boolean, number int, str string')
p = "oss://[oss bucket]/path/to/output.csv"
sink = CsvSinkBatchOp() \
.setFilePath(FilePath(p, FlinkFileSystem(p))) \
.setOverwriteSink(True) \
.linkFrom(source)
BatchOperator.execute()
这段代码简单地从 pandas.DataFrame 创建一个 source,然后写到 OSS 的 csv 文件。代码第 23 行的 “[oss bucket]”需要替换为 VVP 所绑定的 OSS bucket。后面紧跟这的路径可以根据需要填写。
其中,需要特别注意与一般的 PyAlink 脚本不同的地方:
getMLEnv(),而不是 useLocalEnv 或者 useRemoteEnv:这样能通过 PyFlink 来执行 PyAlink 的代码。print、collectoToDataFrame 等语句:因为 VVP 上执行脚本采用的是 DETACHED 模式,不能使用这些方法。FlinkFileSystem,从 vvp 直接绑定的 OSS bucket 读写数据:其他形式的数据 source 或者 sink 暂时不支持,以后根据适配情况进行调整。ALINK_DEPS_DIR 和 ALINK_PLUGINS_DIR:因为 VVP 上安装 whl 包的方式跟一般机器不同,需要设置这两个环境变量避免一些报错,以后可能会根据适配情况进行调整。由于 PyAlink 是以第三方库的形式在 VVP 上运行,因此需要额外添加 PyAlink 包以及 jar 包。官方文档 中介绍了如何添加这些依赖。
pyalink,单机搜索按钮;pyalink 包;
上述步骤完成后,在本机上会有一个 pyalink-1.x.x-py3-non-any.whl 的文件。
这里的步骤参考 官方文档,但略有改变,请注意。
requirements.txt;pandas>=1.0,<1.2.0 jupyter scipy Rx tqdm deprecation packaging
build.sh;#!/bin/bash
set -e -x
yum install -y zip
PYBIN=/opt/python/cp37-cp37m/bin
"${PYBIN}/pip" install --target __pypackages__ -r requirements.txt
cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
rm -rf __pypackages__docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh
deps.zip 的文件。mkdir jars cd jars tar xvf /path/to/pyalink-1.x.x-py3-none-any.whl
pyalink/lib 下的两个 jar 包备用。所有文件上传完之后的效果如下:

pipeline.classpaths: >- file:///flink/usrlib/alink_core_flink_xxx.jar;file:///flink/usrlib/alink_python_xxx.jar
所有配置填写完之后的效果如下:

运行成功后的页面显示大致如下:
