Alink教程(Java版)
Alink教程(Python版)
该文档涉及的组件

在Flink集群部署Alink

介绍

在Flink集群部署Alink,需要部署三个Jar包(本文会有一个部分专门讲述如何获取),对于不同Flink集群环境,部署方式存在区别,本文将介绍Standalone、Yarn、Docker和K8s。

集群运行Alink程序的提交命令与运行普通 Flink 程序相同,详细步骤见Flink官方部署文档,通常,只要将Alink程序依赖的alink_core_flink库添加到 Flink 的 lib 目录下就能用集群资源计算了。Flink集群有不同的搭建方法(本地、YARN、Kubernetes等)和部署模式,请参考Flink文档首先选择合适的部署模式搭建好集群。

Application、Session和Per-job模式

  • Application模式 为每个程序启动一个集群,该集群仅用于该程序,程序完成后,集群将被清理。该模式的特点是减少了客户端的工作,它将其他模式下客户端需要完成的:(1)下载依赖(2)生成JobGraph(3)将依赖和JobGraph发送到集群,在集群完成,减轻了客户端的负担。
  • Per-Job 模式 (已废弃)为每个作业(Job)启动一个集群,该集群仅适用于该作业, 作业结束后,集群将被清理。
  • Session模式 预先分配集群资源,不同作业可能在同一个TaskManager上运行。

部署模式选择注意

  • 当一个main()方法里有多个execute()时需要用application或session attached模式。因为有多个execute()的程序会产生多个作业,Per-job模式下,对此会申请多个集群资源,而不同作业间的资源隔离导致只会成功执行其中一个作业。
  • 使用application模式需要将程序的jar包提前放到所有 Flink 组件(JobManager、TaskManager)都能找到的地方,例如:(1)每个机器的$FLINK_HOME/lib路径下都上传一个程序的jar包,(2)上传到HDFS,再通过 Flink 的 yarn.provided.lib.dirs 参数指定存储的路径为hdfs路径。

接下来介绍如何获取集群部署所需的jar包,以及如何打包自己的 Alink 程序。

准备集群部署所需的jar包

由于 Alink 可以通过 Java 和 Python 两种方式提交,建议在集群部署的时候将相关 Jar 包一起部署上去。尽管 Alink Java 没有单独提供集群部署所需 Jar 包的下载,但它们与 PyAlink 所使用的一致,所以可以从 PyAlink 的lib目录中获取。 具体方式如下:

方式一:

按照Alink快速开始 安装好 PyAlink, 执行命令

python3 -c "import os; import pyalink; print(os.path.join(pyalink.__path__[0], 'lib'))"

在 Python 标准输出中即可拿到对应的 PyAlink lib目录的路径,拷贝该目录下的jar包。

方式二:

下载PyAlink 1.10的whl包,相关链接:链接1链接2(MD5: f92b6fcff0caea332f531f5d97cb00fe),解压开PyAlink 1.10的whl包,解压后的文件结构如下图所示,在pyalink/lib下为集群部署所需的三个jar包。

将jar包添加到集群环境中

Standalone 和 YARN

  • 启动集群前,将上节获取到的lib目录中的 jar 包拷贝到 Flink 的lib目录。
  • 在 Flink 配置中增加:classloader.resolve-order: parent-first,在配置文件conf/flink-conf.yaml里添加,该配置的功能见Flink文档Inverted Class Loading Order》

Docker 和 Kubernetes

在 Dockerfile 中用 ADD 命令将 Alink 的 jar 包,添加到镜像中的 Flink 的 lib 目录。

ADD alink_connector_all-1.1-SNAPSHOT.jar $FLINK_LIB_DIR/alink_connector_all-1.1-SNAPSHOT.jar ADD alink_core_flink-1.10_2.11-1.1-SNAPSHOT.jar $FLINK_LIB_DIR/alink_core_flink-1.10_2.11-1.1-SNAPSHOT.jar

ADD alink_python-1.1-SNAPSHOT-shaded.jar $FLINK_LIB_DIR/ alink_python-1.1-SNAPSHOT-shaded.jar

使用Flink官方提供的部署方式,其中增加classloader.resolve-order: parent-first配置项。参考:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html

编译和集群运行Alink程序

当已经在idea调试运行成功了 Alink 程序,在编译打包前,为了避免与集群的 Flink 版本冲突,首先修改 pom.xml ,将 Flink lib目录下有的依赖加上 <scope>provided</scope> 标签。

<!--
Note: Add scope as `provided` to exclude flink packages when running on cluster.
Ps: If we add `provided` as default, it will be crash when running on local.
-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${alink.scala.major.version}</artifactId>
<version>${alink.flink.version}</version>
<scope>provided</scope> <!--修改 flink 相关依赖-->

......
</dependency>

使用maven 打包 Alink 程序,并用shade 插件处理依赖,命令如下:

mvn clean package shade:shade

集群运行Alink程序的提交命令与运行普通 Flink 程序相同,详细步骤见Flink官方部署文档

额外步骤

如果代码中使用了 Alink 插件功能,并且集群无法联网,那么需要额外准备 plugin 目录。

当前,这部分功能还在不断改进中。可以尝试以下两种方式绕过:

1. 当提交作业的机器和集群机器具有相同的文件系统时:

    1. 使用 AlinkGlobalConfiguration.setPluginDir()设置插件目录为一个提交作业的机器和集群机器都有权限访问的绝对路径。
    2. 在提交作业的机器上,使用本地模式单机运行作业,将自动下载好所需的插件到所设置的插件目录下。
    3. 将提交作业的机器上插件目录的内容同步到所有集群机器上的插件目录下。

2. 当能够修改 Flink 集群的 flink-conf.yaml,并且能够重启集群时:

    1. 在提交作业的机器上,使用本地模式单机运行作业,将自动下载好所需的插件。默认下载目录为相对路径 plugins 下,使用 AlinkGlobalConfiguration.setPluginDir()设置插件目录。
    2. 选择一个集群机器都能访问的路径,例如 /tmp/alink_plugins,将提交作业的机器上插件目录的内容同步到所有集群机器上的该路径下。
    3. 在 flink-conf.yaml 中添加一行 env.java.opts: -DALINK_PLUGINS_DIR=xxx,"xxx" 替换为所选择的绝对路径,例如 env.java.opts: -DALINK_PLUGINS_DIR=/tmp/alink_plugins
    4. 重启 Flink 集群。