MaxCompute Endpoint 文档地址为 https://help.aliyun.com/document_detail/34951.html ，这里选择的是外网 Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
MySQL 文档地址为 https://help.aliyun.com/document_detail/95798.html
package org.example; import org.apache.flink.ml.api.misc.param.Params; import org.apache.flink.table.catalog.ObjectPath; import com.alibaba.alink.common.io.catalog.MySqlCatalog; import com.alibaba.alink.common.io.catalog.OdpsCatalog; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp; import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp; import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject; import com.alibaba.alink.params.shared.HasOverwriteSink; public class HelloAlink { public static void main(String[] args) throws Exception { runReadOdpsWriteMysql(); } public static void runReadOdpsWriteMysql() throws Exception { // 填写好 EndPoint,AccessKeyID,AccessKeySecret, Project。 RunningProject 填写和 Project 相同 OdpsCatalog odpsCatalog = new OdpsCatalog( "odps", "default", "0.36.4-public", "AccessKeyID", "AccessKeySecret", "Project", "EndPoint", "RunningProject" ); // 填写好 DefaultDatabase,MySQL URL,PORT, USER, PASSWORD。 MySqlCatalog mySqlCatalog = new MySqlCatalog( "odps2mysql", "DefaultDatabase", "5.1.27", "MySQL URL", "PORT", "USER", "PASSWORD" ); CatalogSourceBatchOp odpsSourceBatchOp = new CatalogSourceBatchOp() .setCatalogObject( new CatalogObject( odpsCatalog, // 填写好 Project new ObjectPath("Project", "test_alink_iris") ) ); CatalogSinkBatchOp mysqlSinkBatchOp = new CatalogSinkBatchOp() .setCatalogObject( new CatalogObject( mySqlCatalog, new ObjectPath("DATABASE", "test_alink_iris"), new Params().set(HasOverwriteSink.OVERWRITE_SINK, true) ) ); odpsSourceBatchOp.link(mysqlSinkBatchOp); BatchOperator.execute(); } }