Alink教程(Java版)
Alink教程(Python版)

读取MaxCompute数据表,写入MySQL

MaxCompute

MaxCompute Endpoint 说明文档:https://help.aliyun.com/document_detail/34951.html 。这里选择的是外网 Endpoint:http://service.cn-beijing.maxcompute.aliyun.com/api

MySQL

MySQL 文档地址为 https://help.aliyun.com/document_detail/95798.html

  1. 创建实例

  1. 这里创建的是非高可用实例,MySQL 版本选择的是 5.7;生产环境中可以考虑创建对应的高可用实例类型
  2. 将 MySQL 实例创建在与 Flink 全托管相同的网段内
  3. 创建用户
  4. 将内网网段加入白名单,使 Flink 集群可以访问到 MySQL 实例

实例

  1. 代码
package org.example;

import org.apache.flink.ml.api.misc.param.Params;
import org.apache.flink.table.catalog.ObjectPath;

import com.alibaba.alink.common.io.catalog.MySqlCatalog;
import com.alibaba.alink.common.io.catalog.OdpsCatalog;
import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp;
import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp;
import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject;
import com.alibaba.alink.params.shared.HasOverwriteSink;

/**
 * Alink example job that reads the {@code test_alink_iris} table through an
 * {@link OdpsCatalog} (MaxCompute) and writes it through a {@link MySqlCatalog}.
 *
 * <p>Every connection value in this class is a placeholder string and has to be
 * replaced with real settings before the job is run.
 */
public class HelloAlink {

	/**
	 * Program entry point; simply delegates to {@link #runReadOdpsWriteMysql()}.
	 *
	 * @param args command-line arguments (not used)
	 * @throws Exception whatever {@link #runReadOdpsWriteMysql()} throws
	 */
	public static void main(String[] args) throws Exception {
		runReadOdpsWriteMysql();
	}

	/**
	 * Builds a catalog source on the MaxCompute table, links it to a catalog sink on the
	 * MySQL table of the same name, and triggers execution of the batch job.
	 *
	 * @throws Exception propagated from building or executing the batch job
	 */
	public static void runReadOdpsWriteMysql() throws Exception {
		// Fill in EndPoint, AccessKeyID, AccessKeySecret and Project.
		// RunningProject should be set to the same value as Project.
		// NOTE(review): the third argument is presumably the ODPS plugin version - confirm.
		OdpsCatalog maxComputeCatalog = new OdpsCatalog(
			"odps", "default", "0.36.4-public",
			"AccessKeyID", "AccessKeySecret",
			"Project", "EndPoint", "RunningProject"
		);

		// Fill in DefaultDatabase, MySQL URL, PORT, USER and PASSWORD.
		// NOTE(review): the third argument is presumably the MySQL plugin version - confirm.
		MySqlCatalog targetCatalog = new MySqlCatalog(
			"odps2mysql", "DefaultDatabase", "5.1.27",
			"MySQL URL", "PORT", "USER", "PASSWORD"
		);

		// Source side: replace "Project" with the real MaxCompute project name.
		ObjectPath irisInOdps = new ObjectPath("Project", "test_alink_iris");
		CatalogObject readFrom = new CatalogObject(maxComputeCatalog, irisInOdps);
		CatalogSourceBatchOp irisReader = new CatalogSourceBatchOp().setCatalogObject(readFrom);

		// Sink side: same table name in MySQL, with the overwrite-sink flag switched on.
		ObjectPath irisInMysql = new ObjectPath("DATABASE", "test_alink_iris");
		Params sinkOptions = new Params().set(HasOverwriteSink.OVERWRITE_SINK, true);
		CatalogObject writeTo = new CatalogObject(targetCatalog, irisInMysql, sinkOptions);
		CatalogSinkBatchOp irisWriter = new CatalogSinkBatchOp().setCatalogObject(writeTo);

		// Wire source to sink, then run the job.
		irisReader.link(irisWriter);
		BatchOperator.execute();
	}
}