MaxCompute EndPoint: https://help.aliyun.com/document_detail/34951.html,这里选择的是外网Endpoint: http://service.cn-beijing.maxcompute.aliyun.com/api
Redis 文档地址为:https://help.aliyun.com/product/26340.html
package org.example; import org.apache.flink.ml.api.misc.param.Params; import org.apache.flink.table.catalog.ObjectPath; import com.alibaba.alink.common.io.catalog.MySqlCatalog; import com.alibaba.alink.common.io.catalog.OdpsCatalog; import com.alibaba.alink.operator.batch.BatchOperator; import com.alibaba.alink.operator.batch.sink.CatalogSinkBatchOp; import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp; import com.alibaba.alink.operator.batch.source.CatalogSourceBatchOp; import com.alibaba.alink.params.io.HasCatalogObject.CatalogObject; import com.alibaba.alink.params.shared.HasOverwriteSink; public class HelloAlink { public static void main(String[] args) throws Exception { runSinkRedis(); } public static void runSinkRedis() throws Exception { // 填写好 EndPoint,AccessKeyID,AccessKeySecret, Project。 RunningProject 填写和 Project 相同 OdpsCatalog odpsCatalog = new OdpsCatalog( "odps", "default", "0.36.4-public", "AccessKeyID", "AccessKeySecret", "Project", "EndPoint", "Project" ); // schema: sepal_length double, sepal_width double, petal_length double, petal_width double, category string CatalogSourceBatchOp source = new CatalogSourceBatchOp() .setCatalogObject( new CatalogObject( odpsCatalog, // 填写好 Project new ObjectPath("Project", "test_alink_iris"), new Params().set(HasOverwriteSink.OVERWRITE_SINK, true) ) ); RedisSinkBatchOp sink = new RedisSinkBatchOp() .setRedisIPs("ip:port") .setRedisPassword("PASSWORD") .setClusterMode(true) .setDatabaseIndex(0) .setKeyCols("sepal_length", "sepal_width", "petal_length", "petal_width") .setValueCols("category") .setPluginVersion("2.9.0"); source.link(sink); BatchOperator.execute(); } }