
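Deploying and Running Alink (Java Version)

Writing the Code

Create a Maven Project

  1. In IntelliJ, choose File -> New -> Project from the menu to open the New Project dialog.

  2. Click Next. On the following screen, enter the project name (AlinkMavenExample in this guide), then click Finish to create the project.

Modify pom.xml

This project depends on Flink 1.13. At the time of writing, the latest engine version supported by fully managed Flink was vvr-4.0.12-flink-1.13. Do not use Flink 1.11 or earlier versions; the job will time out with an error at startup. See https://help.aliyun.com/document_detail/169596.html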

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>AlinkMavenExample</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.alink</groupId>
            <artifactId>alink_core_flink-1.13_2.11</artifactId>
            <version>0.1-patches-flink-1.13-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Add the shade step so the packaged jar can be used directly -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>post-shade</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

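Write the main Function

This example reads a CSV data source over HTTP and writes it to the system OSS (accessed through FlinkFileSystem):

  1. Create a Java class named HelloAlink in the org.example package

  2. Write the code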
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.FlinkFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {
    
    public static void main(String[] args) throws Exception {
        runOssSink();
    }
    
    public static void runOssSink() throws Exception {
        CsvSourceBatchOp source = new CsvSourceBatchOp()
            .setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
            .setSchemaStr(
            "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
        );
        
        CsvSinkBatchOp sink = new CsvSinkBatchOp()
            // Replace xxx with your newly created OSS bucket
            .setFilePath(new FilePath("/alink_data/iris_new.csv", new FlinkFileSystem("oss://xxx")))
            .setOverwriteSink(true);
        
        source.link(sink);
        
        BatchOperator.execute();
    }
}
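  3. Build the shaded jar

Click package in the Maven tool window.

The generated shaded jar is in the target directory. With the pom.xml above, its file name ends in -shaded.jar.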
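
The schema string passed to setSchemaStr in the code above has to describe one column per CSV field. The following is a minimal sketch of a local sanity check that could be run before packaging. SchemaCheck is a hypothetical helper, not part of Alink, and it assumes a simple CSV without quoted commas.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Alink): compares a CSV row with a schema string. */
final class SchemaCheck {

    /** Splits "name type, name type, ..." and returns the column names in order. */
    static List<String> columnNames(String schemaStr) {
        List<String> names = new ArrayList<>();
        for (String column : schemaStr.split(",")) {
            String[] parts = column.trim().split("\\s+");
            if (parts.length != 2) {
                throw new IllegalArgumentException(
                    "Expected 'name type' but got: " + column.trim());
            }
            names.add(parts[0]);
        }
        return names;
    }

    /** Returns true when the row has exactly one field per schema column. */
    static boolean rowMatches(String schemaStr, String csvRow) {
        // Assumption: fields contain no quoted commas.
        // The -1 limit keeps trailing empty fields, so "a,b," counts as three fields.
        return csvRow.split(",", -1).length == columnNames(schemaStr).size();
    }
}
```

For example, SchemaCheck.rowMatches(schema, "5.1,3.5,1.4,0.2,Iris-setosa") returns true for the five-column schema used in this guide. The sample row is an assumption about what the file contains, not a value taken from it.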


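Upload the Jar

Upload the jar generated in the previous step to the fully managed Flink cluster as a resource.

Job Development

Create a Job

Enter the Job Information

Name the job hello_alink, set the file type to Stream Job / JAR, and keep the defaults for the deployment target and storage location.

Configure the Job

Fill in the fields outlined in red in the screenshot with appropriate values. Pay attention to the following:

  1. Set Entry Point Class to org.example.HelloAlink, the class that contains the main function in the code.
  2. Set the parallelism to 1. This is a very small job, so the minimum value saves resources.
  3. Select vvr-4.0.12-flink-1.13 as the engine version; see the "Modify pom.xml" section for the reason.
  4. Set the Flink restart strategy to No Restarts. The submitted job is a batch job; if it kept restarting, it would fail repeatedly and never finish.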
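
The example code hard-codes the bucket as oss://xxx. If the job configuration form also accepts main arguments for the entry point class (an assumption about the console, not something this guide states), the bucket could be passed in at submission time. The sketch below shows one way main could read such arguments. JobArgs and the --bucket and --path argument names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: reads "--key value" pairs from the main arguments. */
final class JobArgs {

    /** Parses arguments such as {"--bucket", "my-bucket"} into a key/value map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> options = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException(
                    "Expected '--key value' pairs, got: " + args[i]);
            }
            options.put(args[i].substring(2), args[i + 1]);
        }
        return options;
    }

    /** Builds the "oss://bucket" string that the example passes to FlinkFileSystem. */
    static String ossUri(Map<String, String> options) {
        String bucket = options.get("bucket");
        if (bucket == null || bucket.isEmpty()) {
            throw new IllegalArgumentException("Missing required argument --bucket");
        }
        return "oss://" + bucket;
    }
}
```

In main, the result of JobArgs.ossUri(JobArgs.parse(args)) would replace the literal "oss://xxx", so the same jar can target different buckets without a rebuild.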


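Publish

Click the button shown in the screenshot.


Job Operations

Start the Job

Click the button outlined in red in the screenshot to start the job.

View Logs

View Results

This job writes the iris data to a path in the newly created OSS bucket, so once the job finishes, you can use an OSS client to check whether the data was written.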

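Alink Examples

Read over HTTP and Write to a Custom OSS Bucket

  1. Choose an OSS bucket

For the mapping between OSS regions and endpoints, see https://help.aliyun.com/document_detail/31837.html. This example uses a bucket in the same region as the fully managed Flink cluster, so it uses the internal endpoint: http://oss-cn-beijing-internal.aliyuncs.com/

  2. The code is as follows: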
package org.example;

import com.alibaba.alink.common.io.filesystem.FilePath;
import com.alibaba.alink.common.io.filesystem.OssFileSystem;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CsvSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;

public class HelloAlink {

	public static void main(String[] args) throws Exception {
		runOthersOssSink();
	}

	public static void runOthersOssSink() throws Exception {
		// Fill in EndPoint, Bucket, AccessKeyID, and AccessKeySecret
		OssFileSystem oss =
			new OssFileSystem(
				"3.4.1",
				"EndPoint",
				"Bucket",
				"AccessKeyID",
				"AccessKeySecret"
			);

		CsvSourceBatchOp source = new CsvSourceBatchOp()
			.setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
			.setSchemaStr(
				"sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
			);

		CsvSinkBatchOp sink = new CsvSinkBatchOp()
			.setFilePath(new FilePath("/alink_data/iris_new_oss_filesystem.csv", oss))
			.setOverwriteSink(true);

		source.link(sink);

		BatchOperator.execute();
	}
}
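
The EndPoint value in the code above depends on whether the bucket is in the same region as the Flink cluster. As a small sketch of that choice, the helper below builds an endpoint from a region ID following the naming pattern of the cn-beijing endpoint used in this guide. OssEndpoints is hypothetical, and the result should still be checked against the region and endpoint table linked above.

```java
/** Hypothetical helper: builds an OSS endpoint URL from a region ID such as "cn-beijing". */
final class OssEndpoints {

    /**
     * Returns the internal endpoint when the bucket and the Flink cluster share a region,
     * otherwise the public endpoint.
     */
    static String endpoint(String regionId, boolean sameRegionAsCluster) {
        if (regionId == null || regionId.isEmpty()) {
            throw new IllegalArgumentException("regionId must not be empty");
        }
        // Assumption: the region follows the usual "oss-<region>[-internal].aliyuncs.com" pattern.
        String suffix = sameRegionAsCluster ? "-internal" : "";
        return "http://oss-" + regionId + suffix + ".aliyuncs.com";
    }
}
```

OssEndpoints.endpoint("cn-beijing", true) yields http://oss-cn-beijing-internal.aliyuncs.com, which matches the internal endpoint chosen in step 1.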

Read over HTTP and Write to MaxCompute

  1. Choose a MaxCompute project

For MaxCompute endpoints, see https://help.aliyun.com/document_detail/34951.html. This example uses the public endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api

  2. The code is as follows:
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CsvSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

public class HelloAlink {

	public static void main(String[] args) throws Exception {
		runSinkOdps();
	}

	public static void runSinkOdps() throws Exception {
		// Fill in EndPoint, AccessKeyID, AccessKeySecret, and Project. Set RunningProject to the same value as Project.
		OdpsCatalog odpsCatalog = new OdpsCatalog(
			"odps", "default",
			"0.36.4-public", "AccessKeyID", "AccessKeySecret",
			"Project",
			"EndPoint",
			"RunningProject"
		);

		CsvSourceBatchOp source = new CsvSourceBatchOp()
			.setFilePath("https://alink-test-data.oss-cn-hangzhou.aliyuncs.com/iris.csv")
			.setSchemaStr(
				"sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
			);

		CatalogSinkBatchOp sink = new CatalogSinkBatchOp()
			.setCatalogObject(
				new CatalogObject(
					odpsCatalog,
					// Fill in Project
					new ObjectPath("Project", "test_alink_iris"),
					new Params().set(HasOverwriteSink.OVERWRITE_SINK, true)
				)
			);

		source.link(sink);

		BatchOperator.execute();
	}
}
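
The examples above use "AccessKeyID" and "AccessKeySecret" as placeholders to be filled in. One way to keep the real values out of the source code is to read them from the environment at startup, as sketched below. AccessKey and the two variable names are hypothetical, and how the variables get set on the cluster is outside the scope of this sketch.

```java
import java.util.Map;

/** Hypothetical holder for an AccessKey pair read from environment-style settings. */
final class AccessKey {
    final String id;
    final String secret;

    private AccessKey(String id, String secret) {
        this.id = id;
        this.secret = secret;
    }

    /** Reads both values from the given map, for example System.getenv(). */
    static AccessKey fromEnv(Map<String, String> env) {
        // The variable names below are illustrative, not an Alink or Flink convention.
        return new AccessKey(
            require(env, "ALINK_ACCESS_KEY_ID"),
            require(env, "ALINK_ACCESS_KEY_SECRET"));
    }

    /** Fails fast with a clear message when a variable is missing or blank. */
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }
}
```

With AccessKey key = AccessKey.fromEnv(System.getenv()), the fields key.id and key.secret would take the place of the two placeholder strings in the OdpsCatalog or OssFileSystem constructor calls.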