This example depends on Flink 1.13. At the time of writing, the latest version supported by fully-managed Flink is vvr-4.0.12-flink-1.13. Do not use Flink 1.11 or earlier versions, as the job will time out with an error on startup. See https://help.aliyun.com/document_detail/169596.html
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>AlinkMavenExample</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.13_2.11</artifactId>
            <version>0.1-patches-flink-1.13-SNAPSHOT</version>
        </dependency>
        <!-- The Flink dependencies are marked provided: the fully-managed cluster supplies them at runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Run the shade step during package, so the generated jar can be used directly -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>post-shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Here we take reading an HTTP CSV data source and writing to the system OSS as an example:
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOssSink();
    }

    public static void runOssSink() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            // Replace xxx with the name of the newly created OSS bucket
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setOverwriteSink(true);

        source.link(sink);

        BatchOperator.execute();
    }
}
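Before packaging, it can help to sanity-check the source and schema locally. Below is a minimal sketch that can be run directly from the IDE; PreviewIris is a hypothetical class name, and the URL and schema are copied from the example above. print() both triggers execution and prints the result, so no explicit BatchOperator.execute() call is needed.

package org.example;

import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class PreviewIris {

    public static void main(String[] args) throws Exception {
        // Read the public iris CSV and print the rows locally,
        // so the schema can be verified before packaging the job
        new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            )
            .print();
    }
}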
Click package in Maven, or run mvn clean package from the command line.
Find the generated shaded jar in the target directory (with shadedArtifactAttached set to true, it carries the shaded classifier, e.g. AlinkMavenExample-1.0-SNAPSHOT-shaded.jar).
Upload the jar generated in the previous step as a resource to the fully-managed Flink cluster.
Here the name is set to hello_alink, the file type to Stream Job / JAR, and the deployment target and storage location are left at their defaults.
Fill in the fields highlighted by the red boxes in the figure with appropriate values. A few points to note here: the entry point class should be the main class from the jar above, org.example.HelloAlink.
Click the button shown in the figure.
Click the button in the red box in the figure below to start the job.
The current job writes the iris data to a path in the newly created OSS bucket, so once the job completes, we can use an OSS client to check whether the data was written.
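The write can also be verified programmatically, by reading the file back with the same APIs. Below is a minimal sketch of an extra method for the HelloAlink class above; runOssCheck is a hypothetical name, it reuses the class's existing imports, and oss://xxx is the same placeholder bucket as in the sink.

    public static void runOssCheck() throws Exception {
        // Read back the CSV written by runOssSink;
        // "xxx" is the same placeholder bucket name used in the sink
        CsvSourceBatchOp check = new CsvSourceBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        // print() triggers execution; the rows appear in the job's stdout/logs
        check.print();
    }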
OSS Region and Endpoint mapping table: https://help.aliyun.com/document_detail/31837.html. Since the bucket used here is in the same region as the fully-managed Flink cluster, the internal endpoint is chosen: http://oss-cn-beijing-internal.aliyuncs.com/
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runOthersOssSink();
    }

    public static void runOthersOssSink() throws Exception {
        // Fill in EndPoint, Bucket, AccessKeyID, and AccessKeySecret
        OssFileSystem oss = new OssFileSystem(
            "3.4.1",
            "EndPoint",
            "Bucket",
            "AccessKeyID",
            "AccessKeySecret"
        );

        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            .setFilePath(new FilePath("/alink_data/iris_new_oss_filesystem.csv", oss))
            .setOverwriteSink(true);

        source.link(sink);

        BatchOperator.execute();
    }
}
MaxCompute endpoints: https://help.aliyun.com/document_detail/34951.html. The public endpoint is used here: http://service.cn-beijing.maxcompute.aliyun.com/api
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

    public static void main(String[] args) throws Exception {
        runSinkOdps();
    }

    public static void runSinkOdps() throws Exception {
        // Fill in EndPoint, AccessKeyID, AccessKeySecret, and Project;
        // set RunningProject to the same value as Project
        OdpsCatalog odpsCatalog = new OdpsCatalog(
            "odps",
            "default",
            "0.36.4-public",
            "AccessKeyID",
            "AccessKeySecret",
            "Project",
            "EndPoint",
            "RunningProject"
        );

        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
                "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
            );

        CatalogSinkBatchOp sink = new CatalogSinkBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in the Project name
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
                )
            );

        source.link(sink);

        BatchOperator.execute();
    }
}
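To confirm the table landed in MaxCompute, one option is simply to query test_alink_iris from the MaxCompute console; another is to read it back through the same catalog. Below is a hedged sketch of the read-back variant, assuming Alink's CatalogSourceBatchOp (the source counterpart of the sink used above, imported from com.alibaba.alink.operator.batch.source); runOdpsCheck is a hypothetical method name, and odpsCatalog is constructed as in the example.

    public static void runOdpsCheck(OdpsCatalog odpsCatalog) throws Exception {
        // Read the test_alink_iris table back through the same catalog and print it
        CatalogSourceBatchOp check = new CatalogSourceBatchOp()
            .setCatalogObject(
                new CatalogObject(
                    odpsCatalog,
                    // Fill in the Project name, as in the sink above
                    new ObjectPath("Project", "test_alink_iris"),
                    new Params()
                )
            );

        // print() triggers execution; the rows appear in the job's stdout/logs
        check.print();
    }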