

This project depends on Flink 1.13. At the time of writing, the latest engine version supported by fully managed Flink is vvr-4.0.12-flink-1.13. Do not use Flink 1.11 or any earlier version; jobs built against those versions fail with a timeout error at startup. See https://help.aliyun.com/document_detail/169596.html
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>AlinkMavenExample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.13_2.11</artifactId>
            <version>0.1-patches-flink-1.13-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Run the shade plugin in the package phase so the generated jar can be deployed directly -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>post-shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <transformers>
                                <!-- Merge META-INF/services files so that all service implementations stay registered in the shaded jar -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

The following example reads a CSV data source over HTTP and writes it to OSS through the system file system:

package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOssSink();
    }

    public static void runOssSink() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            // Replace xxx with your newly created OSS bucket
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setOverwriteSink(true);

        source.link(sink);

        BatchOperator.execute();
    }
}

Click package in the IDE's Maven panel to build the project.
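If you build outside the IDE, the equivalent command, run from the project root, is:

mvn clean package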

Find the generated shaded jar in the target directory. Because shadedArtifactAttached is set to true, it is the artifact carrying the shaded classifier, i.e. AlinkMavenExample-1.0-SNAPSHOT-shaded.jar.

Upload the jar produced in the previous step to the fully managed Flink cluster as a resource.


Here the name is set to hello_alink, the file type to Stream Job / JAR, and the deployment target and storage location are left at their defaults.

Fill in the job creation form with the appropriate values. Note in particular: the JAR URI should point to the hello_alink resource uploaded above, and the entry point class is org.example.HelloAlink.

Click the confirm button to finish creating the job.

Click the start button to launch the job.


The job writes the iris data to a path in the newly created OSS bucket, so once it completes, we can use an OSS client to check whether the data was written.
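You can also verify the output with Alink itself by reading the file back and printing a few rows. The sketch below is illustrative only: it assumes the same oss://xxx bucket placeholder as above, BatchOperator's firstN helper, and must run where that file system is reachable (for example, as another job on the same cluster); CheckOssOutput is a hypothetical class name.

package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class CheckOssOutput {

    public static void main(String[] args) throws Exception {
        // Read back the file written by the job; replace xxx with your bucket
        CsvSourceBatchOp check = new CsvSourceBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        // Print the first five rows; print() triggers execution of the batch job
        check.firstN(5).print();
    }
}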

The example above writes through the system file system. To write to an OSS bucket with your own credentials, use OssFileSystem and specify the endpoint explicitly. The OSS region-to-endpoint mapping is listed at https://help.aliyun.com/document_detail/31837.html; because the bucket here is in the same region as the fully managed Flink cluster, the internal endpoint http://oss-cn-beijing-internal.aliyuncs.com/ is chosen:
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOthersOssSink();
    }

    public static void runOthersOssSink() throws Exception {
        // Fill in the EndPoint, Bucket, AccessKeyID and AccessKeySecret
        OssFileSystem oss = new OssFileSystem(
            "3.4.1",
            "EndPoint",
            "Bucket",
            "AccessKeyID",
            "AccessKeySecret"
        );

        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new_oss_filesystem.csv", oss))
            .setOverwriteSink(true);

        source.link(sink);

        BatchOperator.execute();
    }
}

Data can also be written to MaxCompute through OdpsCatalog. The MaxCompute endpoint list is at https://help.aliyun.com/document_detail/34951.html; here the public endpoint http://service.cn-beijing.maxcompute.aliyun.com/api is chosen:
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkOdps();
    }

    public static void runSinkOdps() throws Exception {
        // Fill in the EndPoint, AccessKeyID, AccessKeySecret and Project;
        // set RunningProject to the same value as Project
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "RunningProject"
        );

        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CatalogSinkBatchOp sink = new CatalogSinkBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in the Project
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );

        source.link(sink);

        BatchOperator.execute();
    }
}
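Once the job finishes, you can read the table back through the same catalog to confirm the write. This is a minimal sketch: it assumes CatalogSourceBatchOp, the source-side counterpart of CatalogSinkBatchOp, with the same placeholder credentials as above; ReadOdps is a hypothetical class name.

package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;

public class ReadOdps {

    public static void main(String[] args) throws Exception {
        // Same catalog configuration as in the sink example
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps", "default",
            "0.36.4-public", "AccessKeyID", "AccessKeySecret",
            "Project",
            "EndPoint",
            "RunningProject"
        );

        // Point the source at the table written above
        CatalogSourceBatchOp source = new CatalogSourceBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params()
                )
            );

        // Print the first five rows; print() triggers execution of the batch job
        source.firstN(5).print();
    }
}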